提交 d0bef031 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/update

...@@ -35,7 +35,7 @@ TDengine相对于通用数据库,有超高的压缩比,在绝大多数场景 ...@@ -35,7 +35,7 @@ TDengine相对于通用数据库,有超高的压缩比,在绝大多数场景
Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
``` ```
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44851T。TDengine大概需要消耗44851/5=8970T, 8.9P空间。 示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44.8512T。TDengine大概需要消耗44.851/5=8.97024T空间。
用户可以通过参数keep,设置数据在磁盘中的最大保存时长。为进一步减少存储成本,TDengine还提供多级存储,最冷的数据可以存放在最廉价的存储介质上,应用的访问不用做任何调整,只是读取速度降低了。 用户可以通过参数keep,设置数据在磁盘中的最大保存时长。为进一步减少存储成本,TDengine还提供多级存储,最冷的数据可以存放在最廉价的存储介质上,应用的访问不用做任何调整,只是读取速度降低了。
......
...@@ -38,9 +38,9 @@ ...@@ -38,9 +38,9 @@
6. 检查防火墙设置,确认TCP/UDP 端口6030-6042 是打开的 6. 检查防火墙设置,确认TCP/UDP 端口6030-6042 是打开的
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/lib/taos*里, 并且*/usr/local/lib/taos*在系统库函数搜索路径*LD_LIBRARY_PATH* 7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/taos/driver*里, 并且*/usr/local/taos/driver*在系统库函数搜索路径*LD_LIBRARY_PATH*
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*driver/c/taos.dll*在你的系统搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*) 8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*C:\TDengine\driver\taos.dll*在你的系统库函数搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
9. 如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅 9. 如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作:`nc -vuz {hostIP} {port} ` 检查UDP端口连接是否工作:`nc -vuz {hostIP} {port} `
......
...@@ -121,8 +121,11 @@ function install_config() { ...@@ -121,8 +121,11 @@ function install_config() {
echo -e -n "${GREEN}Enter FQDN:port (like h1.taosdata.com:6030) of an existing TDengine cluster node to join${NC}" echo -e -n "${GREEN}Enter FQDN:port (like h1.taosdata.com:6030) of an existing TDengine cluster node to join${NC}"
echo echo
echo -e -n "${GREEN}OR leave it blank to build one${NC}:" echo -e -n "${GREEN}OR leave it blank to build one${NC}:"
read firstEp #read firstEp
while true; do if exec < /dev/tty; then
read firstEp;
fi
while true; do
if [ ! -z "$firstEp" ]; then if [ ! -z "$firstEp" ]; then
# check the format of the firstEp # check the format of the firstEp
#if [[ $firstEp == $FQDN_PATTERN ]]; then #if [[ $firstEp == $FQDN_PATTERN ]]; then
......
...@@ -4427,8 +4427,8 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { ...@@ -4427,8 +4427,8 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema) { int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema* pSchema) {
const char* msg0 = "only support order by primary timestamp"; const char* msg0 = "only support order by primary timestamp";
const char* msg1 = "invalid column name"; const char* msg1 = "invalid column name";
const char* msg2 = "only support order by primary timestamp and queried column"; const char* msg2 = "only support order by primary timestamp or queried column";
const char* msg3 = "only support order by primary timestamp and first tag in groupby clause"; const char* msg3 = "only support order by primary timestamp or first tag in groupby clause";
setDefaultOrderInfo(pQueryInfo); setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......
...@@ -268,7 +268,7 @@ static void dnodeGetEmail(char* filepath) { ...@@ -268,7 +268,7 @@ static void dnodeGetEmail(char* filepath) {
return; return;
} }
if (taosTRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) { if (taosRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
} }
......
...@@ -51,6 +51,7 @@ void walCleanUp(); ...@@ -51,6 +51,7 @@ void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg); twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg); int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh); void walClose(twalh);
int32_t walRenew(twalh); int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *); int32_t walWrite(twalh, SWalHead *);
......
...@@ -244,7 +244,7 @@ int32_t shellRunCommand(TAOS* con, char* command) { ...@@ -244,7 +244,7 @@ int32_t shellRunCommand(TAOS* con, char* command) {
} }
*p++ = c; *p++ = c;
if (c == ';') { if (c == ';' && quote == 0) {
c = *p; c = *p;
*p = 0; *p = 0;
if (shellRunSingleCommand(con, cmd) < 0) { if (shellRunSingleCommand(con, cmd) < 0) {
......
...@@ -52,9 +52,10 @@ extern "C" { ...@@ -52,9 +52,10 @@ extern "C" {
#include "osWindows.h" #include "osWindows.h"
#endif #endif
#include "osDef.h"
#include "osAlloc.h"
#include "osAtomic.h" #include "osAtomic.h"
#include "osCommon.h" #include "osCommon.h"
#include "osDef.h"
#include "osDir.h" #include "osDir.h"
#include "osFile.h" #include "osFile.h"
#include "osLz4.h" #include "osLz4.h"
......
...@@ -13,16 +13,14 @@ ...@@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_UTIL_ALLOC_H #ifndef TDENGINE_OS_ALLOC_H
#define TDENGINE_UTIL_ALLOC_H #define TDENGINE_OS_ALLOC_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define TSDB_USE_SYS_MEM #ifndef TAOS_OS_FUNC_ALLOC
#ifdef TSDB_USE_SYS_MEM
#define tmalloc(size) malloc(size) #define tmalloc(size) malloc(size)
#define tcalloc(size) calloc(1, size) #define tcalloc(size) calloc(1, size)
#define trealloc(p, size) realloc(p, size) #define trealloc(p, size) realloc(p, size)
......
...@@ -72,8 +72,6 @@ extern "C" { ...@@ -72,8 +72,6 @@ extern "C" {
#include <sys/utsname.h> #include <sys/utsname.h>
#define TAOS_OS_FUNC_FILE_SENDIFLE #define TAOS_OS_FUNC_FILE_SENDIFLE
#define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define TAOS_OS_FUNC_SEMPHONE #define TAOS_OS_FUNC_SEMPHONE
#define tsem_t dispatch_semaphore_t #define tsem_t dispatch_semaphore_t
......
...@@ -20,20 +20,26 @@ ...@@ -20,20 +20,26 @@
extern "C" { extern "C" {
#endif #endif
ssize_t taosTReadImp(int fd, void *buf, size_t count); #define tread(fd, buf, count) read(fd, buf, count)
ssize_t taosTWriteImp(int fd, void *buf, size_t count); #define twrite(fd, buf, count) write(fd, buf, count)
#define tlseek(fd, offset, whence) lseek(fd, offset, whence)
#define tclose(fd) \
{ \
if (FD_VALID(fd)) { \
close(fd); \
fd = FD_INITIALIZER; \
} \
}
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size); int64_t taosRead(int32_t fd, void *buf, int64_t count);
int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count); int64_t taosWrite(int32_t fd, void *buf, int64_t count);
int64_t taosLSeek(int32_t fd, int64_t offset, int32_t whence);
int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath);
#define taosClose(x) tclose(x)
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE // TAOS_OS_FUNC_FILE_SENDIFLE
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size) int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size);
#define taosFSendFile(outfile, infile, offset, count) taosTSendFileImp(fileno(outfile), fileno(infile), offset, size) int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size);
#endif
#define taosTRead(fd, buf, count) taosTReadImp(fd, buf, count)
#define taosTWrite(fd, buf, count) taosTWriteImp(fd, buf, count)
#define taosLSeek(fd, offset, whence) lseek(fd, offset, whence)
#ifdef TAOS_RANDOM_FILE_FAIL #ifdef TAOS_RANDOM_FILE_FAIL
void taosSetRandomFileFailFactor(int factor); void taosSetRandomFileFailFactor(int factor);
...@@ -42,24 +48,20 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t ...@@ -42,24 +48,20 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t
ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line); ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line); ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line);
off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line); off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line);
#undef taosTRead #undef taosRead
#undef taosTWrite #undef taosWrite
#undef taosLSeek #undef taosLSeek
#define taosTRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__) #define taosRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosTWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__) #define taosWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__) #define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__)
#endif #endif
#endif #endif
int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath);
// TAOS_OS_FUNC_FILE_GETTMPFILEPATH // TAOS_OS_FUNC_FILE_GETTMPFILEPATH
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath); void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath);
#ifndef TAOS_OS_FUNC_FILE_FTRUNCATE // TAOS_OS_FUNC_FILE_FTRUNCATE
#define taosFtruncate ftruncate int32_t taosFtruncate(int32_t fd, int64_t length);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -33,21 +33,19 @@ extern "C" { ...@@ -33,21 +33,19 @@ extern "C" {
x = FD_INITIALIZER; \ x = FD_INITIALIZER; \
} \ } \
} }
typedef int SOCKET; typedef int32_t SOCKET;
#endif #endif
#ifndef TAOS_OS_DEF_EPOLL #ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME -1 #define TAOS_EPOLL_WAIT_TIME -1
#endif #endif
#define taosClose(x) taosCloseSocket(x)
#ifdef TAOS_RANDOM_NETWORK_FAIL #ifdef TAOS_RANDOM_NETWORK_FAIL
#ifdef TAOS_RANDOM_NETWORK_FAIL_TEST #ifdef TAOS_RANDOM_NETWORK_FAIL_TEST
ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags); ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags);
ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen); ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count); ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count);
ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count); ssize_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count);
#undef taosSend #undef taosSend
#undef taosSendto #undef taosSendto
#undef taosReadSocket #undef taosReadSocket
...@@ -60,14 +58,14 @@ extern "C" { ...@@ -60,14 +58,14 @@ extern "C" {
#endif #endif
// TAOS_OS_FUNC_SOCKET // TAOS_OS_FUNC_SOCKET
int taosSetNonblocking(SOCKET sock, int on); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosBlockSIGPIPE(); void taosBlockSIGPIPE();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen); int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
// TAOS_OS_FUNC_SOCKET_INET // TAOS_OS_FUNC_SOCKET_INET
uint32_t taosInetAddr(char *ipAddr); uint32_t taosInetAddr(char *ipAddr);
const char *taosInetNtoa(struct in_addr ipInt); const char *taosInetNtoa(struct in_addr ipInt);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -62,11 +62,8 @@ extern "C" { ...@@ -62,11 +62,8 @@ extern "C" {
#define TAOS_OS_FUNC_FILE_ISDIR #define TAOS_OS_FUNC_FILE_ISDIR
#define TAOS_OS_FUNC_FILE_ISLNK #define TAOS_OS_FUNC_FILE_ISLNK
#define TAOS_OS_FUNC_FILE_SENDIFLE #define TAOS_OS_FUNC_FILE_SENDIFLE
#define taosFSendFile(outfile, infile, offset, count) taosFSendFileImp(outfile, infile, offset, size)
#define taosTSendFile(dfd, sfd, offset, size) taosTSendFileImp(dfd, sfd, offset, size)
#define TAOS_OS_FUNC_FILE_GETTMPFILEPATH #define TAOS_OS_FUNC_FILE_GETTMPFILEPATH
#define TAOS_OS_FUNC_FILE_FTRUNCATE #define TAOS_OS_FUNC_FILE_FTRUNCATE
extern int taosFtruncate(int fd, int64_t length);
#define TAOS_OS_FUNC_MATH #define TAOS_OS_FUNC_MATH
#define SWAP(a, b, c) \ #define SWAP(a, b, c) \
...@@ -139,7 +136,6 @@ typedef int (*__compar_fn_t)(const void *, const void *); ...@@ -139,7 +136,6 @@ typedef int (*__compar_fn_t)(const void *, const void *);
#define in_addr_t unsigned long #define in_addr_t unsigned long
#define socklen_t int #define socklen_t int
#define htobe64 htonll #define htobe64 htonll
#define twrite write
#define getpid _getpid #define getpid _getpid
struct tm *localtime_r(const time_t *timep, struct tm *result); struct tm *localtime_r(const time_t *timep, struct tm *result);
......
...@@ -19,21 +19,19 @@ ...@@ -19,21 +19,19 @@
#define _SEND_FILE_STEP_ 1000 #define _SEND_FILE_STEP_ 1000
int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count) { int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) {
fseek(in_file, (int32_t)(*offset), 0); fseek(in_file, (int32_t)(*offset), 0);
int writeLen = 0; int writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = { 0 }; uint8_t buffer[_SEND_FILE_STEP_] = {0};
for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) { for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file); size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file);
if (rlen <= 0) { if (rlen <= 0) {
return writeLen; return writeLen;
} } else if (rlen < _SEND_FILE_STEP_) {
else if (rlen < _SEND_FILE_STEP_) {
fwrite(buffer, 1, rlen, out_file); fwrite(buffer, 1, rlen, out_file);
return (int)(writeLen + rlen); return (int)(writeLen + rlen);
} } else {
else {
fwrite(buffer, 1, _SEND_FILE_STEP_, in_file); fwrite(buffer, 1, _SEND_FILE_STEP_, in_file);
writeLen += _SEND_FILE_STEP_; writeLen += _SEND_FILE_STEP_;
} }
...@@ -44,8 +42,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou ...@@ -44,8 +42,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
size_t rlen = fread(buffer, 1, remain, in_file); size_t rlen = fread(buffer, 1, remain, in_file);
if (rlen <= 0) { if (rlen <= 0) {
return writeLen; return writeLen;
} } else {
else {
fwrite(buffer, 1, remain, out_file); fwrite(buffer, 1, remain, out_file);
writeLen += remain; writeLen += remain;
} }
...@@ -54,7 +51,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou ...@@ -54,7 +51,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return writeLen; return writeLen;
} }
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) { int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) {
uError("not implemented yet"); uError("taosSendFile not implemented yet");
return -1; return -1;
} }
\ No newline at end of file
...@@ -17,10 +17,10 @@ ...@@ -17,10 +17,10 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tulog.h" #include "tulog.h"
#include "talloc.h" #include "osAlloc.h"
#define TSDB_HAVE_MEMALIGN #define TSDB_HAVE_MEMALIGN
#ifndef TSDB_USE_SYS_MEM #ifdef TAOS_OS_FUNC_ALLOC
void *tmalloc(int32_t size) { void *tmalloc(int32_t size) {
void *p = malloc(size); void *p = malloc(size);
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#ifdef TAOS_RANDOM_NETWORK_FAIL #ifdef TAOS_RANDOM_NETWORK_FAIL
ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) { ssize_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET; errno = ECONNRESET;
return -1; return -1;
...@@ -29,7 +29,7 @@ ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) { ...@@ -29,7 +29,7 @@ ssize_t taosSendRandomFail(int sockfd, const void *buf, size_t len, int flags) {
return send(sockfd, buf, len, flags); return send(sockfd, buf, len, flags);
} }
ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, ssize_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr,
socklen_t addrlen) { socklen_t addrlen) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET; errno = ECONNRESET;
...@@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags, ...@@ -39,7 +39,7 @@ ssize_t taosSendToRandomFail(int sockfd, const void *buf, size_t len, int flags,
return sendto(sockfd, buf, len, flags, dest_addr, addrlen); return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
} }
ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) { ssize_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = ECONNRESET; errno = ECONNRESET;
return -1; return -1;
...@@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) { ...@@ -48,7 +48,7 @@ ssize_t taosReadSocketRandomFail(int fd, void *buf, size_t count) {
return read(fd, buf, count); return read(fd, buf, count);
} }
ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) { ssize_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count) {
if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) { if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
errno = EINTR; errno = EINTR;
return -1; return -1;
...@@ -61,10 +61,10 @@ ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) { ...@@ -61,10 +61,10 @@ ssize_t taosWriteSocketRandomFail(int fd, const void *buf, size_t count) {
#ifdef TAOS_RANDOM_FILE_FAIL #ifdef TAOS_RANDOM_FILE_FAIL
static int random_file_fail_factor = 20; static int32_t random_file_fail_factor = 20;
static FILE *fpRandomFileFailOutput = NULL; static FILE *fpRandomFileFailOutput = NULL;
void taosSetRandomFileFailFactor(int factor) { void taosSetRandomFileFailFactor(int32_t factor) {
random_file_fail_factor = factor; random_file_fail_factor = factor;
} }
...@@ -77,7 +77,7 @@ static void close_random_file_fail_output() { ...@@ -77,7 +77,7 @@ static void close_random_file_fail_output() {
} }
} }
static void random_file_fail_output_sig(int sig) { static void random_file_fail_output_sig(int32_t sig) {
fprintf(fpRandomFileFailOutput, "signal %d received.\n", sig); fprintf(fpRandomFileFailOutput, "signal %d received.\n", sig);
struct sigaction act = {0}; struct sigaction act = {0};
...@@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) { ...@@ -105,7 +105,7 @@ void taosSetRandomFileFailOutput(const char *path) {
sigaction(SIGILL, &act, NULL); sigaction(SIGILL, &act, NULL);
} }
ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line) { ssize_t taosReadFileRandomFail(int32_t fd, void *buf, size_t count, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) { if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) { if (rand() % random_file_fail_factor == 0) {
errno = EIO; errno = EIO;
...@@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file ...@@ -113,10 +113,10 @@ ssize_t taosReadFileRandomFail(int fd, void *buf, size_t count, const char *file
} }
} }
return taosTReadImp(fd, buf, count); return taosRead(fd, buf, count);
} }
ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *file, uint32_t line) { ssize_t taosWriteFileRandomFail(int32_t fd, void *buf, size_t count, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) { if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) { if (rand() % random_file_fail_factor == 0) {
errno = EIO; errno = EIO;
...@@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *fil ...@@ -124,10 +124,10 @@ ssize_t taosWriteFileRandomFail(int fd, void *buf, size_t count, const char *fil
} }
} }
return taosTWriteImp(fd, buf, count); return taosWrite(fd, buf, count);
} }
off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, uint32_t line) { off_t taosLSeekRandomFail(int32_t fd, off_t offset, int32_t whence, const char *file, uint32_t line) {
if (random_file_fail_factor > 0) { if (random_file_fail_factor > 0) {
if (rand() % random_file_fail_factor == 0) { if (rand() % random_file_fail_factor == 0) {
errno = EIO; errno = EIO;
...@@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, ui ...@@ -135,7 +135,7 @@ off_t taosLSeekRandomFail(int fd, off_t offset, int whence, const char *file, ui
} }
} }
return lseek(fd, offset, whence); return taosLSeek(fd, offset, whence);
} }
#endif //TAOS_RANDOM_FILE_FAIL #endif //TAOS_RANDOM_FILE_FAIL
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "tglobal.h" #include "tglobal.h"
#ifndef TAOS_OS_FUNC_FILE_GETTMPFILEPATH #ifndef TAOS_OS_FUNC_FILE_GETTMPFILEPATH
void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char *tdengineTmpFileNamePrefix = "tdengine-"; const char *tdengineTmpFileNamePrefix = "tdengine-";
...@@ -39,10 +40,10 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { ...@@ -39,10 +40,10 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
taosRandStr(rand, tListLen(rand) - 1); taosRandStr(rand, tListLen(rand) - 1);
snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand); snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand);
} }
#endif #endif
// rename file name int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath) {
int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath) {
int32_t ts = taosGetTimestampSec(); int32_t ts = taosGetTimestampSec();
char fname[PATH_MAX] = {0}; // max file name length must be less than 255 char fname[PATH_MAX] = {0}; // max file name length must be less than 255
...@@ -51,12 +52,13 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP ...@@ -51,12 +52,13 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
if (delimiterPos == NULL) return -1; if (delimiterPos == NULL) return -1;
int32_t fileNameLen = 0; int32_t fileNameLen = 0;
if (suffix) if (suffix) {
fileNameLen = snprintf(fname, PATH_MAX, "%s.%d.%s", delimiterPos + 1, ts, suffix); fileNameLen = snprintf(fname, PATH_MAX, "%s.%d.%s", delimiterPos + 1, ts, suffix);
else } else {
fileNameLen = snprintf(fname, PATH_MAX, "%s.%d", delimiterPos + 1, ts); fileNameLen = snprintf(fname, PATH_MAX, "%s.%d", delimiterPos + 1, ts);
}
size_t len = (size_t)((delimiterPos - fullPath) + fileNameLen + 1); int32_t len = (int32_t)((delimiterPos - fullPath) + fileNameLen + 1);
if (*dstPath == NULL) { if (*dstPath == NULL) {
*dstPath = calloc(1, len + 1); *dstPath = calloc(1, len + 1);
if (*dstPath == NULL) return -1; if (*dstPath == NULL) return -1;
...@@ -69,9 +71,9 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP ...@@ -69,9 +71,9 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
return rename(fullPath, *dstPath); return rename(fullPath, *dstPath);
} }
ssize_t taosTReadImp(int fd, void *buf, size_t count) { int64_t taosRead(int32_t fd, void *buf, int64_t count) {
size_t leftbytes = count; int64_t leftbytes = count;
ssize_t readbytes; int64_t readbytes;
char * tbuf = (char *)buf; char * tbuf = (char *)buf;
while (leftbytes > 0) { while (leftbytes > 0) {
...@@ -83,19 +85,19 @@ ssize_t taosTReadImp(int fd, void *buf, size_t count) { ...@@ -83,19 +85,19 @@ ssize_t taosTReadImp(int fd, void *buf, size_t count) {
return -1; return -1;
} }
} else if (readbytes == 0) { } else if (readbytes == 0) {
return (ssize_t)(count - leftbytes); return (int64_t)(count - leftbytes);
} }
leftbytes -= readbytes; leftbytes -= readbytes;
tbuf += readbytes; tbuf += readbytes;
} }
return (ssize_t)count; return count;
} }
ssize_t taosTWriteImp(int fd, void *buf, size_t n) { int64_t taosWrite(int32_t fd, void *buf, int64_t n) {
size_t nleft = n; int64_t nleft = n;
ssize_t nwritten = 0; int64_t nwritten = 0;
char * tbuf = (char *)buf; char * tbuf = (char *)buf;
while (nleft > 0) { while (nleft > 0) {
...@@ -110,13 +112,14 @@ ssize_t taosTWriteImp(int fd, void *buf, size_t n) { ...@@ -110,13 +112,14 @@ ssize_t taosTWriteImp(int fd, void *buf, size_t n) {
tbuf += nwritten; tbuf += nwritten;
} }
return (ssize_t)n; return n;
} }
#ifndef TAOS_OS_FUNC_FILE_SENDIFLE #ifndef TAOS_OS_FUNC_FILE_SENDIFLE
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
size_t leftbytes = size; int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) {
ssize_t sentbytes; int64_t leftbytes = size;
int64_t sentbytes;
while (leftbytes > 0) { while (leftbytes > 0) {
/* /*
...@@ -131,7 +134,7 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) { ...@@ -131,7 +134,7 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return -1; return -1;
} }
} else if (sentbytes == 0) { } else if (sentbytes == 0) {
return (ssize_t)(size - leftbytes); return (int64_t)(size - leftbytes);
} }
leftbytes -= sentbytes; leftbytes -= sentbytes;
...@@ -139,4 +142,17 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) { ...@@ -139,4 +142,17 @@ ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) {
return size; return size;
} }
int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size) {
return taosSendFile(fileno(outfile), fileno(infile), offset, size);
}
#endif
#ifndef TAOS_OS_FUNC_FILE_FTRUNCATE
int32_t taosFtruncate(int32_t fd, int64_t length) {
return ftruncate(fd, length);
}
#endif #endif
\ No newline at end of file
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
#ifndef TAOS_OS_FUNC_SOCKET #ifndef TAOS_OS_FUNC_SOCKET
int taosSetNonblocking(SOCKET sock, int on) { int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
int flags = 0; int32_t flags = 0;
if ((flags = fcntl(sock, F_GETFL, 0)) < 0) { if ((flags = fcntl(sock, F_GETFL, 0)) < 0) {
uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno)); uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno));
return 1; return 1;
...@@ -43,7 +43,7 @@ void taosBlockSIGPIPE() { ...@@ -43,7 +43,7 @@ void taosBlockSIGPIPE() {
sigset_t signal_mask; sigset_t signal_mask;
sigemptyset(&signal_mask); sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE); sigaddset(&signal_mask, SIGPIPE);
int rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) { if (rc != 0) {
uError("failed to block SIGPIPE"); uError("failed to block SIGPIPE");
} }
...@@ -53,7 +53,7 @@ void taosBlockSIGPIPE() { ...@@ -53,7 +53,7 @@ void taosBlockSIGPIPE() {
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT #ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
} }
......
...@@ -43,19 +43,19 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { ...@@ -43,19 +43,19 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) {
#define _SEND_FILE_STEP_ 1000 #define _SEND_FILE_STEP_ 1000
int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t count) { int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) {
fseek(in_file, (int32_t)(*offset), 0); fseek(in_file, (int32_t)(*offset), 0);
int writeLen = 0; int64_t writeLen = 0;
uint8_t buffer[_SEND_FILE_STEP_] = { 0 }; uint8_t buffer[_SEND_FILE_STEP_] = { 0 };
for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) { for (int64_t len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file); size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file);
if (rlen <= 0) { if (rlen <= 0) {
return writeLen; return writeLen;
} }
else if (rlen < _SEND_FILE_STEP_) { else if (rlen < _SEND_FILE_STEP_) {
fwrite(buffer, 1, rlen, out_file); fwrite(buffer, 1, rlen, out_file);
return (int)(writeLen + rlen); return (int64_t)(writeLen + rlen);
} }
else { else {
fwrite(buffer, 1, _SEND_FILE_STEP_, in_file); fwrite(buffer, 1, _SEND_FILE_STEP_, in_file);
...@@ -63,7 +63,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou ...@@ -63,7 +63,7 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
} }
} }
int remain = count - writeLen; int64_t remain = count - writeLen;
if (remain > 0) { if (remain > 0) {
size_t rlen = fread(buffer, 1, remain, in_file); size_t rlen = fread(buffer, 1, remain, in_file);
if (rlen <= 0) { if (rlen <= 0) {
...@@ -78,12 +78,12 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou ...@@ -78,12 +78,12 @@ int taosFSendFileImp(FILE* out_file, FILE* in_file, int64_t* offset, int32_t cou
return writeLen; return writeLen;
} }
ssize_t taosTSendFileImp(int dfd, int sfd, off_t *offset, size_t size) { int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t size) {
uError("taosTSendFileImp no implemented yet"); uError("taosSendFile no implemented yet");
return 0; return 0;
} }
int taosFtruncate(int fd, int64_t length) { int32_t taosFtruncate(int32_t fd, int64_t length) {
uError("taosFtruncate no implemented yet"); uError("taosFtruncate no implemented yet");
return 0; return 0;
} }
\ No newline at end of file
...@@ -34,7 +34,7 @@ void taosWinSocketInit() { ...@@ -34,7 +34,7 @@ void taosWinSocketInit() {
} }
} }
int taosSetNonblocking(SOCKET sock, int on) { int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
u_long mode; u_long mode;
if (on) { if (on) {
mode = 1; mode = 1;
...@@ -48,7 +48,7 @@ int taosSetNonblocking(SOCKET sock, int on) { ...@@ -48,7 +48,7 @@ int taosSetNonblocking(SOCKET sock, int on) {
void taosBlockSIGPIPE() {} void taosBlockSIGPIPE() {}
int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
if (level == SOL_SOCKET && optname == TCP_KEEPCNT) { if (level == SOL_SOCKET && optname == TCP_KEEPCNT) {
return 0; return 0;
} }
...@@ -72,7 +72,7 @@ int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int op ...@@ -72,7 +72,7 @@ int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int op
uint32_t taosInetAddr(char *ipAddr) { uint32_t taosInetAddr(char *ipAddr) {
uint32_t value; uint32_t value;
int ret = inet_pton(AF_INET, ipAddr, &value); int32_t ret = inet_pton(AF_INET, ipAddr, &value);
if (ret <= 0) { if (ret <= 0) {
return INADDR_NONE; return INADDR_NONE;
} else { } else {
......
...@@ -772,7 +772,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) { ...@@ -772,7 +772,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
int64_t offset = getDataStartOffset(); int64_t offset = getDataStartOffset();
int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset; int32_t size = (int32_t)pSrcBuf->fileSize - (int32_t)offset;
ssize_t rc = taosFSendFile(pDestBuf->f, pSrcBuf->f, &offset, size); int64_t rc = taosFSendFile(pDestBuf->f, pSrcBuf->f, &offset, size);
if (rc == -1) { if (rc == -1) {
// tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); // tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
......
...@@ -149,7 +149,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -149,7 +149,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
int sfd = open(name, O_RDONLY); int sfd = open(name, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) break;
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret < 0) break; if (ret < 0) break;
...@@ -406,7 +406,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -406,7 +406,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
int sfd = open(fname, O_RDONLY); int sfd = open(fname, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) break;
code = taosTSendFile(pPeer->syncFd, sfd, NULL, size); code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd); close(sfd);
if (code < 0) break; if (code < 0) break;
......
...@@ -428,7 +428,7 @@ int tsdbUpdateFileHeader(SFile *pFile) { ...@@ -428,7 +428,7 @@ int tsdbUpdateFileHeader(SFile *pFile) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosTWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno)); tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -493,7 +493,7 @@ int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) { ...@@ -493,7 +493,7 @@ int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
return -1; return -1;
} }
if (taosTRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (taosRead(pFile->fd, buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read file %s header part with %d bytes, reason:%s", pFile->fname, TSDB_FILE_HEAD_SIZE, tsdbError("failed to read file %s header part with %d bytes, reason:%s", pFile->fname, TSDB_FILE_HEAD_SIZE,
strerror(errno)); strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
......
...@@ -582,7 +582,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { ...@@ -582,7 +582,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (taosTWrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (taosWrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, TSDB_FILE_HEAD_SIZE, fname, tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, TSDB_FILE_HEAD_SIZE, fname,
strerror(errno)); strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -623,7 +623,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) { ...@@ -623,7 +623,7 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
goto _err; goto _err;
} }
if (taosTRead(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (taosRead(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read %d bytes from file %s since %s", TSDB_FILE_HEAD_SIZE, fname, strerror(errno)); tsdbError("failed to read %d bytes from file %s since %s", TSDB_FILE_HEAD_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
......
...@@ -338,7 +338,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -338,7 +338,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
return -1; return -1;
} }
if (taosTSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) { if (taosSendFile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo), tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno)); helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -383,7 +383,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) { ...@@ -383,7 +383,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
pIdx->tid = pHelper->tableInfo.tid; pIdx->tid = pHelper->tableInfo.tid;
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE); ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
if (taosTWrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) { if (taosWrite(pFile->fd, (void *)(pHelper->pCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len, tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
pFile->fname, strerror(errno)); pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -435,7 +435,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { ...@@ -435,7 +435,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(offset == pFile->info.size); ASSERT(offset == pFile->info.size);
if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) { if (taosWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
pFile->fname, strerror(errno)); pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -457,7 +457,7 @@ int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffe ...@@ -457,7 +457,7 @@ int tsdbLoadCompIdxImpl(SFile *pFile, uint32_t offset, uint32_t len, void *buffe
return -1; return -1;
} }
if (taosTRead(pFile->fd, buffer, len) < len) { if (taosRead(pFile->fd, buffer, len) < len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, offset, len, tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, offset, len,
strerror(errno)); strerror(errno));
terrno = TSDB_CODE_TDB_FILE_CORRUPTED; terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
...@@ -554,7 +554,7 @@ int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) { ...@@ -554,7 +554,7 @@ int tsdbLoadCompInfoImpl(SFile *pFile, SCompIdx *pIdx, SCompInfo **ppCompInfo) {
return -1; return -1;
} }
if (taosTRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) { if (taosRead(pFile->fd, (void *)(*ppCompInfo), pIdx->len) < (int)pIdx->len) {
tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, pIdx->len, tsdbError("%s: read file %s offset %u len %u failed since %s", prefixMsg, pFile->fname, pIdx->offset, pIdx->len,
strerror(errno)); strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -611,7 +611,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { ...@@ -611,7 +611,7 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
return -1; return -1;
} }
if (taosTRead(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) { if (taosRead(pFile->fd, (void *)pHelper->pCompData, tsize) < tsize) {
tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname, tsdbError("vgId:%d failed to read %" PRIzu " bytes from file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname,
strerror(errno)); strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -826,7 +826,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -826,7 +826,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
sizeof(TSCKSUM)); sizeof(TSCKSUM));
// Write the whole block to file // Write the whole block to file
if (taosTWrite(pFile->fd, (void *)pCompData, lsize) < lsize) { if (taosWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname, tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname,
strerror(errno)); strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -1252,7 +1252,7 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl ...@@ -1252,7 +1252,7 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
return -1; return -1;
} }
if (taosTRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) { if (taosRead(pFile->fd, pHelper->pBuffer, pCompCol->len) < pCompCol->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname, tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompCol->len, pFile->fname,
strerror(errno)); strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -1367,7 +1367,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -1367,7 +1367,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (taosTRead(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) { if (taosRead(fd, (void *)pCompData, pCompBlock->len) < pCompBlock->len) {
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len, tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pCompBlock->len,
pFile->fname, strerror(errno)); pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
......
...@@ -20,21 +20,21 @@ ...@@ -20,21 +20,21 @@
extern "C" { extern "C" {
#endif #endif
int taosReadn(SOCKET sock, char *buffer, int len); int32_t taosReadn(SOCKET sock, char *buffer, int32_t len);
int taosWriteMsg(SOCKET fd, void *ptr, int nbytes); int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes);
int taosReadMsg(SOCKET fd, void *ptr, int nbytes); int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes);
int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes); int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes);
int taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len); int32_t taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len);
int taosSetNonblocking(SOCKET sock, int on); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port); SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int taosKeepTcpAlive(SOCKET sockFd); int32_t taosKeepTcpAlive(SOCKET sockFd);
int taosGetFqdn(char *); int32_t taosGetFqdn(char *);
uint32_t taosGetIpFromFqdn(const char *); uint32_t taosGetIpFromFqdn(const char *);
void tinet_ntoa(char *ipstr, unsigned int ip); void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr); uint32_t ip2uint(const char *const ip_addr);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -188,7 +188,7 @@ int tdKVStoreStartCommit(SKVStore *pStore) { ...@@ -188,7 +188,7 @@ int tdKVStoreStartCommit(SKVStore *pStore) {
goto _err; goto _err;
} }
if (taosTSendFile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { if (taosSendFile(pStore->sfd, pStore->fd, NULL, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno)); uError("failed to send file %d bytes since %s", TD_KVSTORE_HEADER_SIZE, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
...@@ -248,13 +248,13 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe ...@@ -248,13 +248,13 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
ASSERT(tlen == POINTER_DISTANCE(pBuf, buf)); ASSERT(tlen == POINTER_DISTANCE(pBuf, buf));
ASSERT(tlen == sizeof(SKVRecord)); ASSERT(tlen == sizeof(SKVRecord));
if (taosTWrite(pStore->fd, buf, tlen) < tlen) { if (taosWrite(pStore->fd, buf, tlen) < tlen) {
uError("failed to write %d bytes to file %s since %s", tlen, pStore->fname, strerror(errno)); uError("failed to write %d bytes to file %s since %s", tlen, pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosTWrite(pStore->fd, cont, contLen) < contLen) { if (taosWrite(pStore->fd, cont, contLen) < contLen) {
uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno)); uError("failed to write %d bytes to file %s since %s", contLen, pStore->fname, strerror(errno));
return -1; return -1;
} }
...@@ -292,7 +292,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { ...@@ -292,7 +292,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
void *pBuf = buf; void *pBuf = buf;
tdEncodeKVRecord(&pBuf, &rInfo); tdEncodeKVRecord(&pBuf, &rInfo);
if (taosTWrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) { if (taosWrite(pStore->fd, buf, POINTER_DISTANCE(pBuf, buf)) < POINTER_DISTANCE(pBuf, buf)) {
uError("failed to write %" PRId64 " bytes to file %s since %s", (int64_t)(POINTER_DISTANCE(pBuf, buf)), pStore->fname, strerror(errno)); uError("failed to write %" PRId64 " bytes to file %s since %s", (int64_t)(POINTER_DISTANCE(pBuf, buf)), pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -339,7 +339,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) { ...@@ -339,7 +339,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) {
int fd = open(fname, O_RDONLY); int fd = open(fname, O_RDONLY);
if (fd < 0) goto _err; if (fd < 0) goto _err;
if (taosTRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) goto _err; if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) goto _err;
if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) goto _err; if (!taosCheckChecksumWhole((uint8_t *)buf, TD_KVSTORE_HEADER_SIZE)) goto _err;
void *pBuf = (void *)buf; void *pBuf = (void *)buf;
...@@ -368,7 +368,7 @@ static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t ...@@ -368,7 +368,7 @@ static int tdLoadKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo, uint32_t
return -1; return -1;
} }
if (taosTRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { if (taosRead(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); uError("failed to read %d bytes from file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -402,7 +402,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { ...@@ -402,7 +402,7 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE); ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
if (taosTWrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { if (taosWrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -535,7 +535,7 @@ static int tdRestoreKVStore(SKVStore *pStore) { ...@@ -535,7 +535,7 @@ static int tdRestoreKVStore(SKVStore *pStore) {
ASSERT(pStore->info.size == TD_KVSTORE_HEADER_SIZE); ASSERT(pStore->info.size == TD_KVSTORE_HEADER_SIZE);
while (true) { while (true) {
ssize_t tsize = taosTRead(pStore->fd, tbuf, sizeof(SKVRecord)); int64_t tsize = taosRead(pStore->fd, tbuf, sizeof(SKVRecord));
if (tsize == 0) break; if (tsize == 0) break;
if (tsize < sizeof(SKVRecord)) { if (tsize < sizeof(SKVRecord)) {
uError("failed to read %" PRIzu " bytes from file %s at offset %" PRId64 "since %s", sizeof(SKVRecord), pStore->fname, uError("failed to read %" PRIzu " bytes from file %s at offset %" PRId64 "since %s", sizeof(SKVRecord), pStore->fname,
...@@ -598,7 +598,7 @@ static int tdRestoreKVStore(SKVStore *pStore) { ...@@ -598,7 +598,7 @@ static int tdRestoreKVStore(SKVStore *pStore) {
goto _err; goto _err;
} }
if (taosTRead(pStore->fd, buf, (size_t)pRecord->size) < pRecord->size) { if (taosRead(pStore->fd, buf, (size_t)pRecord->size) < pRecord->size) {
uError("failed to read %" PRId64 " bytes from file %s since %s, offset %" PRId64, pRecord->size, pStore->fname, uError("failed to read %" PRId64 " bytes from file %s since %s, offset %" PRId64, pRecord->size, pStore->fname,
strerror(errno), pRecord->offset); strerror(errno), pRecord->offset);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
......
...@@ -336,11 +336,11 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { ...@@ -336,11 +336,11 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
lseek(tsLogObj.logHandle->fd, 0, SEEK_END); lseek(tsLogObj.logHandle->fd, 0, SEEK_END);
sprintf(name, "==================================================\n"); sprintf(name, "==================================================\n");
taosTWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, " new log file \n"); sprintf(name, " new log file \n");
taosTWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
sprintf(name, "==================================================\n"); sprintf(name, "==================================================\n");
taosTWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name));
return 0; return 0;
} }
...@@ -390,7 +390,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { ...@@ -390,7 +390,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
if (tsAsyncLog) { if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len); taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else { } else {
taosTWrite(tsLogObj.logHandle->fd, buffer, len); taosWrite(tsLogObj.logHandle->fd, buffer, len);
} }
if (tsLogObj.maxLines > 0) { if (tsLogObj.maxLines > 0) {
...@@ -400,7 +400,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { ...@@ -400,7 +400,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
} }
} }
if (dflag & DEBUG_SCREEN) taosTWrite(1, buffer, (uint32_t)len); if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len);
} }
void taosDumpData(unsigned char *msg, int32_t len) { void taosDumpData(unsigned char *msg, int32_t len) {
...@@ -419,7 +419,7 @@ void taosDumpData(unsigned char *msg, int32_t len) { ...@@ -419,7 +419,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
pos += 3; pos += 3;
if (c >= 16) { if (c >= 16) {
temp[pos++] = '\n'; temp[pos++] = '\n';
taosTWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos); taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
c = 0; c = 0;
pos = 0; pos = 0;
} }
...@@ -427,9 +427,7 @@ void taosDumpData(unsigned char *msg, int32_t len) { ...@@ -427,9 +427,7 @@ void taosDumpData(unsigned char *msg, int32_t len) {
temp[pos++] = '\n'; temp[pos++] = '\n';
taosTWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos); taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos);
return;
} }
void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) { void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) {
...@@ -467,7 +465,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . ...@@ -467,7 +465,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
if (tsAsyncLog) { if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len); taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else { } else {
taosTWrite(tsLogObj.logHandle->fd, buffer, len); taosWrite(tsLogObj.logHandle->fd, buffer, len);
} }
if (tsLogObj.maxLines > 0) { if (tsLogObj.maxLines > 0) {
...@@ -477,7 +475,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . ...@@ -477,7 +475,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
} }
} }
if (dflag & DEBUG_SCREEN) taosTWrite(1, buffer, (uint32_t)len); if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len);
} }
#if 0 #if 0
...@@ -606,7 +604,7 @@ static void *taosAsyncOutputLog(void *param) { ...@@ -606,7 +604,7 @@ static void *taosAsyncOutputLog(void *param) {
while (1) { while (1) {
log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_UNIT); log_size = taosPollLogBuffer(tLogBuff, tempBuffer, TSDB_DEFAULT_LOG_BUF_UNIT);
if (log_size) { if (log_size) {
taosTWrite(tLogBuff->fd, tempBuffer, log_size); taosWrite(tLogBuff->fd, tempBuffer, log_size);
LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff); LOG_BUF_START(tLogBuff) = (LOG_BUF_START(tLogBuff) + log_size) % LOG_BUF_SIZE(tLogBuff);
} else { } else {
break; break;
......
...@@ -265,7 +265,7 @@ void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...) ...@@ -265,7 +265,7 @@ void taosNotePrint(taosNoteInfo * pNote, const char * const format, ...)
buffer[len] = 0; buffer[len] = 0;
if (pNote->taosNoteFd >= 0) { if (pNote->taosNoteFd >= 0) {
taosTWrite(pNote->taosNoteFd, buffer, (unsigned int)len); taosWrite(pNote->taosNoteFd, buffer, (unsigned int)len);
if (pNote->taosNoteMaxLines > 0) { if (pNote->taosNoteMaxLines > 0) {
pNote->taosNoteLines++; pNote->taosNoteLines++;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "tsocket.h" #include "tsocket.h"
#include "taoserror.h" #include "taoserror.h"
int taosGetFqdn(char *fqdn) { int32_t taosGetFqdn(char *fqdn) {
char hostname[1024]; char hostname[1024];
hostname[1023] = '\0'; hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) { if (gethostname(hostname, 1023) == -1) {
...@@ -26,10 +26,10 @@ int taosGetFqdn(char *fqdn) { ...@@ -26,10 +26,10 @@ int taosGetFqdn(char *fqdn) {
return -1; return -1;
} }
struct addrinfo hints = {0}; struct addrinfo hints = {0};
struct addrinfo *result = NULL; struct addrinfo *result = NULL;
hints.ai_flags = AI_CANONNAME; hints.ai_flags = AI_CANONNAME;
int ret = getaddrinfo(hostname, NULL, &hints, &result); int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) { if (!result) {
uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1; return -1;
...@@ -49,10 +49,10 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) { ...@@ -49,10 +49,10 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) { if (result) {
struct sockaddr *sa = result->ai_addr; struct sockaddr * sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in*)sa; struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr; struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr; uint32_t ip = ia.s_addr;
freeaddrinfo(result); freeaddrinfo(result);
return ip; return ip;
} else { } else {
...@@ -70,7 +70,7 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) { ...@@ -70,7 +70,7 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
} }
} }
// Function converting an IP address string to an unsigned int. // Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) { uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20]; char ip_addr_cpy[20];
char ip[5]; char ip[5];
...@@ -81,7 +81,7 @@ uint32_t ip2uint(const char *const ip_addr) { ...@@ -81,7 +81,7 @@ uint32_t ip2uint(const char *const ip_addr) {
s_start = ip_addr_cpy; s_start = ip_addr_cpy;
s_end = ip_addr_cpy; s_end = ip_addr_cpy;
int k; int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) { for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) { for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
...@@ -95,17 +95,17 @@ uint32_t ip2uint(const char *const ip_addr) { ...@@ -95,17 +95,17 @@ uint32_t ip2uint(const char *const ip_addr) {
ip[k] = '\0'; ip[k] = '\0';
return *((unsigned int *)ip); return *((uint32_t *)ip);
} }
int taosWriteMsg(SOCKET fd, void *buf, int nbytes) { int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
int nleft, nwritten; int32_t nleft, nwritten;
char *ptr = (char *)buf; char * ptr = (char *)buf;
nleft = nbytes; nleft = nbytes;
while (nleft > 0) { while (nleft > 0) {
nwritten = (int)taosWriteSocket(fd, (char *)ptr, (size_t)nleft); nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
if (nwritten <= 0) { if (nwritten <= 0) {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
...@@ -120,16 +120,16 @@ int taosWriteMsg(SOCKET fd, void *buf, int nbytes) { ...@@ -120,16 +120,16 @@ int taosWriteMsg(SOCKET fd, void *buf, int nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int taosReadMsg(SOCKET fd, void *buf, int nbytes) { int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
int nleft, nread; int32_t nleft, nread;
char *ptr = (char *)buf; char * ptr = (char *)buf;
nleft = nbytes; nleft = nbytes;
if (fd < 0) return -1; if (fd < 0) return -1;
while (nleft > 0) { while (nleft > 0) {
nread = (int)taosReadSocket(fd, ptr, (size_t)nleft); nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft);
if (nread == 0) { if (nread == 0) {
break; break;
} else if (nread < 0) { } else if (nread < 0) {
...@@ -147,11 +147,11 @@ int taosReadMsg(SOCKET fd, void *buf, int nbytes) { ...@@ -147,11 +147,11 @@ int taosReadMsg(SOCKET fd, void *buf, int nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) { int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
taosSetNonblocking(fd, 1); taosSetNonblocking(fd, 1);
int nleft, nwritten, nready; int32_t nleft, nwritten, nready;
fd_set fset; fd_set fset;
struct timeval tv; struct timeval tv;
nleft = nbytes; nleft = nbytes;
...@@ -160,7 +160,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) { ...@@ -160,7 +160,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
tv.tv_usec = 0; tv.tv_usec = 0;
FD_ZERO(&fset); FD_ZERO(&fset);
FD_SET(fd, &fset); FD_SET(fd, &fset);
if ((nready = select((int)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT; errno = ETIMEDOUT;
uError("fd %d timeout, no enough space to write", fd); uError("fd %d timeout, no enough space to write", fd);
break; break;
...@@ -172,7 +172,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) { ...@@ -172,7 +172,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return -1; return -1;
} }
nwritten = (int)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL); nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
if (nwritten <= 0) { if (nwritten <= 0) {
if (errno == EAGAIN || errno == EINTR) continue; if (errno == EAGAIN || errno == EINTR) continue;
...@@ -189,10 +189,10 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) { ...@@ -189,10 +189,10 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return (nbytes - nleft); return (nbytes - nleft);
} }
int taosReadn(SOCKET fd, char *ptr, int nbytes) { int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
int nread, nready, nleft = nbytes; int32_t nread, nready, nleft = nbytes;
fd_set fset; fd_set fset;
struct timeval tv; struct timeval tv;
while (nleft > 0) { while (nleft > 0) {
...@@ -200,7 +200,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) { ...@@ -200,7 +200,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
tv.tv_usec = 0; tv.tv_usec = 0;
FD_ZERO(&fset); FD_ZERO(&fset);
FD_SET(fd, &fset); FD_SET(fd, &fset);
if ((nready = select((int)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT; errno = ETIMEDOUT;
uError("fd %d timeout\n", fd); uError("fd %d timeout\n", fd);
break; break;
...@@ -210,7 +210,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) { ...@@ -210,7 +210,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
return -1; return -1;
} }
if ((nread = (int)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) { if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
if (errno == EINTR) continue; if (errno == EINTR) continue;
uError("read error, %d (%s)", errno, strerror(errno)); uError("read error, %d (%s)", errno, strerror(errno));
return -1; return -1;
...@@ -229,8 +229,8 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) { ...@@ -229,8 +229,8 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) { SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr; struct sockaddr_in localAddr;
SOCKET sockFd; SOCKET sockFd;
int bufSize = 1024000; int32_t bufSize = 1024000;
uDebug("open udp socket:0x%x:%hu", ip, port); uDebug("open udp socket:0x%x:%hu", ip, port);
...@@ -239,7 +239,7 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) { ...@@ -239,7 +239,7 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
localAddr.sin_addr.s_addr = ip; localAddr.sin_addr.s_addr = ip;
localAddr.sin_port = (uint16_t)htons(port); localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = (int)socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { if ((sockFd = (int32_t)socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
uError("failed to open udp socket: %d (%s)", errno, strerror(errno)); uError("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd); taosCloseSocketNoCheck(sockFd);
return -1; return -1;
...@@ -268,9 +268,9 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) { ...@@ -268,9 +268,9 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
} }
SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
SOCKET sockFd = 0; SOCKET sockFd = 0;
int32_t ret;
struct sockaddr_in serverAddr, clientAddr; struct sockaddr_in serverAddr, clientAddr;
int ret;
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
...@@ -281,7 +281,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie ...@@ -281,7 +281,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
} }
/* set REUSEADDR option, so the portnumber can be re-used */ /* set REUSEADDR option, so the portnumber can be re-used */
int reuse = 1; int32_t reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
...@@ -296,8 +296,8 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie ...@@ -296,8 +296,8 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
/* bind socket to client address */ /* bind socket to client address */
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) { if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
clientIp, destIp, destPort, strerror(errno)); strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
return -1; return -1;
} }
...@@ -311,7 +311,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie ...@@ -311,7 +311,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) { if (ret != 0) {
//uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); // uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
sockFd = -1; sockFd = -1;
} else { } else {
...@@ -321,36 +321,36 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie ...@@ -321,36 +321,36 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
return sockFd; return sockFd;
} }
int taosKeepTcpAlive(SOCKET sockFd) { int32_t taosKeepTcpAlive(SOCKET sockFd) {
int alive = 1; int32_t alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) { if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno)); uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
return -1; return -1;
} }
int probes = 3; int32_t probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
return -1; return -1;
} }
int alivetime = 10; int32_t alivetime = 10;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) { if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
uError("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno)); uError("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
return -1; return -1;
} }
int interval = 3; int32_t interval = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) { if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
uError("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno)); uError("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
return -1; return -1;
} }
int nodelay = 1; int32_t nodelay = 1;
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) { if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
uError("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno)); uError("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
...@@ -371,8 +371,8 @@ int taosKeepTcpAlive(SOCKET sockFd) { ...@@ -371,8 +371,8 @@ int taosKeepTcpAlive(SOCKET sockFd) {
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd; struct sockaddr_in serverAdd;
SOCKET sockFd; SOCKET sockFd;
int reuse; int32_t reuse;
uDebug("open tcp server socket:0x%x:%hu", ip, port); uDebug("open tcp server socket:0x%x:%hu", ip, port);
...@@ -381,7 +381,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { ...@@ -381,7 +381,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
serverAdd.sin_addr.s_addr = ip; serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port); serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { if ((sockFd = (int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
uError("failed to open TCP socket: %d (%s)", errno, strerror(errno)); uError("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd); taosCloseSocketNoCheck(sockFd);
return -1; return -1;
...@@ -417,38 +417,38 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { ...@@ -417,38 +417,38 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return sockFd; return sockFd;
} }
void tinet_ntoa(char *ipstr, unsigned int ip) { void tinet_ntoa(char *ipstr, uint32_t ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24); sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
} }
#define COPY_SIZE 32768 #define COPY_SIZE 32768
// sendfile shall be used // sendfile shall be used
int taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len) { int32_t taosCopyFds(SOCKET sfd, SOCKET dfd, int64_t len) {
int64_t leftLen; int64_t leftLen;
int readLen, writeLen; int32_t readLen, writeLen;
char temp[COPY_SIZE]; char temp[COPY_SIZE];
leftLen = len; leftLen = len;
while (leftLen > 0) { while (leftLen > 0) {
if (leftLen < COPY_SIZE) if (leftLen < COPY_SIZE)
readLen = (int)leftLen; readLen = (int32_t)leftLen;
else else
readLen = COPY_SIZE; // 4K readLen = COPY_SIZE; // 4K
int retLen = taosReadMsg(sfd, temp, (int)readLen); int32_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen);
if (readLen != retLen) { if (readLen != retLen) {
uError("read error, readLen:%d retLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, retLen, len, leftLen, uError("read error, readLen:%d retLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, retLen, len,
strerror(errno)); leftLen, strerror(errno));
return -1; return -1;
} }
writeLen = taosWriteMsg(dfd, temp, readLen); writeLen = taosWriteMsg(dfd, temp, readLen);
if (readLen != writeLen) { if (readLen != writeLen) {
uError("copy error, readLen:%d writeLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, writeLen, len, leftLen, uError("copy error, readLen:%d writeLen:%d len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", readLen, writeLen,
strerror(errno)); len, leftLen, strerror(errno));
return -1; return -1;
} }
......
...@@ -387,6 +387,10 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -387,6 +387,10 @@ void vnodeRelease(void *pVnodeRaw) {
pVnode->qMgmt = NULL; pVnode->qMgmt = NULL;
} }
if (pVnode->wal) {
walStop(pVnode->wal);
}
if (pVnode->tsdb) { if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb, 1); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
......
...@@ -49,6 +49,8 @@ typedef struct { ...@@ -49,6 +49,8 @@ typedef struct {
int32_t level; int32_t level;
int32_t fsyncPeriod; int32_t fsyncPeriod;
int32_t fsyncSeq; int32_t fsyncSeq;
int8_t stop;
int8_t reserved[3];
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
char name[WAL_FILE_LEN]; char name[WAL_FILE_LEN];
pthread_mutex_t mutex; pthread_mutex_t mutex;
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "talloc.h"
#include "tref.h" #include "tref.h"
#include "twal.h" #include "twal.h"
#include "walInt.h" #include "walInt.h"
...@@ -110,6 +109,16 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) { ...@@ -110,6 +109,16 @@ int32_t walAlter(void *handle, SWalCfg *pCfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void walStop(void *handle) {
if (handle == NULL) return;
SWal *pWal = handle;
pthread_mutex_lock(&pWal->mutex);
pWal->stop = 1;
pthread_mutex_unlock(&pWal->mutex);
wDebug("vgId:%d, stop write wal", pWal->vgId);
}
void walClose(void *handle) { void walClose(void *handle) {
if (handle == NULL) return; if (handle == NULL) return;
...@@ -123,9 +132,7 @@ void walClose(void *handle) { ...@@ -123,9 +132,7 @@ void walClose(void *handle) {
while (walGetNextFile(pWal, &fileId) >= 0) { while (walGetNextFile(pWal, &fileId) >= 0) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, fileId);
if (fileId == pWal->fileId) { if (remove(pWal->name) < 0) {
wDebug("vgId:%d, wal:%p file:%s, it is closed and kept", pWal->vgId, pWal, pWal->name);
} else if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else { } else {
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "talloc.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "twal.h" #include "twal.h"
...@@ -29,10 +28,15 @@ int32_t walRenew(void *handle) { ...@@ -29,10 +28,15 @@ int32_t walRenew(void *handle) {
SWal * pWal = handle; SWal * pWal = handle;
int32_t code = 0; int32_t code = 0;
if (pWal->stop) {
wDebug("vgId:%d, do not create a new wal file", pWal->vgId);
return 0;
}
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >= 0) { if (pWal->fd >= 0) {
close(pWal->fd); tclose(pWal->fd);
wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name);
} }
...@@ -90,7 +94,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -90,7 +94,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { if (taosWrite(pWal->fd, pHead, contLen) != contLen) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
...@@ -151,7 +155,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void * ...@@ -151,7 +155,7 @@ int32_t walRestore(void *handle, void *pVnode, int32_t (*writeFp)(void *, void *
if (!pWal->keep) return TSDB_CODE_SUCCESS; if (!pWal->keep) return TSDB_CODE_SUCCESS;
if (count == 0) { if (count == 0) {
wDebug("vgId:%d, file:%s not exist, renew it", pWal->vgId, pWal->name); wDebug("vgId:%d, wal file not exist, renew it", pWal->vgId);
return walRenew(pWal); return walRenew(pWal);
} else { } else {
// open the existing WAL file in append mode // open the existing WAL file in append mode
...@@ -204,7 +208,7 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, i ...@@ -204,7 +208,7 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, i
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
if (taosTRead(fd, pHead, sizeof(SWalHead)) <= 0) { if (taosRead(fd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
...@@ -245,7 +249,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -245,7 +249,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
SWalHead *pHead = buffer; SWalHead *pHead = buffer;
while (1) { while (1) {
int32_t ret = taosTRead(fd, pHead, sizeof(SWalHead)); int32_t ret = taosRead(fd, pHead, sizeof(SWalHead));
if (ret == 0) break; if (ret == 0) break;
if (ret < 0) { if (ret < 0) {
...@@ -282,7 +286,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -282,7 +286,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
pHead = buffer; pHead = buffer;
} }
ret = taosTRead(fd, pHead->cont, pHead->len); ret = taosRead(fd, pHead->cont, pHead->len);
if (ret < 0) { if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
...@@ -305,7 +309,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -305,7 +309,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
} }
close(fd); tclose(fd);
tfree(buffer); tfree(buffer);
return code; return code;
......
...@@ -8,8 +8,8 @@ sleep 3000 ...@@ -8,8 +8,8 @@ sleep 3000
sql connect sql connect
$i = 0 $i = 0
$dbPrefix = tb_in_db $dbPrefix = d
$tbPrefix = tb_in_tb $tbPrefix = t
$db = $dbPrefix . $i $db = $dbPrefix . $i
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
...@@ -22,28 +22,27 @@ sql create table $tb (ts timestamp, speed int) ...@@ -22,28 +22,27 @@ sql create table $tb (ts timestamp, speed int)
$x = 0 $x = 0
while $x < 10 while $x < 10
$ms = $x . m $cc = $x * 60000
sql insert into $tb values (now + $ms , $x ) $ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1 $x = $x + 1
endw endw
print =============== step 2 print =============== step 2
sql insert into $tb values (now - 5m , 10) $x = 0
sql insert into $tb values (now - 6m , 10) while $x < 5
sql insert into $tb values (now - 7m , 10) $cc = $x * 60000
sql insert into $tb values (now - 8m , 10) $ms = 1551481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
sql select * from $tb sql select * from $tb
print $rows points data are retrieved print $rows points data are retrieved
if $rows != 14 then if $rows != 15 then
return -1
endi
sql drop database $db
sleep 1000
sql show databases
if $rows != 0 then
return -1 return -1
endi endi
......
...@@ -9,7 +9,7 @@ NC='\033[0m' ...@@ -9,7 +9,7 @@ NC='\033[0m'
function runSimCaseOneByOne { function runSimCaseOneByOne {
while read -r line; do while read -r line; do
if [[ $line =~ ^./test.sh* ]]; then if [[ $line =~ ^./test.sh* ]] || [[ $line =~ ^run* ]]; then
case=`echo $line | grep sim$ |awk '{print $NF}'` case=`echo $line | grep sim$ |awk '{print $NF}'`
start_time=`date +%s` start_time=`date +%s`
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册