diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 137ad31d8225c524687d574cf13cd944a1afad54..410e0c564eedc5ce7051773fd61fb2030517fd3a 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -8,7 +8,13 @@ target_include_directories( # see https://stackoverflow.com/questions/25676277/cmake-target-include-directories-prints-an-error-when-i-try-to-add-the-source PUBLIC $ ) + add_subdirectory(lz4/build/cmake) +target_include_directories( + lz4_static + PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/lz4/lib +) + add_subdirectory(zlib) target_include_directories( zlib diff --git a/include/common/taosdef.h b/include/common/taosdef.h index d80caad88db865d83986e4c19d95603eadde9884..0bcd693f5e0881f61915e5599c772b8db8033ce0 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -41,15 +41,6 @@ extern "C" { // Bytes for each type. extern const int32_t TYPE_BYTES[15]; -// TODO: replace and remove code below -#define CHAR_BYTES sizeof(char) -#define SHORT_BYTES sizeof(int16_t) -#define INT_BYTES sizeof(int32_t) -#define LONG_BYTES sizeof(int64_t) -#define FLOAT_BYTES sizeof(float) -#define DOUBLE_BYTES sizeof(double) -#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*) - #define TSDB_KEYSIZE sizeof(TSKEY) #if LINUX diff --git a/include/os/os.h b/include/os/os.h index c89dc652117472b3efb4b933fe3d808f533d245c..935923195b3d2eba5da0d9f8df7af3c010dc3be2 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -21,22 +21,29 @@ extern "C" { #endif #include +#include #include +#include #include -#include +#include +#include +#include +#include #include #include #include #include #include +#include -#include -#include -#include -#include -#include -#include -#include +// #include +// #include +// #include +// // #include +// +// #include +// #include +// #include #include "osAtomic.h" #include "osDef.h" @@ -48,7 +55,9 @@ extern "C" { #include "osSemaphore.h" #include "osSocket.h" #include "osString.h" +#include "osSleep.h" #include "osTime.h" +#include "osThread.h" #ifdef __cplusplus } diff --git a/include/os/osDef.h b/include/os/osDef.h index 7de8fd2f26a987cdd851a3c3c5158721e5b34149..053bf42ece7fe814d762a93a0b7972ffb34c9de3 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -29,11 +29,6 @@ extern "C" { #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) #define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2)) -#ifndef PATH_MAX - #define PATH_MAX 1024 -#endif - - #if defined(_TD_LINUX_64) || defined(_TD_LINUX_32) || defined(_TD_MIPS_64) || defined(_TD_ARM_32) || defined(_TD_ARM_64) || defined(_TD_DARWIN_64) #if defined(_TD_DARWIN_64) // MacOS @@ -53,6 +48,16 @@ extern "C" { #endif + +// TODO: replace and remove code below +#define CHAR_BYTES sizeof(char) +#define SHORT_BYTES sizeof(int16_t) +#define INT_BYTES sizeof(int32_t) +#define LONG_BYTES sizeof(int64_t) +#define FLOAT_BYTES sizeof(float) +#define DOUBLE_BYTES sizeof(double) +#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*) + #ifdef __cplusplus } #endif diff --git a/include/os/osFile.h b/include/os/osFile.h index 86c08c0a3aca1f2e8434aee84c819d0c17060494..66c3ae9cbd625cdce25d79bf9bb81faa43ada259 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_OS_FILE_H -#define TDENGINE_OS_FILE_H +#ifndef _TD_OS_FILE_H_ +#define _TD_OS_FILE_H_ #ifdef __cplusplus extern "C" { @@ -22,13 +22,6 @@ extern "C" { #include "osSocket.h" -#define FD_VALID(x) ((x) > STDERR_FILENO) -#define FD_INITIALIZER ((int32_t)-1) - -#ifndef PATH_MAX - #define PATH_MAX 256 -#endif - #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) typedef int32_t FileFd; typedef SOCKET SocketFd; @@ -37,40 +30,44 @@ typedef int32_t FileFd; typedef int32_t SocketFd; #endif -int64_t taosRead(FileFd fd, void *buf, int64_t count); -int64_t taosWrite(FileFd fd, void *buf, int64_t count); +#define FD_INITIALIZER ((int32_t)-1) + +#ifndef PATH_MAX +#define PATH_MAX 256 +#endif + +int32_t taosLockFile(FileFd fd); +int32_t taosUnLockFile(FileFd fd); + +int32_t taosUmaskFile(FileFd fd); -int64_t taosLSeek(FileFd fd, int64_t offset, int32_t whence); -int32_t taosFtruncate(FileFd fd, int64_t length); -int32_t taosFsync(FileFd fd); +int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime); +int32_t taosFStatFile(FileFd fd, int64_t *size, int32_t *mtime); -int32_t taosRename(char* oldName, char *newName); -int64_t taosCopy(char *from, char *to); +FileFd taosOpenFileWrite(const char *path); +FileFd taosOpenFileCreateWrite(const char *path); +FileFd taosOpenFileTruncCreateWrite(const char *path); +FileFd taosOpenFileRead(const char *path); + +int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence); +int32_t taosFtruncateFile(FileFd fd, int64_t length); +int32_t taosFsyncFile(FileFd fd); + +int64_t taosReadFile(FileFd fd, void *buf, int64_t count); +int64_t taosWriteFile(FileFd fd, void *buf, int64_t count); + +void taosCloseFile(FileFd fd); + +int32_t taosRenameFile(char *oldName, char *newName); +int64_t taosCopyFile(char *from, char *to); + +void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t size); int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size); -void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath); -void taosClose(FileFd fd); - -#ifdef TAOS_RANDOM_FILE_FAIL - void taosSetRandomFileFailFactor(int32_t factor); - void taosSetRandomFileFailOutput(const char *path); - #ifdef TAOS_RANDOM_FILE_FAIL_TEST - int64_t taosReadFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line); - int64_t taosWriteFileRandomFail(int32_t fd, void *buf, int32_t count, const char *file, uint32_t line); - int64_t taosLSeekRandomFail(int32_t fd, int64_t offset, int32_t whence, const char *file, uint32_t line); - #undef taosRead - #undef taosWrite - #undef taosLSeek - #define taosRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__) - #define taosWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__) - #define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__) - #endif -#endif - #ifdef __cplusplus } #endif -#endif +#endif /*_TD_OS_FILE_H_*/ diff --git a/include/os/osRand.h b/include/os/osRand.h new file mode 100644 index 0000000000000000000000000000000000000000..e08768c2cc6b379877fb8be7d3541c13bc431f98 --- /dev/null +++ b/include/os/osRand.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_OS_RAND_H +#define TDENGINE_OS_RAND_H + +#ifdef __cplusplus +extern "C" { +#endif + +uint32_t taosRand(void); +void taosRandStr(char* str, int32_t size); +uint32_t taosSafeRand(void); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/os/osSemaphore.h b/include/os/osSemaphore.h index 10d14700e013f66e6d98208f0e65fe1ca5fc3874..d945e721c2999c6c2cd1302e4befc22997db0af4 100644 --- a/include/os/osSemaphore.h +++ b/include/os/osSemaphore.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include + #if defined (_TD_DARWIN_64) typedef struct tsem_s *tsem_t; int tsem_init(tsem_t *sem, int pshared, unsigned int value); diff --git a/include/os/osSleep.h b/include/os/osSleep.h new file mode 100644 index 0000000000000000000000000000000000000000..e42da8d5a64bbc484b15beea19433a710578ff3f --- /dev/null +++ b/include/os/osSleep.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_OS_SLEEP_H +#define TDENGINE_OS_SLEEP_H + +#ifdef __cplusplus +extern "C" { +#endif + +void taosMsleep(int32_t ms); + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 29653f12690cd3e9ea9f00dbca4843de9e2a6ae0..0b183b9ece773b3bbc0b21d57146aa6aa2d6e88b 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -20,6 +20,19 @@ extern "C" { #endif +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + #include "winsock2.h" + #include + #include + #include +#else + #include + #include + #include + #include + #include +#endif + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags) #define taosSendto(sockfd, buf, len, flags, dest_addr, addrlen) sendto((SOCKET)sockfd, buf, len, flags, dest_addr, addrlen) @@ -35,50 +48,20 @@ extern "C" { #define taosCloseSocketNoCheck(x) close(x) #define taosCloseSocket(x) \ { \ - if (FD_VALID(x)) { \ + if ((x) > -1) { \ close(x); \ x = FD_INITIALIZER; \ } \ } #endif -#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) - #define TAOS_EPOLL_WAIT_TIME 100 - typedef SOCKET eventfd_t; - #define eventfd(a, b) -1 - typedef SOCKET EpollFd; - #define EpollClose(pollFd) epoll_close(pollFd) - #ifndef EPOLLWAKEUP - #define EPOLLWAKEUP (1u << 29) - #endif -#elif defined(_TD_DARWIN_64) - #define TAOS_EPOLL_WAIT_TIME 500 - typedef int32_t SOCKET; - typedef SOCKET EpollFd; - #define EpollClose(pollFd) epoll_close(pollFd) -#else - #define TAOS_EPOLL_WAIT_TIME 500 - typedef int32_t SOCKET; - typedef SOCKET EpollFd; - #define EpollClose(pollFd) taosCloseSocket(pollFd) -#endif +#define TAOS_EPOLL_WAIT_TIME 500 +typedef int32_t SOCKET; +typedef SOCKET EpollFd; +#define EpollClose(pollFd) taosCloseSocket(pollFd) -#ifdef TAOS_RANDOM_NETWORK_FAIL - #ifdef TAOS_RANDOM_NETWORK_FAIL_TEST - int64_t taosSendRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags); - int64_t taosSendToRandomFail(int32_t sockfd, const void *buf, size_t len, int32_t flags, const struct sockaddr *dest_addr, socklen_t addrlen); - int64_t taosReadSocketRandomFail(int32_t fd, void *buf, size_t count); - int64_t taosWriteSocketRandomFail(int32_t fd, const void *buf, size_t count); - #undef taosSend - #undef taosSendto - #undef taosReadSocket - #undef taosWriteSocket - #define taosSend(sockfd, buf, len, flags) taosSendRandomFail(sockfd, buf, len, flags) - #define taosSendto(sockfd, buf, len, flags, dest_addr, addrlen) taosSendToRandomFail(sockfd, buf, len, flags, dest_addr, addrlen) - #define taosReadSocket(fd, buf, len) taosReadSocketRandomFail(fd, buf, len) - #define taosWriteSocket(fd, buf, len) taosWriteSocketRandomFail(fd, buf, len) - #endif -#endif +void taosShutDownSocketRD(SOCKET fd); +void taosShutDownSocketWR(SOCKET fd); int32_t taosSetNonblocking(SOCKET sock, int32_t on); void taosIgnSIGPIPE(); @@ -88,9 +71,7 @@ int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *op int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t* optlen); uint32_t taosInetAddr(char *ipAddr); -#if 0 const char *taosInetNtoa(struct in_addr ipInt); -#endif #if (defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) #define htobe64 htonll diff --git a/include/os/osThread.h b/include/os/osThread.h new file mode 100644 index 0000000000000000000000000000000000000000..ecb085cd06b02a81604dafacad9daf48d5890fa3 --- /dev/null +++ b/include/os/osThread.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_OS_THREAD_H_ +#define _TD_OS_THREAD_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_OS_THREAD_H_*/ diff --git a/include/os/osTime.h b/include/os/osTime.h index 81163b29e78e58c961e6a4da6be167daff9860ad..c5ee26e6eff365a0eb4512f24d9b904b5c0a03a4 100644 --- a/include/os/osTime.h +++ b/include/os/osTime.h @@ -20,20 +20,22 @@ extern "C" { #endif +int32_t taosGetTimeOfDay(struct timeval *tv); + //@return timestamp in second int32_t taosGetTimestampSec(); //@return timestamp in millisecond static FORCE_INLINE int64_t taosGetTimestampMs() { struct timeval systemTime; - gettimeofday(&systemTime, NULL); + taosGetTimeOfDay(&systemTime); return (int64_t)systemTime.tv_sec * 1000L + (int64_t)systemTime.tv_usec / 1000; } //@return timestamp in microsecond static FORCE_INLINE int64_t taosGetTimestampUs() { struct timeval systemTime; - gettimeofday(&systemTime, NULL); + taosGetTimeOfDay(&systemTime); return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec; } diff --git a/include/util/tidpool.h b/include/util/tidpool.h new file mode 100644 index 0000000000000000000000000000000000000000..e4439439ced6522e26c8db4a560c50f5b0cb8a16 --- /dev/null +++ b/include/util/tidpool.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TIDPOOL_H +#define TDENGINE_TIDPOOL_H + +#ifdef __cplusplus +extern "C" { +#endif + +void *taosInitIdPool(int maxId); + +int taosUpdateIdPool(void *handle, int maxId); + +int taosIdPoolMaxSize(void *handle); + +int taosAllocateId(void *handle); + +void taosFreeId(void *handle, int id); + +void taosIdPoolCleanUp(void *handle); + +int taosIdPoolNumOfUsed(void *handle); + +bool taosIdPoolMarkStatus(void *handle, int id); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/util/tmempool.h b/include/util/tmempool.h new file mode 100644 index 0000000000000000000000000000000000000000..f2c6a0ef006cc77ba8a865994aaf5fb31d0c21a1 --- /dev/null +++ b/include/util/tmempool.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_TMEMPOOL_H +#define TDENGINE_TMEMPOOL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define mpool_h void * + +mpool_h taosMemPoolInit(int maxNum, int blockSize); + +char *taosMemPoolMalloc(mpool_h handle); + +void taosMemPoolFree(mpool_h handle, char *p); + +void taosMemPoolCleanUp(mpool_h handle); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/util/tref.h b/include/util/tref.h new file mode 100644 index 0000000000000000000000000000000000000000..085c10c55198fc92bfacb0628c38f05da508bfb9 --- /dev/null +++ b/include/util/tref.h @@ -0,0 +1,77 @@ + +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TREF_H +#define TDENGINE_TREF_H + +#ifdef __cplusplus +extern "C" { +#endif + +// open a reference set, max is the mod used by hash, fp is the pointer to free resource function +// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately +int taosOpenRef(int max, void (*fp)(void *)); + +// close the reference set, refId is the return value by taosOpenRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosCloseRef(int refId); + +// add ref, p is the pointer to resource or pointer ID +// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately +int64_t taosAddRef(int refId, void *p); + +// remove ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosRemoveRef(int rsetId, int64_t rid); + +// acquire ref, rid is the reference ID returned by taosAddRef +// return the resource p. On error, NULL is returned, and terrno is set appropriately +void *taosAcquireRef(int rsetId, int64_t rid); + +// release ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosReleaseRef(int rsetId, int64_t rid); + +// return the first reference if rid is 0, otherwise return the next after current reference. +// if return value is NULL, it means list is over(if terrno is set, it means error happens) +void *taosIterateRef(int rsetId, int64_t rid); + +// return the number of references in system +int taosListRef(); + +#define RID_VALID(x) ((x) > 0) + +/* sample code to iterate the refs + +void demoIterateRefs(int rsetId) { + + void *p = taosIterateRef(refId, 0); + while (p) { + // process P + + // get the rid from p + + p = taosIterateRef(rsetId, rid); + } +} + +*/ + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TREF_H diff --git a/include/util/tutil.h b/include/util/tutil.h index 6bcfb5de295c5719032b81c23d16ec2b1476349e..16e5af1aded99ceef3fa6183574504249b0f8b3c 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -23,7 +23,6 @@ extern "C" { #include "os.h" #include "tmd5.h" #include "tcrc32c.h" -#include "taosdef.h" int32_t strdequote(char *src); int32_t strRmquote(char *z, int32_t len); @@ -46,14 +45,19 @@ int taosCheckVersion(char *input_client_version, char *input_server_version, in char * taosIpStr(uint32_t ipInt); uint32_t ip2uint(const char *const ip_addr); -static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { +static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target, int32_t keylen) { MD5_CTX context; MD5Init(&context); MD5Update(&context, inBuf, (unsigned int)inLen); MD5Final(&context); - memcpy(target, context.digest, TSDB_KEY_LEN); + memcpy(target, context.digest, keylen); } +#ifdef tListLen +#undefine tListLen +#endif +#define tListLen(x) (sizeof(x) / sizeof((x)[0])) + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index c01996cb8cff7e7510707af1f31d6395594b1c7b..98a380dc8f79c11073d162b78d4817a567e42511 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -4,4 +4,12 @@ target_include_directories( transport PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/transport" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +target_link_libraries( + transport + PUBLIC lz4_static + PUBLIC os + PUBLIC util + PUBLIC common ) \ No newline at end of file diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index 25495182498f7c1a82f9f9459290e44f082f5eb2..5eb7e39ff20bd333aeca9df02fed6aa6fa087e56 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -22,6 +22,9 @@ #include "rpcHead.h" #include "rpcTcp.h" + +#include + typedef struct SFdObj { void *signature; SOCKET fd; // TCP socket FD @@ -195,16 +198,7 @@ void taosStopTcpServer(void *handle) { pServerObj->stop = 1; if (pServerObj->fd >= 0) { -#ifdef WINDOWS - closesocket(pServerObj->fd); -#elif defined(__APPLE__) - if (pServerObj->fd!=-1) { - close(pServerObj->fd); - pServerObj->fd = -1; - } -#else - shutdown(pServerObj->fd, SHUT_RD); -#endif + taosShutDownSocketRD(pServerObj->fd); } if (taosCheckPthreadValid(pServerObj->thread)) { if (taosComparePthread(pServerObj->thread, pthread_self())) { @@ -267,8 +261,8 @@ static void *taosAcceptTcpConnection(void *arg) { int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); if (ret != 0) { taosCloseSocket(connFd); - tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno), - taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); + tError("%s failed to set recv timeout fd(%s)for connection from:%hu", pServerObj->label, strerror(errno), + htons(caddr.sin_port)); continue; } @@ -280,12 +274,12 @@ static void *taosAcceptTcpConnection(void *arg) { if (pFdObj) { pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->port = htons(caddr.sin_port); - tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, - taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); + tDebug("%s new TCP connection from %hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, + pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); } else { taosCloseSocket(connFd); - tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), - taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); + tError("%s failed to malloc FdObj(%s) for connection from:%hu", pServerObj->label, strerror(errno), + htons(caddr.sin_port)); } // pick up next thread for next connection @@ -436,7 +430,7 @@ void taosCloseTcpConnection(void *chandle) { // pFdObj->thandle = NULL; pFdObj->closedByApp = 1; - shutdown(pFdObj->fd, SHUT_WR); + taosShutDownSocketWR(pFdObj->fd); } int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { @@ -456,7 +450,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { // notify the upper layer, so it will clean the associated context if (pFdObj->closedByApp == 0) { - shutdown(pFdObj->fd, SHUT_WR); + taosShutDownSocketWR(pFdObj->fd); SRecvInfo recvInfo; recvInfo.msg = NULL; diff --git a/src/os/src/detail/osFile.c b/source/os/src/osFile.c similarity index 68% rename from src/os/src/detail/osFile.c rename to source/os/src/osFile.c index cc12968c72eef5b3970ca68cf660de502b402e1e..5acbfd6fa729f70e296eb3adc28961e4a5131421 100644 --- a/src/os/src/detail/osFile.c +++ b/source/os/src/osFile.c @@ -13,24 +13,29 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "os.h" -#include "tglobal.h" -#include "tulog.h" -void taosClose(FileFd fd) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +#else +#include +#include +#include +#include +#include +#endif + +void taosCloseFile(FileFd fd) { close(fd); fd = FD_INITIALIZER; } +void taosGetTmpfilePath(const char * inputTmpDir, const char *fileNamePrefix, char *dstPath) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) - -void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { const char *tdengineTmpFileNamePrefix = "tdengine-"; char tmpPath[PATH_MAX]; - int32_t len = (int32_t)strlen(tsTempDir); - memcpy(tmpPath, tsTempDir, len); + int32_t len = (int32_t)strlen(inputTmpDir); + memcpy(tmpPath, inputTmpDir, len); if (tmpPath[len - 1] != '/' && tmpPath[len - 1] != '\\') { tmpPath[len++] = '\\'; @@ -46,16 +51,14 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { char rand[8] = {0}; taosRandStr(rand, tListLen(rand) - 1); snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand); -} #else -void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { const char *tdengineTmpFileNamePrefix = "tdengine-"; - char tmpPath[PATH_MAX]; - int32_t len = strlen(tsTempDir); - memcpy(tmpPath, tsTempDir, len); + char tmpPath[PATH_MAX]; + int32_t len = strlen(inputTmpDir); + memcpy(tmpPath, inputTmpDir, len); static uint64_t seqId = 0; if (tmpPath[len - 1] != '/') { @@ -73,11 +76,11 @@ void taosGetTmpfilePath(const char *fileNamePrefix, char *dstPath) { sprintf(rand, "%" PRIu64, atomic_add_fetch_64(&seqId, 1)); snprintf(dstPath, PATH_MAX, tmpPath, getpid(), rand); -} #endif +} -int64_t taosRead(FileFd fd, void *buf, int64_t count) { +int64_t taosReadFile(FileFd fd, void *buf, int64_t count) { int64_t leftbytes = count; int64_t readbytes; char * tbuf = (char *)buf; @@ -101,7 +104,7 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count) { return count; } -int64_t taosWrite(FileFd fd, void *buf, int64_t n) { +int64_t taosWriteFile(FileFd fd, void *buf, int64_t n) { int64_t nleft = n; int64_t nwritten = 0; char * tbuf = (char *)buf; @@ -121,42 +124,46 @@ int64_t taosWrite(FileFd fd, void *buf, int64_t n) { return n; } -int64_t taosLSeek(FileFd fd, int64_t offset, int32_t whence) { return (int64_t)lseek(fd, (long)offset, whence); } +int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence) { return (int64_t)lseek(fd, (long)offset, whence); } -int64_t taosCopy(char *from, char *to) { +int64_t taosCopyFile(char *from, char *to) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else char buffer[4096]; int fidto = -1, fidfrom = -1; int64_t size = 0; int64_t bytes; - fidfrom = open(from, O_RDONLY | O_BINARY); + fidfrom = open(from, O_RDONLY); if (fidfrom < 0) goto _err; - fidto = open(to, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0755); + fidto = open(to, O_WRONLY | O_CREAT | O_EXCL, 0755); if (fidto < 0) goto _err; while (true) { - bytes = taosRead(fidfrom, buffer, sizeof(buffer)); + bytes = taosReadFile(fidfrom, buffer, sizeof(buffer)); if (bytes < 0) goto _err; if (bytes == 0) break; size += bytes; - if (taosWrite(fidto, (void *)buffer, bytes) < bytes) goto _err; + if (taosWriteFile(fidto, (void *)buffer, bytes) < bytes) goto _err; if (bytes < sizeof(buffer)) break; } - taosFsync(fidto); + taosFsyncFile(fidto); - taosClose(fidfrom); - taosClose(fidto); + taosCloseFile(fidfrom); + taosCloseFile(fidto); return size; _err: - if (fidfrom >= 0) taosClose(fidfrom); - if (fidto >= 0) taosClose(fidto); + if (fidfrom >= 0) taosCloseFile(fidfrom); + if (fidto >= 0) taosCloseFile(fidto); remove(to); return -1; +#endif } #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) @@ -306,9 +313,8 @@ int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size #endif +int32_t taosFtruncateFile(FileFd fd, int64_t l_size) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) - -int32_t taosFtruncate(int32_t fd, int64_t l_size) { if (fd < 0) { errno = EBADF; uError("%s\n", "fd arg was negative"); @@ -357,9 +363,13 @@ int32_t taosFtruncate(int32_t fd, int64_t l_size) { } return 0; +#else + return ftruncate(fd, l_size); +#endif } -int32_t taosFsync(FileFd fd) { +int32_t taosFsyncFile(FileFd fd) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) if (fd < 0) { errno = EBADF; uError("%s\n", "fd arg was negative"); @@ -369,33 +379,126 @@ int32_t taosFsync(FileFd fd) { HANDLE h = (HANDLE)_get_osfhandle(fd); return FlushFileBuffers(h); +#else + return fsync(fd); +#endif } -int32_t taosRename(char *oldName, char *newName) { +int32_t taosRenameFile(char *oldName, char *newName) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED); if (code < 0) { - uError("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); - } else { - uTrace("successfully to rename file %s to %s", oldName, newName); + printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); + } + + return code; +#else + int32_t code = rename(oldName, newName); + if (code < 0) { + printf("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); } return code; +#endif +} + + +int32_t taosLockFile(int32_t fd) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return (int32_t)flock(fd, LOCK_EX | LOCK_NB); +#endif } +int32_t taosUnLockFile(int32_t fd) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; #else + return (int32_t)flock(fd, LOCK_UN | LOCK_NB); +#endif +} -int32_t taosFtruncate(FileFd fd, int64_t length) { return ftruncate(fd, length); } -int32_t taosFsync(FileFd fd) { return fsync(fd); } +int32_t taosUmaskFile(int32_t val) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return umask(val); +#endif +} -int32_t taosRename(char *oldName, char *newName) { - int32_t code = rename(oldName, newName); +int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + struct stat fileStat; + int32_t code = stat(path, &fileStat); if (code < 0) { - uError("failed to rename file %s to %s, reason:%s", oldName, newName, strerror(errno)); - } else { - uTrace("successfully to rename file %s to %s", oldName, newName); + return code; } - return code; + if (size != NULL) { + *size = fileStat.st_size; + } + + if (mtime != NULL) { + *mtime = fileStat.st_mtime; + } + + return 0; +#endif } +int32_t taosFStatFile(int32_t fd, int64_t *size, int32_t *mtime) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + struct stat fileStat; + int32_t code = fstat(fd, &fileStat); + if (code < 0) { + return code; + } + + if (size != NULL) { + *size = fileStat.st_size; + } + + if (mtime != NULL) { + *mtime = fileStat.st_mtime; + } + + return 0; +#endif +} + +int32_t taosOpenFileWrite(const char *path) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return open(path, O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); +#endif +} + +FileFd taosOpenFileRead(const char *path) { + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return open(path, O_RDONLY); +#endif +} + +int32_t taosOpenFileCreateWrite(const char *path) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return open(path, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); +#endif +} + +int32_t taosOpenFileTruncCreateWrite(const char *path) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); #endif +} \ No newline at end of file diff --git a/src/os/src/detail/osSocket.c b/source/os/src/osSocket.c similarity index 79% rename from src/os/src/detail/osSocket.c rename to source/os/src/osSocket.c index 7ce9d6eb06621f0a691699020cff13b9c15a9a88..7964482db66e250035b81ed5db34fd0ec06d442a 100644 --- a/src/os/src/detail/osSocket.c +++ b/source/os/src/osSocket.c @@ -15,7 +15,26 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "tulog.h" + +void taosShutDownSocketRD(SOCKET fd) { +#ifdef WINDOWS + closesocket(fd); +#elif __APPLE__ + close(fd); +#else + shutdown(fd, SHUT_RD); +#endif +} + +void taosShutDownSocketWR(SOCKET fd) { +#ifdef WINDOWS + closesocket(fd); +#elif __APPLE__ + close(fd); +#else + shutdown(fd, SHUT_WR); +#endif +} #if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) @@ -87,4 +106,19 @@ const char *taosInetNtoa(struct in_addr ipInt) { return inet_ntoa(ipInt); } +#else + +const char *taosInetNtoa(struct in_addr ipInt) { + // not thread safe, only for debug usage while print log + static char tmpDstStr[16]; + return inet_ntop(AF_INET, &ipInt, tmpDstStr, INET6_ADDRSTRLEN); +} + #endif + + +#if defined(_TD_GO_DLL_) + +uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(val >> 32); } + +#endif \ No newline at end of file diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c new file mode 100644 index 0000000000000000000000000000000000000000..27d046d4c91ca31d669415922c3a7a50b91e1825 --- /dev/null +++ b/source/os/src/osTime.c @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" + +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +#include +#else +#endif + +FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + time_t t; + t = time(NULL); + SYSTEMTIME st; + GetLocalTime(&st); + + tv->tv_sec = (long)t; + tv->tv_usec = st.wMilliseconds * 1000; + + return 0; +#else + return gettimeofday(tv, NULL); +#endif +} \ No newline at end of file diff --git a/source/server/dnode/src/dnodeCfg.c b/source/server/dnode/src/dnodeCfg.c index a004ad72901bc6d54d0fa0c34ae6feee48c31aa3..2162e3f791f5f06acd65d6b0bb08abe76399b929 100644 --- a/source/server/dnode/src/dnodeCfg.c +++ b/source/server/dnode/src/dnodeCfg.c @@ -94,7 +94,7 @@ static int32_t dnodeWriteCfg(DnCfg *cfg) { len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); - taosFsync(fileno(fp)); + taosFsyncFile(fileno(fp)); fclose(fp); free(content); terrno = 0; diff --git a/source/server/dnode/src/dnodeEps.c b/source/server/dnode/src/dnodeEps.c index 725e54bb9163762541425a5df443fbb09f701471..8595b1b3392952d89e4e6adb3e342ca0a52b8a50 100644 --- a/source/server/dnode/src/dnodeEps.c +++ b/source/server/dnode/src/dnodeEps.c @@ -173,7 +173,7 @@ static int32_t dnodeWriteEps(DnEps *eps) { len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); - taosFsync(fileno(fp)); + taosFsyncFile(fileno(fp)); fclose(fp); free(content); terrno = 0; diff --git a/source/server/dnode/src/dnodeMnodeEps.c b/source/server/dnode/src/dnodeMnodeEps.c index ba926b00942f2ebd4101f886946d7aad91a62d88..9858f9d21f6a5d7dd8086f243d8894c8e654330b 100644 --- a/source/server/dnode/src/dnodeMnodeEps.c +++ b/source/server/dnode/src/dnodeMnodeEps.c @@ -82,7 +82,7 @@ static int32_t dnodeWriteMnodeEps(DnMnEps *meps) { len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); - taosFsync(fileno(fp)); + taosFsyncFile(fileno(fp)); fclose(fp); free(content); terrno = 0; diff --git a/source/server/dnode/src/dnodeTelemetry.c b/source/server/dnode/src/dnodeTelemetry.c index 51f1e90cffa1495d9b41d063c521f5f5bb9fe94e..466470c8c7f4b034701f27ede39654169afc1cf6 100644 --- a/source/server/dnode/src/dnodeTelemetry.c +++ b/source/server/dnode/src/dnodeTelemetry.c @@ -15,7 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" -// #include "osTime.h" #include "tbuffer.h" #include "tglobal.h" #include "tsocket.h" @@ -255,16 +254,16 @@ static void* dnodeTelemThreadFp(void* param) { } static void dnodeGetEmail(DnTelem* telem, char* filepath) { - int32_t fd = open(filepath, O_RDONLY); + int32_t fd = taosOpenFileRead(filepath); if (fd < 0) { return; } - if (taosRead(fd, (void*)telem->email, TSDB_FQDN_LEN) < 0) { + if (taosReadFile(fd, (void*)telem->email, TSDB_FQDN_LEN) < 0) { dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); } - taosClose(fd); + taosCloseFile(fd); } int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) { diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 4d0333883539183be5c092ece4fdba75b6e1600b..d9d2c9ad47912fd1c1e5351c75b6ddbf03d2a324 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -242,7 +242,7 @@ static void dnodeProcessMsgFromShell(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *p static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (strcmp(user, "nettestinternal") == 0) { char pass[32] = {0}; - taosEncryptPass((uint8_t *)user, strlen(user), pass); + taosEncryptPass((uint8_t *)user, strlen(user), pass, TSDB_KEY_LEN); *spi = 0; *encrypt = 0; *ckey = 0; diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index c53be0c59bf537f096e16daf6d86579044e730ac..cec2554b52c0fc59cb0f0eaec592a27ca0c178b6 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -3,7 +3,7 @@ add_library(util ${UTIL_SRC}) target_include_directories( util PUBLIC "${CMAKE_SOURCE_DIR}/include/util" - PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( util diff --git a/source/util/inc/tulog.h b/source/util/inc/tulog.h new file mode 100644 index 0000000000000000000000000000000000000000..566da40a10e078b9789e2e1b76a8d82fe89aef46 --- /dev/null +++ b/source/util/inc/tulog.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_COMMON_ULOG_H +#define TDENGINE_COMMON_ULOG_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tlog.h" + +extern int32_t uDebugFlag; +extern int8_t tscEmbedded; + +#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} +#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} +#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} + +#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); } +#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); } + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/util/src/hash.c b/source/util/src/hash.c new file mode 100644 index 0000000000000000000000000000000000000000..21e835a6bf19ce95b71818af3ab2fc14924d45d2 --- /dev/null +++ b/source/util/src/hash.c @@ -0,0 +1,916 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "hash.h" +#include "tulog.h" +// #include "taosdef.h" + +#define EXT_SIZE 1024 + +#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) + +#define DO_FREE_HASH_NODE(_n) \ + do { \ + tfree(_n); \ + } while (0) + +#define FREE_HASH_NODE(_h, _n) \ + do { \ + if ((_h)->freeFp) { \ + (_h)->freeFp(GET_HASH_NODE_DATA(_n)); \ + } \ + \ + DO_FREE_HASH_NODE(_n); \ + } while (0); + +static FORCE_INLINE void __wr_lock(void *lock, int32_t type) { + if (type == HASH_NO_LOCK) { + return; + } + taosWLockLatch(lock); +} + +static FORCE_INLINE void __rd_lock(void *lock, int32_t type) { + if (type == HASH_NO_LOCK) { + return; + } + + taosRLockLatch(lock); +} + +static FORCE_INLINE void __rd_unlock(void *lock, int32_t type) { + if (type == HASH_NO_LOCK) { + return; + } + + taosRUnLockLatch(lock); +} + +static FORCE_INLINE void __wr_unlock(void *lock, int32_t type) { + if (type == HASH_NO_LOCK) { + return; + } + + taosWUnLockLatch(lock); +} + +static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { + int32_t len = MIN(length, HASH_MAX_CAPACITY); + + int32_t i = 4; + while (i < len) i = (i << 1u); + return i; +} + +static FORCE_INLINE SHashNode *doSearchInEntryList(SHashObj *pHashObj, SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) { + SHashNode *pNode = pe->next; + while (pNode) { + if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) { + assert(pNode->hashVal == hashVal); + break; + } + + pNode = pNode->next; + } + + return pNode; +} + +/** + * Resize the hash list if the threshold is reached + * + * @param pHashObj + */ +static void taosHashTableResize(SHashObj *pHashObj); + +/** + * @param key key of object for hash, usually a null-terminated string + * @param keyLen length of key + * @param pData actually data. Requires a consecutive memory block, no pointer is allowed in pData. + * Pointer copy causes memory access error. + * @param dsize size of data + * @return SHashNode + */ +static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal); + +/** + * Update the hash node + * + * @param pNode hash node + * @param key key for generate hash value + * @param keyLen key length + * @param pData actual data + * @param dsize size of actual data + * @return hash node + */ +static FORCE_INLINE SHashNode *doUpdateHashNode(SHashObj *pHashObj, SHashEntry* pe, SHashNode* prev, SHashNode *pNode, SHashNode *pNewNode) { + assert(pNode->keyLen == pNewNode->keyLen); + + pNode->count--; + if (prev != NULL) { + prev->next = pNewNode; + } else { + pe->next = pNewNode; + } + + if (pNode->count <= 0) { + pNewNode->next = pNode->next; + DO_FREE_HASH_NODE(pNode); + } else { + pNewNode->next = pNode; + pe->num++; + atomic_add_fetch_64(&pHashObj->size, 1); + } + + return pNewNode; +} + +/** + * insert the hash node at the front of the linked list + * + * @param pHashObj + * @param pNode + */ +static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); + +/** + * Check whether the hash table is empty or not. + * + * @param pHashObj the hash table object + * @return if the hash table is empty or not + */ +static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj); + +/** + * Get the next element in hash table for iterator + * @param pIter + * @return + */ + +SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { + assert(fn != NULL); + if (capacity == 0) { + capacity = 4; + } + + SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj)); + if (pHashObj == NULL) { + uError("failed to allocate memory, reason:%s", strerror(errno)); + return NULL; + } + + // the max slots is not defined by user + pHashObj->capacity = taosHashCapacity((int32_t)capacity); + assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); + pHashObj->equalFp = memcmp; + pHashObj->hashFp = fn; + pHashObj->type = type; + pHashObj->enableUpdate = update; + + pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void *)); + if (pHashObj->hashList == NULL) { + free(pHashObj); + uError("failed to allocate memory, reason:%s", strerror(errno)); + return NULL; + } else { + pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *)); + + void *p = calloc(pHashObj->capacity, sizeof(SHashEntry)); + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry)); + } + + taosArrayPush(pHashObj->pMemBlock, &p); + } + + return pHashObj; +} + +void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp) { + if (pHashObj != NULL && fp != NULL) { + pHashObj->equalFp = fp; + } +} + +int32_t taosHashGetSize(const SHashObj *pHashObj) { + if (!pHashObj) { + return 0; + } + return (int32_t)atomic_load_64(&pHashObj->size); +} + +static FORCE_INLINE bool taosHashTableEmpty(const SHashObj *pHashObj) { + return taosHashGetSize(pHashObj) == 0; +} + +int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); + SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal); + if (pNewNode == NULL) { + return -1; + } + + // need the resize process, write lock applied + if (HASH_NEED_RESIZE(pHashObj)) { + __wr_lock(&pHashObj->lock, pHashObj->type); + taosHashTableResize(pHashObj); + __wr_unlock(&pHashObj->lock, pHashObj->type); + } + + __rd_lock(&pHashObj->lock, pHashObj->type); + + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + SHashEntry *pe = pHashObj->hashList[slot]; + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWLockLatch(&pe->latch); + } + + SHashNode *pNode = pe->next; + if (pe->num > 0) { + assert(pNode != NULL); + } else { + assert(pNode == NULL); + } + + SHashNode* prev = NULL; + while (pNode) { + if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) { + assert(pNode->hashVal == hashVal); + break; + } + + prev = pNode; + pNode = pNode->next; + } + + if (pNode == NULL) { + // no data in hash table with the specified key, add it into hash table + pushfrontNodeInEntryList(pe, pNewNode); + + if (pe->num == 0) { + assert(pe->next == NULL); + } else { + assert(pe->next != NULL); + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pe->latch); + } + + // enable resize + __rd_unlock(&pHashObj->lock, pHashObj->type); + atomic_add_fetch_64(&pHashObj->size, 1); + + return 0; + } else { + // not support the update operation, return error + if (pHashObj->enableUpdate) { + doUpdateHashNode(pHashObj, pe, prev, pNode, pNewNode); + } else { + DO_FREE_HASH_NODE(pNewNode); + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pe->latch); + } + + // enable resize + __rd_unlock(&pHashObj->lock, pHashObj->type); + + return pHashObj->enableUpdate ? 0 : -1; + } +} + +void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { + return taosHashGetClone(pHashObj, key, keyLen, NULL, NULL); +} +//TODO(yihaoDeng), merge with taosHashGetClone +void* taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void** d, size_t *sz) { + if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { + return NULL; + } + + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); + + // only add the read lock to disable the resize process + __rd_lock(&pHashObj->lock, pHashObj->type); + + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + SHashEntry *pe = pHashObj->hashList[slot]; + + // no data, return directly + if (atomic_load_32(&pe->num) == 0) { + __rd_unlock(&pHashObj->lock, pHashObj->type); + return NULL; + } + + char *data = NULL; + + // lock entry + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosRLockLatch(&pe->latch); + } + + if (pe->num > 0) { + assert(pe->next != NULL); + } else { + assert(pe->next == NULL); + } + + SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal); + if (pNode != NULL) { + if (fp != NULL) { + fp(GET_HASH_NODE_DATA(pNode)); + } + + if (*d == NULL) { + *sz = pNode->dataLen + EXT_SIZE; + *d = calloc(1, *sz); + } else if (*sz < pNode->dataLen){ + *sz = pNode->dataLen + EXT_SIZE; + *d = realloc(*d, *sz); + } + memcpy((char *)(*d), GET_HASH_NODE_DATA(pNode), pNode->dataLen); + // just make runtime happy + if ((*sz) - pNode->dataLen > 0) { + memset((char *)(*d) + pNode->dataLen, 0, (*sz) - pNode->dataLen); + } + + data = GET_HASH_NODE_DATA(pNode); + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosRUnLockLatch(&pe->latch); + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); + return data; +} + +void* taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void* d) { + if (taosHashTableEmpty(pHashObj) || keyLen == 0 || key == NULL) { + return NULL; + } + + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); + + // only add the read lock to disable the resize process + __rd_lock(&pHashObj->lock, pHashObj->type); + + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + SHashEntry *pe = pHashObj->hashList[slot]; + + // no data, return directly + if (atomic_load_32(&pe->num) == 0) { + __rd_unlock(&pHashObj->lock, pHashObj->type); + return NULL; + } + + char *data = NULL; + + // lock entry + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosRLockLatch(&pe->latch); + } + + if (pe->num > 0) { + assert(pe->next != NULL); + } else { + assert(pe->next == NULL); + } + + SHashNode *pNode = doSearchInEntryList(pHashObj, pe, key, keyLen, hashVal); + if (pNode != NULL) { + if (fp != NULL) { + fp(GET_HASH_NODE_DATA(pNode)); + } + + if (d != NULL) { + memcpy(d, GET_HASH_NODE_DATA(pNode), pNode->dataLen); + } + + data = GET_HASH_NODE_DATA(pNode); + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosRUnLockLatch(&pe->latch); + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); + return data; +} + +int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { + return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0); +} + +int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) { + if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { + return -1; + } + + uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen); + + // disable the resize process + __rd_lock(&pHashObj->lock, pHashObj->type); + + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + SHashEntry *pe = pHashObj->hashList[slot]; + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWLockLatch(&pe->latch); + } + + // double check after locked + if (pe->num == 0) { + assert(pe->next == NULL); + taosWUnLockLatch(&pe->latch); + + __rd_unlock(&pHashObj->lock, pHashObj->type); + return -1; + } + + int code = -1; + SHashNode *pNode = pe->next; + SHashNode *prevNode = NULL; + + while (pNode) { + if ((pNode->keyLen == keyLen) && ((*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0) && pNode->removed == 0) + break; + + prevNode = pNode; + pNode = pNode->next; + } + + if (pNode) { + code = 0; // it is found + + pNode->count--; + pNode->removed = 1; + if (pNode->count <= 0) { + if (prevNode) { + prevNode->next = pNode->next; + } else { + pe->next = pNode->next; + } + + if (data) memcpy(data, GET_HASH_NODE_DATA(pNode), dsize); + + pe->num--; + atomic_sub_fetch_64(&pHashObj->size, 1); + FREE_HASH_NODE(pHashObj, pNode); + } + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pe->latch); + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); + + return code; +} + +int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) { + if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { + return 0; + } + + // disable the resize process + __rd_lock(&pHashObj->lock, pHashObj->type); + + int32_t numOfEntries = (int32_t)pHashObj->capacity; + for (int32_t i = 0; i < numOfEntries; ++i) { + SHashEntry *pEntry = pHashObj->hashList[i]; + if (pEntry->num == 0) { + continue; + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWLockLatch(&pEntry->latch); + } + + // todo remove the first node + SHashNode *pNode = NULL; + while((pNode = pEntry->next) != NULL) { + if (fp && (!fp(param, GET_HASH_NODE_DATA(pNode)))) { + pEntry->num -= 1; + atomic_sub_fetch_64(&pHashObj->size, 1); + + pEntry->next = pNode->next; + + if (pEntry->num == 0) { + assert(pEntry->next == NULL); + } else { + assert(pEntry->next != NULL); + } + + FREE_HASH_NODE(pHashObj, pNode); + } else { + break; + } + } + + // handle the following node + if (pNode != NULL) { + assert(pNode == pEntry->next); + SHashNode *pNext = NULL; + + while ((pNext = pNode->next) != NULL) { + // not qualified, remove it + if (fp && (!fp(param, GET_HASH_NODE_DATA(pNext)))) { + pNode->next = pNext->next; + pEntry->num -= 1; + atomic_sub_fetch_64(&pHashObj->size, 1); + + if (pEntry->num == 0) { + assert(pEntry->next == NULL); + } else { + assert(pEntry->next != NULL); + } + + FREE_HASH_NODE(pHashObj, pNext); + } else { + pNode = pNext; + } + } + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pEntry->latch); + } + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); + return 0; +} + +void taosHashClear(SHashObj *pHashObj) { + if (pHashObj == NULL) { + return; + } + + SHashNode *pNode, *pNext; + + __wr_lock(&pHashObj->lock, pHashObj->type); + + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + SHashEntry *pEntry = pHashObj->hashList[i]; + if (pEntry->num == 0) { + assert(pEntry->next == 0); + continue; + } + + pNode = pEntry->next; + assert(pNode != NULL); + + while (pNode) { + pNext = pNode->next; + FREE_HASH_NODE(pHashObj, pNode); + + pNode = pNext; + } + + pEntry->num = 0; + pEntry->next = NULL; + } + + pHashObj->size = 0; + __wr_unlock(&pHashObj->lock, pHashObj->type); +} + +void taosHashCleanup(SHashObj *pHashObj) { + if (pHashObj == NULL) { + return; + } + + taosHashClear(pHashObj); + tfree(pHashObj->hashList); + + // destroy mem block + size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock); + for (int32_t i = 0; i < memBlock; ++i) { + void *p = taosArrayGetP(pHashObj->pMemBlock, i); + tfree(p); + } + + taosArrayDestroy(pHashObj->pMemBlock); + + memset(pHashObj, 0, sizeof(SHashObj)); + free(pHashObj); +} + +// for profile only +int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) { + if (pHashObj == NULL || taosHashTableEmpty(pHashObj)) { + return 0; + } + + int32_t num = 0; + + for (int32_t i = 0; i < pHashObj->size; ++i) { + SHashEntry *pEntry = pHashObj->hashList[i]; + if (num < pEntry->num) { + num = pEntry->num; + } + } + + return num; +} + +void taosHashTableResize(SHashObj *pHashObj) { + if (!HASH_NEED_RESIZE(pHashObj)) { + return; + } + + // double the original capacity + SHashNode *pNode = NULL; + SHashNode *pNext = NULL; + + int32_t newSize = (int32_t)(pHashObj->capacity << 1u); + if (newSize > HASH_MAX_CAPACITY) { + // uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", + // pHashObj->capacity, HASH_MAX_CAPACITY); + return; + } + + int64_t st = taosGetTimestampUs(); + void *pNewEntryList = realloc(pHashObj->hashList, sizeof(void *) * newSize); + if (pNewEntryList == NULL) { // todo handle error + // uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); + return; + } + + pHashObj->hashList = pNewEntryList; + + size_t inc = newSize - pHashObj->capacity; + void * p = calloc(inc, sizeof(SHashEntry)); + + for (int32_t i = 0; i < inc; ++i) { + pHashObj->hashList[i + pHashObj->capacity] = (void *)((char *)p + i * sizeof(SHashEntry)); + } + + taosArrayPush(pHashObj->pMemBlock, &p); + + pHashObj->capacity = newSize; + for (int32_t i = 0; i < pHashObj->capacity; ++i) { + SHashEntry *pe = pHashObj->hashList[i]; + + if (pe->num == 0) { + assert(pe->next == NULL); + } else { + assert(pe->next != NULL); + } + + if (pe->num == 0) { + assert(pe->next == NULL); + continue; + } + + while ((pNode = pe->next) != NULL) { + int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity); + if (j != i) { + pe->num -= 1; + pe->next = pNode->next; + + if (pe->num == 0) { + assert(pe->next == NULL); + } else { + assert(pe->next != NULL); + } + + SHashEntry *pNewEntry = pHashObj->hashList[j]; + pushfrontNodeInEntryList(pNewEntry, pNode); + } else { + break; + } + } + + if (pNode != NULL) { + while ((pNext = pNode->next) != NULL) { + int32_t j = HASH_INDEX(pNext->hashVal, pHashObj->capacity); + if (j != i) { + pe->num -= 1; + + pNode->next = pNext->next; + pNext->next = NULL; + + // added into new slot + SHashEntry *pNewEntry = pHashObj->hashList[j]; + + if (pNewEntry->num == 0) { + assert(pNewEntry->next == NULL); + } else { + assert(pNewEntry->next != NULL); + } + + pushfrontNodeInEntryList(pNewEntry, pNext); + } else { + pNode = pNext; + } + } + + if (pe->num == 0) { + assert(pe->next == NULL); + } else { + assert(pe->next != NULL); + } + + } + + } + + int64_t et = taosGetTimestampUs(); + + uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", (int32_t)pHashObj->capacity, + ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); +} + +SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { + SHashNode *pNewNode = malloc(sizeof(SHashNode) + keyLen + dsize); + + if (pNewNode == NULL) { + uError("failed to allocate memory, reason:%s", strerror(errno)); + return NULL; + } + + pNewNode->keyLen = (uint32_t)keyLen; + pNewNode->hashVal = hashVal; + pNewNode->dataLen = (uint32_t) dsize; + pNewNode->count = 1; + pNewNode->removed = 0; + pNewNode->next = NULL; + + memcpy(GET_HASH_NODE_DATA(pNewNode), pData, dsize); + memcpy(GET_HASH_NODE_KEY(pNewNode), key, keyLen); + + return pNewNode; +} + +void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode) { + assert(pNode != NULL && pEntry != NULL); + + pNode->next = pEntry->next; + pEntry->next = pNode; + + pEntry->num += 1; +} + +size_t taosHashGetMemSize(const SHashObj *pHashObj) { + if (pHashObj == NULL) { + return 0; + } + + return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj); +} + +FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) { + SHashNode * node = GET_HASH_PNODE(data); + return GET_HASH_NODE_KEY(node); +} + +FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) { + SHashNode * node = GET_HASH_PNODE(data); + return node->keyLen; +} + + +// release the pNode, return next pNode, and lock the current entry +static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { + + SHashNode *pOld = (SHashNode *)GET_HASH_PNODE(p); + SHashNode *prevNode = NULL; + + *slot = HASH_INDEX(pOld->hashVal, pHashObj->capacity); + SHashEntry *pe = pHashObj->hashList[*slot]; + + // lock entry + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWLockLatch(&pe->latch); + } + + SHashNode *pNode = pe->next; + + while (pNode) { + if (pNode == pOld) + break; + + prevNode = pNode; + pNode = pNode->next; + } + + if (pNode) { + pNode = pNode->next; + while (pNode) { + if (pNode->removed == 0) break; + pNode = pNode->next; + } + + pOld->count--; + if (pOld->count <=0) { + if (prevNode) { + prevNode->next = pOld->next; + } else { + pe->next = pOld->next; + } + + pe->num--; + atomic_sub_fetch_64(&pHashObj->size, 1); + FREE_HASH_NODE(pHashObj, pOld); + } + } else { + uError("pNode:%p data:%p is not there!!!", pNode, p); + } + + return pNode; +} + +void *taosHashIterate(SHashObj *pHashObj, void *p) { + if (pHashObj == NULL) return NULL; + + int slot = 0; + char *data = NULL; + + // only add the read lock to disable the resize process + __rd_lock(&pHashObj->lock, pHashObj->type); + + SHashNode *pNode = NULL; + if (p) { + pNode = taosHashReleaseNode(pHashObj, p, &slot); + if (pNode == NULL) { + SHashEntry *pe = pHashObj->hashList[slot]; + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pe->latch); + } + + slot = slot + 1; + } + } + + if (pNode == NULL) { + for (; slot < pHashObj->capacity; ++slot) { + SHashEntry *pe = pHashObj->hashList[slot]; + + // lock entry + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWLockLatch(&pe->latch); + } + + pNode = pe->next; + while (pNode) { + if (pNode->removed == 0) break; + pNode = pNode->next; + } + + if (pNode) break; + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pe->latch); + } + } + } + + if (pNode) { + SHashEntry *pe = pHashObj->hashList[slot]; + pNode->count++; + data = GET_HASH_NODE_DATA(pNode); + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pe->latch); + } + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); + return data; + +} + +void taosHashCancelIterate(SHashObj *pHashObj, void *p) { + if (pHashObj == NULL || p == NULL) return; + + // only add the read lock to disable the resize process + __rd_lock(&pHashObj->lock, pHashObj->type); + + int slot; + taosHashReleaseNode(pHashObj, p, &slot); + + SHashEntry *pe = pHashObj->hashList[slot]; + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pe->latch); + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); +} diff --git a/src/util/src/tlog.c b/source/util/src/tlog.c similarity index 89% rename from src/util/src/tlog.c rename to source/util/src/tlog.c index 1ce3eadf58432337511d0d600848ad334b96fc91..1906ea1629e3aa85b1f087837a4d1fd31980ece4 100644 --- a/src/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -19,7 +19,7 @@ #include "tlog.h" #include "tnote.h" #include "tutil.h" - + #define MAX_LOGLINE_SIZE (1000) #define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10) #define MAX_LOGLINE_CONTENT_SIZE (MAX_LOGLINE_SIZE - 100) @@ -85,6 +85,8 @@ int64_t dbgWSize = 0; char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/power"; #elif (_TD_TQ_ == true) char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/tq"; +#elif (_TD_PRO_ == true) +char tsLogDir[TSDB_FILENAME_LEN] = "/var/log/ProDB"; #else char tsLogDir[PATH_MAX] = "/var/log/taos"; #endif @@ -134,11 +136,11 @@ void taosCloseLog() { // taosCloseLog(); } -static bool taosLockFile(int32_t fd) { +static bool taosLockLogFile(int32_t fd) { if (fd < 0) return false; if (tsLogObj.fileNum > 1) { - int32_t ret = flock(fd, LOCK_EX | LOCK_NB); + int32_t ret = taosUnLockFile(fd); if (ret == 0) { return true; } @@ -147,11 +149,11 @@ static bool taosLockFile(int32_t fd) { return false; } -static void taosUnLockFile(int32_t fd) { +static void taosUnLockLogFile(int32_t fd) { if (fd < 0) return; if (tsLogObj.fileNum > 1) { - flock(fd, LOCK_UN | LOCK_NB); + taosUnLockFile(fd); } } @@ -183,9 +185,9 @@ static void *taosThreadToOpenNewFile(void *param) { char name[LOG_FILE_NAME_LEN + 20]; sprintf(name, "%s.%d", tsLogObj.logName, tsLogObj.flag); - umask(0); + taosUmaskFile(0); - int32_t fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + int32_t fd = taosOpenFileTruncCreateWrite(name); if (fd < 0) { tsLogObj.openInProgress = 0; tsLogObj.lines = tsLogObj.maxLines - 1000; @@ -193,8 +195,8 @@ static void *taosThreadToOpenNewFile(void *param) { return NULL; } - taosLockFile(fd); - (void)lseek(fd, 0, SEEK_SET); + taosLockLogFile(fd); + (void)taosLSeekFile(fd, 0, SEEK_SET); int32_t oldFd = tsLogObj.logHandle->fd; tsLogObj.logHandle->fd = fd; @@ -246,7 +248,7 @@ void taosResetLog() { } static bool taosCheckFileIsOpen(char *logFileName) { - int32_t fd = open(logFileName, O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); + int32_t fd = taosOpenFileWrite(logFileName); if (fd < 0) { if (errno == ENOENT) { return false; @@ -256,12 +258,12 @@ static bool taosCheckFileIsOpen(char *logFileName) { } } - if (taosLockFile(fd)) { - taosUnLockFile(fd); - taosClose(fd); + if (taosLockLogFile(fd)) { + taosUnLockLogFile(fd); + taosCloseFile(fd); return false; } else { - taosClose(fd); + taosCloseFile(fd); return true; } } @@ -298,9 +300,9 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { maxFileNum = 1; #endif - char name[LOG_FILE_NAME_LEN + 50] = "\0"; - struct stat logstat0, logstat1; - int32_t size; + char name[LOG_FILE_NAME_LEN + 50] = "\0"; + int32_t logstat0_mtime, logstat1_mtime; + int32_t size; tsLogObj.maxLines = maxLines; tsLogObj.fileNum = maxFileNum; @@ -310,14 +312,14 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { strcpy(name, fn); strcat(name, ".0"); } - bool log0Exist = stat(name, &logstat0) >= 0; + bool log0Exist = taosStatFile(name, NULL, &logstat0_mtime) >= 0; if (strlen(fn) < LOG_FILE_NAME_LEN + 50 - 2) { strcpy(name, fn); strcat(name, ".1"); } - bool log1Exist = stat(name, &logstat1) >= 0; - + bool log1Exist = taosStatFile(name, NULL, &logstat1_mtime) >= 0; + // if none of the log files exist, open 0, if both exists, open the old one if (!log0Exist && !log1Exist) { tsLogObj.flag = 0; @@ -326,39 +328,39 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { } else if (!log0Exist) { tsLogObj.flag = 1; } else { - tsLogObj.flag = (logstat0.st_mtime > logstat1.st_mtime) ? 0 : 1; + tsLogObj.flag = (logstat0_mtime > logstat1_mtime) ? 0 : 1; } char fileName[LOG_FILE_NAME_LEN + 50] = "\0"; sprintf(fileName, "%s.%d", tsLogObj.logName, tsLogObj.flag); pthread_mutex_init(&tsLogObj.logMutex, NULL); - umask(0); - tsLogObj.logHandle->fd = open(fileName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + taosUmaskFile(0); + tsLogObj.logHandle->fd = taosOpenFileCreateWrite(fileName); if (tsLogObj.logHandle->fd < 0) { printf("\nfailed to open log file:%s, reason:%s\n", fileName, strerror(errno)); return -1; } - taosLockFile(tsLogObj.logHandle->fd); + taosLockLogFile(tsLogObj.logHandle->fd); // only an estimate for number of lines - struct stat filestat; - if (fstat(tsLogObj.logHandle->fd, &filestat) < 0) { + int64_t filesize = 0; + if (taosFStatFile(tsLogObj.logHandle->fd, &filesize, NULL) < 0) { printf("\nfailed to fstat log file:%s, reason:%s\n", fileName, strerror(errno)); return -1; } - size = (int32_t)filestat.st_size; + size = (int32_t)filesize; tsLogObj.lines = size / 60; - lseek(tsLogObj.logHandle->fd, 0, SEEK_END); + taosLSeekFile(tsLogObj.logHandle->fd, 0, SEEK_END); sprintf(name, "==================================================\n"); - taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); + taosWriteFile(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); sprintf(name, " new log file \n"); - taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); + taosWriteFile(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); sprintf(name, "==================================================\n"); - taosWrite(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); + taosWriteFile(tsLogObj.logHandle->fd, name, (uint32_t)strlen(name)); return 0; } @@ -377,7 +379,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { struct timeval timeSecs; time_t curTime; - gettimeofday(&timeSecs, NULL); + taosGetTimeOfDay(&timeSecs); curTime = timeSecs.tv_sec; ptm = localtime_r(&curTime, &Tm); @@ -408,7 +410,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { if (tsAsyncLog) { taosPushLogBuffer(tsLogObj.logHandle, buffer, len); } else { - taosWrite(tsLogObj.logHandle->fd, buffer, len); + taosWriteFile(tsLogObj.logHandle->fd, buffer, len); } if (tsLogObj.maxLines > 0) { @@ -419,7 +421,7 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { } if (dflag & DEBUG_SCREEN) - taosWrite(1, buffer, (uint32_t)len); + taosWriteFile(1, buffer, (uint32_t)len); if (dflag == 255) nInfo(buffer, len); } @@ -439,7 +441,7 @@ void taosDumpData(unsigned char *msg, int32_t len) { pos += 3; if (c >= 16) { temp[pos++] = '\n'; - taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos); + taosWriteFile(tsLogObj.logHandle->fd, temp, (uint32_t)pos); c = 0; pos = 0; } @@ -447,7 +449,7 @@ void taosDumpData(unsigned char *msg, int32_t len) { temp[pos++] = '\n'; - taosWrite(tsLogObj.logHandle->fd, temp, (uint32_t)pos); + taosWriteFile(tsLogObj.logHandle->fd, temp, (uint32_t)pos); } void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) { @@ -464,7 +466,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . struct timeval timeSecs; time_t curTime; - gettimeofday(&timeSecs, NULL); + taosGetTimeOfDay(&timeSecs); curTime = timeSecs.tv_sec; ptm = localtime_r(&curTime, &Tm); @@ -485,7 +487,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . if (tsAsyncLog) { taosPushLogBuffer(tsLogObj.logHandle, buffer, len); } else { - taosWrite(tsLogObj.logHandle->fd, buffer, len); + taosWriteFile(tsLogObj.logHandle->fd, buffer, len); } if (tsLogObj.maxLines > 0) { @@ -495,7 +497,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . } } - if (dflag & DEBUG_SCREEN) taosWrite(1, buffer, (uint32_t)len); + if (dflag & DEBUG_SCREEN) taosWriteFile(1, buffer, (uint32_t)len); } #if 0 @@ -506,8 +508,8 @@ void taosCloseLog() { static void taosCloseLogByFd(int32_t fd) { if (fd >= 0) { - taosUnLockFile(fd); - taosClose(fd); + taosUnLockLogFile(fd); + taosCloseFile(fd); } } @@ -644,12 +646,12 @@ static void taosWriteLog(SLogBuff *tLogBuff) { } if (start < end) { - taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); + taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); } else { int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; - taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); + taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); - taosWrite(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); + taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); } dbgWN++;