diff --git a/.gitignore b/.gitignore index c5d90eea5042bd069e69420c70d478ef9bd2e2a3..e6e327327c2a72bc2262d9b2923314c950c78cf6 100644 --- a/.gitignore +++ b/.gitignore @@ -66,6 +66,8 @@ CMakeError.log /test/cfg /src/.vs *.o +version.c +taos.rc src/connector/jdbc/.settings/ tests/comparisonTest/cassandra/cassandratest/.classpath tests/comparisonTest/cassandra/cassandratest/.project diff --git a/CMakeLists.txt b/CMakeLists.txt index bfb8e902a88d57e3d6b9894718c773b394c5e183..565ab32f005226a13368e2df444929625092e9ad 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ INCLUDE(cmake/input.inc) INCLUDE(cmake/platform.inc) INCLUDE(cmake/define.inc) INCLUDE(cmake/env.inc) +INCLUDE(cmake/version.inc) INCLUDE(cmake/install.inc) ADD_SUBDIRECTORY(deps) diff --git a/cmake/version.inc b/cmake/version.inc new file mode 100644 index 0000000000000000000000000000000000000000..8c025201421e285ea54a895225d90205680f1e84 --- /dev/null +++ b/cmake/version.inc @@ -0,0 +1,42 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +SET(TD_VER_1 "2") +SET(TD_VER_2 "0") +SET(TD_VER_3 "2") +SET(TD_VER_4 "3") +SET(TD_VER_GIT "d711657139620f6c50f362597020705b8ad26bd2") +SET(TD_VER_GIT_INTERNAL "1d74ae24c541ffbb280e8630883c0236cd45f8c7") + +SET(TD_VER_VERTYPE "stable") +SET(TD_VER_CPUTYPE "x64") +SET(TD_VER_OSTYPE "Linux") + +SET(TD_VER_COMPATIBLE "2.0.0.0") +STRING(TIMESTAMP TD_VER_DATE "%Y-%m-%d %H:%M:%S") + +IF (TD_LINUX_64) + SET(TD_VER_CPUTYPE "x64") +ENDIF () + +IF (TD_LINUX_32) + SET(TD_VER_CPUTYPE "x86") +ENDIF () + +IF (TD_ARM_64) + SET(TD_VER_CPUTYPE "aarch64") +ENDIF () + +IF (TD_ARM_32) + SET(TD_VER_CPUTYPE "aarch32") +ENDIF () + +IF (TD_WINDOWS_64) + SET(TD_VER_CPUTYPE "x64") +ENDIF () + +IF (TD_WINDOWS_32) + SET(TD_VER_CPUTYPE "x86") +ENDIF () + +CONFIGURE_FILE("${TD_COMMUNITY_DIR}/src/util/src/version.c.in" "${TD_COMMUNITY_DIR}/src/util/src/version.c") diff --git a/deps/MsvcLibX/CMakeLists.txt b/deps/MsvcLibX/CMakeLists.txt index fc77a3b447d337cf0b1e4343c9fb2f9d97fd423c..4428579e1c098425c9d72d7d58a5fda15cd34c93 100644 --- a/deps/MsvcLibX/CMakeLists.txt +++ b/deps/MsvcLibX/CMakeLists.txt @@ -4,5 +4,5 @@ PROJECT(TDengine) IF (TD_WINDOWS) INCLUDE_DIRECTORIES(include) AUX_SOURCE_DIRECTORY(src SRC) - ADD_LIBRARY(MsvcLibXw64 ${SRC}) + ADD_LIBRARY(MsvcLibXw ${SRC}) ENDIF () diff --git a/packaging/docker/Dockerfile b/packaging/docker/Dockerfile index 3fb34e8286e7765effd24ff06b9ab42854d7f556..668d5a49eb53aebe6185d1dc75a22afab4da3367 100644 --- a/packaging/docker/Dockerfile +++ b/packaging/docker/Dockerfile @@ -12,6 +12,6 @@ ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" ENV LANG=en_US.UTF-8 ENV LANGUAGE=en_US:en ENV LC_ALL=en_US.UTF-8 -EXPOSE 6030 6031 6032 6033 6034 6035 6036 6037 6038 6039 6040 6041 +EXPOSE 6030 6031 6032 6033 6034 6035 6036 6037 6038 6039 6040 6041 6042 CMD ["taosd"] VOLUME [ "/var/lib/taos", "/var/log/taos","/etc/taos/" ] diff --git a/packaging/release.sh b/packaging/release.sh index fe46c9792845fead4362fdd5d6cc3f3b7699ff25..bf355bdcd43f5229b1e7a82a2efda61fde6d7d18 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -10,6 +10,7 @@ set -e # -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] # -V [stable | beta] # -l [full | lite] +# -s [static | dynamic] # -n [2.0.0.3] # set parameters by default value @@ -18,9 +19,10 @@ verType=stable # [stable, beta] cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...] osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] pagMode=full # [full | lite] +soMode=dynamic # [static | dynamic] verNumber="" -while getopts "hv:V:c:o:l:n:" arg +while getopts "hv:V:c:o:l:s:n:" arg do case $arg in v) @@ -39,6 +41,10 @@ do #echo "pagMode=$OPTARG" pagMode=$(echo $OPTARG) ;; + s) + #echo "soMode=$OPTARG" + soMode=$(echo $OPTARG) + ;; n) #echo "verNumber=$OPTARG" verNumber=$(echo $OPTARG) @@ -53,6 +59,7 @@ do echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] " echo " -V [stable | beta] " echo " -l [full | lite] " + echo " -s [static | dynamic] " echo " -n [version number] " exit 0 ;; @@ -63,7 +70,7 @@ do esac done -echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} verNumber=${verNumber}" +echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} verNumber=${verNumber}" curr_dir=$(pwd) @@ -223,9 +230,9 @@ cd ${compile_dir} # check support cpu type if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then if [ "$verMode" != "cluster" ]; then - cmake ../ -DCPUTYPE=${cpuType} -DPAGMODE=${pagMode} -DOSTYPE=${osType} + cmake ../ -DCPUTYPE=${cpuType} -DPAGMODE=${pagMode} -DOSTYPE=${osType} -DSOMODE=${soMode} else - cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} + cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} fi else echo "input cpuType=${cpuType} error!!!" diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index d5cbf3cab6d408914303e3c06230930bb59f734d..5b5fb3435d6c1261c2372bddcd85a0bcd3a110f9 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -35,12 +35,14 @@ IF (TD_LINUX) ELSEIF (TD_WINDOWS) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32) - + + CONFIGURE_FILE("${TD_COMMUNITY_DIR}/src/client/src/taos.rc.in" "${TD_COMMUNITY_DIR}/src/client/src/taos.rc") + ADD_LIBRARY(taos_static STATIC ${SRC}) TARGET_LINK_LIBRARIES(taos_static trpc tutil query) # generate dynamic library (*.dll) - ADD_LIBRARY(taos SHARED ${SRC}) + ADD_LIBRARY(taos SHARED ${SRC} ${TD_COMMUNITY_DIR}/src/client/src/taos.rc) IF (NOT TD_GODLL) SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def) ENDIF () diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 2ca6ba669185d5bc8123d21e64805e65a8718945..cb285c8df132db2ba20ade3a6dccf2e266ae2643 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -108,7 +108,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); -SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, +SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, uint32_t offset); void* tscDestroyBlockArrayList(SArray* pDataBlockList); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6d02bc7fbdae33bf45368f267f65304d089db6f6..e117f544bf6d3fc7a83aa824eec782140152db50 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -234,7 +234,7 @@ typedef struct { char * curSql; // current sql, resume position of sql after parsing paused int8_t parseFinished; - short numOfCols; + int16_t numOfCols; uint32_t allocSize; char * payload; int32_t payloadLen; diff --git a/src/client/src/taos.rc.in b/src/client/src/taos.rc.in new file mode 100644 index 0000000000000000000000000000000000000000..05dbd9bb7d0557999cba6a7fe24a52b94f70c54e --- /dev/null +++ b/src/client/src/taos.rc.in @@ -0,0 +1,31 @@ +1 VERSIONINFO + FILEVERSION ${TD_VER_1}, ${TD_VER_2}, ${TD_VER_3} + PRODUCTVERSION ${TD_VER_1}, ${TD_VER_2}, ${TD_VER_3} + FILEFLAGSMASK 0x17L +#ifdef _DEBUG + FILEFLAGS 0x1L +#else + FILEFLAGS 0x0L +#endif + FILEOS 0x4L + FILETYPE 0x0L + FILESUBTYPE 0x0L +BEGIN + BLOCK "StringFileInfo" + BEGIN + BLOCK "040904b0" + BEGIN + VALUE "FileDescription", "Native C Driver for TDengine" + VALUE "FileVersion", "${TD_VER_1}, ${TD_VER_2}, ${TD_VER_3}" + VALUE "InternalName", "taos.dll(${TD_VER_CPUTYPE})" + VALUE "LegalCopyright", "Copyright (C) 2020 TAOS Data" + VALUE "OriginalFilename", "" + VALUE "ProductName", "taos.dll(${TD_VER_CPUTYPE})" + VALUE "ProductVersion", "${TD_VER_1}.${TD_VER_2}.${TD_VER_3}.${TD_VER_4}" + END + END + BLOCK "VarFileInfo" + BEGIN + VALUE "Translation", 0x409, 1200 + END +END \ No newline at end of file diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index f48e7b7691536cb8f9a5b8561e95b8a5a0f0fefc..6ff97e9d00ad9cbf8cd6fdc92713b111a497c321 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -285,9 +285,9 @@ void tscKillConnection(STscObj *pObj) { SSqlObj *pSql = pObj->sqlList; while (pSql) { - //taosStopRpcConn(pSql->thandle); pSql = pSql->next; } + SSqlStream *pStream = pObj->streamList; while (pStream) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9282fa74fbab3e32f8332b69d3800d84c37e5dd6..4f179adf72320c971dc74ccf28b12f4d022a17fc 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -226,13 +226,17 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = &pSql->pRpcCtx, .code = 0 }; - // NOTE: the rpc context should be acquired before sending data to server. // Otherwise, the pSql object may have been released already during the response function, which is // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // cause crash. - rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); - return TSDB_CODE_SUCCESS; + if (pObj != NULL && pObj->signature == pObj) { + rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); + return TSDB_CODE_SUCCESS; + } else { + //pObj->signature has been reset by other thread, ignore concurrency problem + return TSDB_CODE_TSC_CONN_KILLED; + } } void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 1bd885466c995883b7487b1efcb1dadc8d7a5013..de998a6b59fdfdb6182bc7d794f8eef237b687a0 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -201,7 +201,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, } TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) { - tscDebug("try to create a connection to %s:%u, user:%s db:%s", ip, port, user, db); + tscDebug("try to create a connection to %s:%u, user:%s db:%s", ip, port != 0 ? port : tsServerPort , user, db); if (user == NULL) user = TSDB_DEFAULT_USER; if (pass == NULL) pass = TSDB_DEFAULT_PASS; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 64a871ff746c157300fa2ee4bccc291354d12702..fa148ebb2a142074c24211c0276d786cb6cc855a 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -404,7 +404,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { taosTFree(pDataBlock); } -SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, +SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes, uint32_t offset) { uint32_t needed = pDataBlock->numOfParams + 1; if (needed > pDataBlock->numOfAllocedParams) { diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 4ffc631566f490d06f40a13a6d8978d3a72104a5..ef0713c4152832aca87f2b768ddcf3789b581a71 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -29,6 +29,7 @@ extern uint16_t tsServerPort; extern uint16_t tsDnodeShellPort; extern uint16_t tsDnodeDnodePort; extern uint16_t tsSyncPort; +extern uint16_t tsArbitratorPort; extern int32_t tsStatusInterval; extern int32_t tsNumOfMnodes; extern int32_t tsEnableVnodeBak; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index eb6fcde38502d9b0430c815e0629317339d7dc98..795585e5c9220dd209b85731b6278f1acf279aa5 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -37,6 +37,7 @@ uint16_t tsServerPort = 6030; uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035] uint16_t tsDnodeDnodePort = 6035; // udp/tcp uint16_t tsSyncPort = 6040; +uint16_t tsArbitratorPort = 6042; int32_t tsStatusInterval = 1; // second int32_t tsNumOfMnodes = 3; int32_t tsEnableVnodeBak = 1; @@ -1331,7 +1332,10 @@ int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) { *port = atoi(temp+1); } - if (*port == 0) *port = tsServerPort; + if (*port == 0) { + *port = tsServerPort; + return -1; + } return 0; } diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 6f32bc0f7ad3e7a5e1704cc24cf54ac62dbfd694..56316e9619344243ab2e5fa46e0acfc6e0331e8a 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -119,11 +119,8 @@ int32_t main(int32_t argc, char *argv[]) { syslog(LOG_INFO, "Started TDengine service successfully."); - for (int res = tsem_wait(&exitSem); res != 0; res = tsem_wait(&exitSem)) { - if (res != EINTR) { - syslog(LOG_ERR, "failed to wait exit semphore: %d", res); - break; - } + if (tsem_wait(&exitSem) != 0) { + syslog(LOG_ERR, "failed to wait exit semphore: %s", strerror(errno)); } dnodeCleanUpSystem(); diff --git a/src/inc/taos.h b/src/inc/taos.h index f3cc9bb4d79f1ca23d983154e7dad17f07802926..7e8f174b7c3737463f7e66dfdc6d5cd906791ee3 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -93,7 +93,7 @@ DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision DLL_EXPORT void taos_free_result(TAOS_RES *res); DLL_EXPORT int taos_field_count(TAOS_RES *tres); DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *taos); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index ff5b9a958f53ab8f65511143ba0922269821bb41..b6d185db9cf21e6d73ec590a770baf88ef7a40ca 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -63,7 +63,7 @@ typedef struct tstr { extern const int32_t TYPE_BYTES[11]; // TODO: replace and remove code below #define CHAR_BYTES sizeof(char) -#define SHORT_BYTES sizeof(short) +#define SHORT_BYTES sizeof(int16_t) #define INT_BYTES sizeof(int) #define LONG_BYTES sizeof(int64_t) #define FLOAT_BYTES sizeof(float) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 57c2b322faee5f56a5b8e1f1fe38a597a63a0b9e..e7c3d5e4cce154b84c03dadc3f4f0a7c7f1ac69d 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -66,6 +66,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_MSG_TYPE, 0, 0x0011, "Invalid me TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid response type") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Invalid timestamp") TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, 0, 0x0014, "Database not ready") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, 0, 0x0015, "Unable to resolve FQDN") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported") @@ -96,6 +97,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_APP_ERROR, 0, 0x0211, "Applicatio TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") +TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed") // mnode TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 1daa1da03805415ed58b9ec4b11c791656af2a9a..5e7da9bbdcdb386670571f0f14dc6849a936158b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -424,7 +424,10 @@ typedef struct SColumnInfo { int16_t type; int16_t bytes; int16_t numOfFilters; - SColumnFilterInfo *filters; + union{ + int64_t placeholder; + SColumnFilterInfo *filters; + }; } SColumnInfo; typedef struct STableIdInfo { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 4776d1cda79bf26cab94c9af52dfbf2c6da762de..be73d8d383e231235a4bdb07635f077803f1a03a 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -115,7 +115,7 @@ int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); -uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int32_t *size); +uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); // the TSDB repository info typedef struct STsdbRepoInfo { diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 2dfac7ec3282bc2f404eeffa4ccc7e85ad6cf78a..ff9c9901bd91e05f443ada68508cdccbc13ef066 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -64,7 +64,7 @@ typedef struct { if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated. */ -typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); +typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file diff --git a/src/kit/taosnetwork/client.c b/src/kit/taosnetwork/client.c index 102f9b9d89b3e39218d53105fe620b0726e41d45..b7db2ba0a2ec607de1dde010fc89be1cda1c9c9a 100644 --- a/src/kit/taosnetwork/client.c +++ b/src/kit/taosnetwork/client.c @@ -27,35 +27,46 @@ #include #include #include +#include -#define MAX_PKG_LEN (64*1000) -#define BUFFER_SIZE (MAX_PKG_LEN + 1024) +#define MAX_PKG_LEN (64*1000) +#define BUFFER_SIZE (MAX_PKG_LEN + 1024) +#define TEST_FQDN_LEN 128 +#define TEST_IPv4ADDR_LEN 16 typedef struct { - int port; - char *host; + uint16_t port; + uint32_t hostIp; + char fqdn[TEST_FQDN_LEN]; uint16_t pktLen; } info_s; typedef struct Arguments { - char * host; + char host[TEST_IPv4ADDR_LEN]; + char fqdn[TEST_FQDN_LEN]; uint16_t port; uint16_t max_port; uint16_t pktLen; } SArguments; static struct argp_option options[] = { - {0, 'h', "host", 0, "The host to connect to TDEngine. Default is localhost.", 0}, + {0, 'h', "host ip", 0, "The host ip to connect to TDEngine. Default is localhost.", 0}, {0, 'p', "port", 0, "The TCP or UDP port number to use for the connection. Default is 6030.", 1}, - {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6060.", 2}, + {0, 'm', "max port", 0, "The max TCP or UDP port number to use for the connection. Default is 6042.", 2}, + {0, 'f', "host fqdn", 0, "The host fqdn to connect to TDEngine.", 3}, {0, 'l', "test pkg len", 0, "The len of pkg for test. Default is 1000 Bytes, max not greater than 64k Bytes.\nNotes: This parameter must be consistent between the client and the server.", 3}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { - + wordexp_t full_path; SArguments *arguments = state->input; switch (key) { case 'h': - arguments->host = arg; + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid host ip %s\n", arg); + return -1; + } + strcpy(arguments->host, full_path.we_wordv[0]); + wordfree(&full_path); break; case 'p': arguments->port = atoi(arg); @@ -66,6 +77,14 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'l': arguments->pktLen = atoi(arg); break; + case 'f': + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid host fqdn %s\n", arg); + return -1; + } + strcpy(arguments->fqdn, full_path.we_wordv[0]); + wordfree(&full_path); + break; default: return ARGP_ERR_UNKNOWN; @@ -76,8 +95,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = {options, parse_opt, 0, 0}; int checkTcpPort(info_s *info) { - int port = info->port; - char *host = info->host; int clientSocket; struct sockaddr_in serverAddr; @@ -88,21 +105,35 @@ int checkTcpPort(info_s *info) { printf("socket() fail: %s\n", strerror(errno)); return -1; } + + // set send and recv overtime + struct timeval timeout; + timeout.tv_sec = 2; //s + timeout.tv_usec = 0; //us + if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt send timer failed:"); + } + if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt recv timer failed:"); + } + serverAddr.sin_family = AF_INET; - serverAddr.sin_port = htons(port); + serverAddr.sin_port = htons(info->port); - serverAddr.sin_addr.s_addr = inet_addr(host); + serverAddr.sin_addr.s_addr = info->hostIp; //printf("=================================\n"); if (connect(clientSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) { - printf("connect() fail: %s\n", strerror(errno)); + printf("connect() fail: %s\t", strerror(errno)); return -1; } //printf("Connect to: %s:%d...success\n", host, port); memset(sendbuf, 0, BUFFER_SIZE); memset(recvbuf, 0, BUFFER_SIZE); - sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", host, port); + struct in_addr ipStr; + memcpy(&ipStr, &info->hostIp, 4); + sprintf(sendbuf, "client send tcp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port); sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); send(clientSocket, sendbuf, info->pktLen, 0); @@ -120,7 +151,7 @@ int checkTcpPort(info_s *info) { if (errno == EINTR) { continue; } else { - printf("recv ack pkg from TCP port: %d fail:%s.\n", port, strerror(errno)); + printf("recv ack pkg from TCP port: %d fail:%s.\n", info->port, strerror(errno)); close(clientSocket); return -1; } @@ -132,7 +163,7 @@ int checkTcpPort(info_s *info) { } if (iDataNum < info->pktLen) { - printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, port); + printf("recv ack pkg len: %d, less than req pkg len: %d from tcp port: %d\n", iDataNum, info->pktLen, info->port); return -1; } //printf("Read ack pkg len:%d from tcp port: %d, buffer: %s %s\n", info->pktLen, port, recvbuf, recvbuf+iDataNum-8); @@ -142,8 +173,6 @@ int checkTcpPort(info_s *info) { } int checkUdpPort(info_s *info) { - int port = info->port; - char *host = info->host; int clientSocket; struct sockaddr_in serverAddr; @@ -154,15 +183,28 @@ int checkUdpPort(info_s *info) { perror("socket"); return -1; } + + // set overtime + struct timeval timeout; + timeout.tv_sec = 2; //s + timeout.tv_usec = 0; //us + if (setsockopt(clientSocket, SOL_SOCKET,SO_SNDTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt send timer failed:"); + } + if (setsockopt(clientSocket, SOL_SOCKET,SO_RCVTIMEO, (char *)&timeout, sizeof(struct timeval)) == -1) { + perror("setsockopt recv timer failed:"); + } serverAddr.sin_family = AF_INET; - serverAddr.sin_port = htons(port); - serverAddr.sin_addr.s_addr = inet_addr(host); + serverAddr.sin_port = htons(info->port); + serverAddr.sin_addr.s_addr = info->hostIp; memset(sendbuf, 0, BUFFER_SIZE); memset(recvbuf, 0, BUFFER_SIZE); - sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", host, port); + struct in_addr ipStr; + memcpy(&ipStr, &info->hostIp, 4); + sprintf(sendbuf, "client send udp pkg to %s:%d, content: 1122334455", inet_ntoa(ipStr), info->port); sprintf(sendbuf + info->pktLen - 16, "1122334455667788"); socklen_t sin_size = sizeof(*(struct sockaddr *)&serverAddr); @@ -176,7 +218,7 @@ int checkUdpPort(info_s *info) { iDataNum = recvfrom(clientSocket, recvbuf, BUFFER_SIZE, 0, (struct sockaddr *)&serverAddr, &sin_size); if (iDataNum < info->pktLen) { - printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\n", iDataNum, info->pktLen, port); + printf("Read ack pkg len: %d, less than req pkg len: %d from udp port: %d\t\t", iDataNum, info->pktLen, info->port); return -1; } @@ -185,43 +227,87 @@ int checkUdpPort(info_s *info) { return 0; } -int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", 6030, 6060, 1000}; - info_s info; - int ret; - - argp_parse(&argp, argc, argv, 0, 0, &arguments); - if (arguments.pktLen > MAX_PKG_LEN) { - printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); - exit(0); - } +int32_t getIpFromFqdn(const char *fqdn, uint32_t* ip) { + struct addrinfo hints = {0}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; - printf("host: %s\tport: %d\tmax_port: %d\tpkgLen: %d\n", arguments.host, arguments.port, arguments.max_port, arguments.pktLen); + struct addrinfo *result = NULL; - int port = arguments.port; + int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); + if (result) { + struct sockaddr *sa = result->ai_addr; + struct sockaddr_in *si = (struct sockaddr_in*)sa; + struct in_addr ia = si->sin_addr; + *ip = ia.s_addr; + freeaddrinfo(result); + return 0; + } else { + printf("Failed get the ip address from fqdn:%s, code:%d, reason:%s", fqdn, ret, gai_strerror(ret)); + return -1; + } +} - info.host = arguments.host; - info.pktLen = arguments.pktLen; +void checkPort(uint32_t hostIp, uint16_t startPort, uint16_t maxPort, uint16_t pktLen) { + int ret; + info_s info; + memset(&info, 0, sizeof(info_s)); + info.hostIp = hostIp; + info.pktLen = pktLen; - for (; port <= arguments.max_port; port++) { + for (uint16_t port = startPort; port <= maxPort; port++) { //printf("test: %s:%d\n", info.host, port); printf("\n"); info.port = port; ret = checkTcpPort(&info); if (ret != 0) { - printf("tcp port:%d test fail.\t\t", port); + printf("tcp port:%d test fail.\t\n", port); } else { printf("tcp port:%d test ok.\t\t", port); } ret = checkUdpPort(&info); if (ret != 0) { - printf("udp port:%d test fail.\t\t", port); + printf("udp port:%d test fail.\t\n", port); } else { printf("udp port:%d test ok.\t\t", port); } } + printf("\n"); + return ; +} + +int main(int argc, char *argv[]) { + SArguments arguments = {"127.0.0.1", "", 6030, 6042, 1000}; + int ret; + + argp_parse(&argp, argc, argv, 0, 0, &arguments); + if (arguments.pktLen > MAX_PKG_LEN) { + printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); + exit(0); + } + + printf("host ip: %s\thost fqdn: %s\tport: %d\tmax_port: %d\tpkgLen: %d\n", arguments.host, arguments.fqdn, arguments.port, arguments.max_port, arguments.pktLen); + + if (arguments.host[0] != 0) { + printf("\nstart connect to %s test:\n", arguments.host); + checkPort(inet_addr(arguments.host), arguments.port, arguments.max_port, arguments.pktLen); + printf("\n"); + } + + if (arguments.fqdn[0] != 0) { + uint32_t hostIp = 0; + ret = getIpFromFqdn(arguments.fqdn, &hostIp); + if (ret) { + printf("\n"); + return 0; + } + printf("\nstart connetc to %s test:\n", arguments.fqdn); + checkPort(hostIp, arguments.port, arguments.max_port, arguments.pktLen); + printf("\n"); + } + return 0; } diff --git a/src/kit/taosnetwork/server.c b/src/kit/taosnetwork/server.c index 1c3bc6fa098d793c2c7d1c355c524c755301b57b..97be1d3b63369b26dedb0707d3a096b044f982ef 100644 --- a/src/kit/taosnetwork/server.c +++ b/src/kit/taosnetwork/server.c @@ -142,9 +142,9 @@ static void *bindTcpPort(void *sarg) { printf("recv Client: %s pkg from TCP port: %d, pkg len: %d\n", inet_ntoa(clientAddr.sin_addr), port, iDataNum); if (iDataNum > 0) { send(client, buffer, iDataNum, 0); - break; } } + close(serverSocket); return NULL; } @@ -201,7 +201,7 @@ static void *bindUdpPort(void *sarg) { int main(int argc, char *argv[]) { - SArguments arguments = {"127.0.0.1", 6030, 6060, 1000}; + SArguments arguments = {"127.0.0.1", 6030, 6042, 1000}; argp_parse(&argp, argc, argv, 0, 0, &arguments); if (arguments.pktLen > MAX_PKG_LEN) { printf("test pkg len overflow: %d, max len not greater than %d bytes\n", arguments.pktLen, MAX_PKG_LEN); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 0367ce21fd201fa3c8b570e6d47a9802b268e837..cdea9eda60a4537137058b307163199bff213051 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -354,7 +354,7 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, SMnodeMsg mDebug("db:%s, already exist, ignore exist is set", pCreate->db); return TSDB_CODE_SUCCESS; } else { - mError("db:%s, is already exist, ignore exist not set", pCreate->db); + mError("db:%s, already exist, ignore exist not set", pCreate->db); return TSDB_CODE_MND_DB_ALREADY_EXIST; } } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 5da57a30934d75335ca28c0d7e6dcfe3e43b47b2..ccb85c330135c07f6529628cc1a8e309750434a7 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -518,7 +518,7 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { SDnodeObj *pDnode = mnodeGetDnodeByEp(ep); if (pDnode != NULL) { mnodeDecDnodeRef(pDnode); - mError("dnode:%d is already exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort); + mError("dnode:%d, already exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort); return TSDB_CODE_MND_DNODE_ALREADY_EXIST; } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 85457d7a26b84adfd383d447c84604c15a436cee..06f992c26a4b2fd6eec19abb9743e4add2536000 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -100,7 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { }; tstrncpy(connObj.user, user, sizeof(connObj.user)); - SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME); + SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000); mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); return pConn; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 3651aa8aade8d2f8b5393532d395c8f4809a715b..9efb804734558278b9d6e9efeb29c4d03b65e21d 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -224,7 +224,7 @@ void sdbUpdateMnodeRoles() { mnodeUpdateMnodeEpSet(); } -static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { +static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { sdbUpdateMnodeRoles(); return 0; } diff --git a/src/os/inc/osSemphone.h b/src/os/inc/osSemphone.h index fd88d2d7981a4e0f9dc397e73bf242eb7e059cf7..4280b458a65b23b3aa680c6ed46f61beb5c48948 100644 --- a/src/os/inc/osSemphone.h +++ b/src/os/inc/osSemphone.h @@ -23,7 +23,7 @@ extern "C" { #ifndef TAOS_OS_FUNC_SEMPHONE #define tsem_t sem_t #define tsem_init sem_init - #define tsem_wait sem_wait + int tsem_wait(tsem_t* sem); #define tsem_post sem_post #define tsem_destroy sem_destroy #endif diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemphone.c index 74f8859029f95c67a2f298a0dbfa983978b4b78f..b91888845edd9e98315994eaada9eca245aacb24 100644 --- a/src/os/src/detail/osSemphone.c +++ b/src/os/src/detail/osSemphone.c @@ -16,6 +16,18 @@ #define _DEFAULT_SOURCE #include "os.h" +#ifndef TAOS_OS_FUNC_SEMPHONE + +int tsem_wait(tsem_t* sem) { + int ret = 0; + do { + ret = sem_wait(sem); + } while (ret != 0 && errno == EINTR); + return ret; +} + +#endif + #ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } diff --git a/src/os/src/windows/CMakeLists.txt b/src/os/src/windows/CMakeLists.txt index 588d3b7f68ea6f54051d4723a0730b9316f53157..9dcc9e7e6d93ff200b7571d98724f898712658eb 100644 --- a/src/os/src/windows/CMakeLists.txt +++ b/src/os/src/windows/CMakeLists.txt @@ -4,4 +4,4 @@ PROJECT(TDengine) AUX_SOURCE_DIRECTORY(. SRC) ADD_LIBRARY(os ${SRC}) -TARGET_LINK_LIBRARIES(os winmm IPHLPAPI ws2_32 MsvcLibXw64) +TARGET_LINK_LIBRARIES(os winmm IPHLPAPI ws2_32 MsvcLibXw) diff --git a/src/os/src/windows/w64Atomic.c b/src/os/src/windows/wAtomic.c similarity index 100% rename from src/os/src/windows/w64Atomic.c rename to src/os/src/windows/wAtomic.c diff --git a/src/os/src/windows/w64Dir.c b/src/os/src/windows/wDir.c similarity index 100% rename from src/os/src/windows/w64Dir.c rename to src/os/src/windows/wDir.c diff --git a/src/os/src/windows/w64Env.c b/src/os/src/windows/wEnv.c similarity index 100% rename from src/os/src/windows/w64Env.c rename to src/os/src/windows/wEnv.c diff --git a/src/os/src/windows/w64File.c b/src/os/src/windows/wFile.c similarity index 100% rename from src/os/src/windows/w64File.c rename to src/os/src/windows/wFile.c diff --git a/src/os/src/windows/w64Getline.c b/src/os/src/windows/wGetline.c similarity index 100% rename from src/os/src/windows/w64Getline.c rename to src/os/src/windows/wGetline.c diff --git a/src/os/src/windows/w64Godll.c b/src/os/src/windows/wGodll.c similarity index 100% rename from src/os/src/windows/w64Godll.c rename to src/os/src/windows/wGodll.c diff --git a/src/os/src/windows/w64Lz4.c b/src/os/src/windows/wLz4.c similarity index 100% rename from src/os/src/windows/w64Lz4.c rename to src/os/src/windows/wLz4.c diff --git a/src/os/src/windows/w64Semphone.c b/src/os/src/windows/wSemphone.c similarity index 100% rename from src/os/src/windows/w64Semphone.c rename to src/os/src/windows/wSemphone.c diff --git a/src/os/src/windows/w64Socket.c b/src/os/src/windows/wSocket.c similarity index 100% rename from src/os/src/windows/w64Socket.c rename to src/os/src/windows/wSocket.c diff --git a/src/os/src/windows/w64String.c b/src/os/src/windows/wString.c similarity index 100% rename from src/os/src/windows/w64String.c rename to src/os/src/windows/wString.c diff --git a/src/os/src/windows/w64Strptime.c b/src/os/src/windows/wStrptime.c similarity index 100% rename from src/os/src/windows/w64Strptime.c rename to src/os/src/windows/wStrptime.c diff --git a/src/os/src/windows/w64Sysinfo.c b/src/os/src/windows/wSysinfo.c similarity index 100% rename from src/os/src/windows/w64Sysinfo.c rename to src/os/src/windows/wSysinfo.c diff --git a/src/os/src/windows/w64Time.c b/src/os/src/windows/wTime.c similarity index 100% rename from src/os/src/windows/w64Time.c rename to src/os/src/windows/wTime.c diff --git a/src/os/src/windows/w64Timer.c b/src/os/src/windows/wTimer.c similarity index 100% rename from src/os/src/windows/w64Timer.c rename to src/os/src/windows/wTimer.c diff --git a/src/os/src/windows/w64Wordexp.c b/src/os/src/windows/wWordexp.c similarity index 100% rename from src/os/src/windows/w64Wordexp.c rename to src/os/src/windows/wWordexp.c diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 7913b1c5b0781b9776dc9342757f2110027e07b1..3aecd127ef8e7503c6db4670d349a4dd8dddeb4b 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -563,7 +563,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, uint32_t peerIp = taosGetIpFromFqdn(peerFqdn); if (peerIp == 0xFFFFFFFF) { tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); - terrno = TSDB_CODE_RPC_APP_ERROR; + terrno = TSDB_CODE_RPC_FQDN_ERROR; return NULL; } diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index be1e01cb2371b82acf4cf63e896a6fed68399222..cd1252f4b414f5ad2dcd56e2a7969ab433e40452 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -74,7 +74,7 @@ typedef struct { uint32_t magic; uint32_t index; uint64_t fversion; - int32_t size; + int64_t size; } SFileInfo; typedef struct { diff --git a/src/sync/inc/taosTcpPool.h b/src/sync/inc/taosTcpPool.h index 1e410acc26c3a5c82088bfd25d6a0d840835ba2f..5f7ca9ede587a6d609c8c449e66c462f0d9f6dad 100644 --- a/src/sync/inc/taosTcpPool.h +++ b/src/sync/inc/taosTcpPool.h @@ -26,7 +26,7 @@ typedef void* tthread_h; typedef struct { int numOfThreads; uint32_t serverIp; - short port; + int16_t port; int bufferSize; void (*processBrokenLink)(void *ahandle); int (*processIncomingMsg)(void *ahandle, void *buffer); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index eaa073348a90ad4e3bb9469dc0705475aa981ae8..b92ef4cf260f831c3ddb3c1ebb34b7d4735f3056 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -108,8 +108,7 @@ static void syncModuleInitFunc() { tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); } -void *syncStart(const SSyncInfo *pInfo) -{ +void *syncStart(const SSyncInfo *pInfo) { const SSyncCfg *pCfg = &pInfo->syncCfg; SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); @@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo) return pNode; } -void syncStop(void *param) -{ - SSyncNode *pNode = param; +void syncStop(void *param) { + SSyncNode * pNode = param; SSyncPeer *pPeer; if (pNode == NULL) return; @@ -215,9 +213,8 @@ void syncStop(void *param) syncDecNodeRef(pNode); } -int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) -{ - SSyncNode *pNode = param; +int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { + SSyncNode * pNode = param; int i, j; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; @@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) return 0; } -int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) -{ - SSyncNode *pNode = param; - SSyncPeer *pPeer; +int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { + SSyncNode * pNode = param; + SSyncPeer * pPeer; SSyncHead *pSyncHead; SWalHead *pWalHead = data; int fwdLen; @@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) return code; } -void syncConfirmForward(void *param, uint64_t version, int32_t code) -{ - SSyncNode *pNode = param; +void syncConfirmForward(void *param, uint64_t version, int32_t code) { + SSyncNode *pNode = param; if (pNode == NULL) return; if (pNode->quorum <= 1) return; @@ -387,10 +382,9 @@ void syncRecover(void *param) { pthread_mutex_unlock(&(pNode->mutex)); } -int syncGetNodesRole(void *param, SNodesRole *pNodesRole) -{ +int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; - + pNodesRole->selfIndex = pNode->selfIndex; for (int i=0; ireplica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; @@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) return 0; } -static void syncAddArbitrator(SSyncNode *pNode) -{ +static void syncAddArbitrator(SSyncNode *pNode) { SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; // if not configured, return right away @@ -413,9 +406,11 @@ static void syncAddArbitrator(SSyncNode *pNode) SNodeInfo nodeInfo; nodeInfo.nodeId = 0; - taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort); - nodeInfo.nodePort += TSDB_PORT_SYNC; - + int ret = taosGetFqdnPortFromEp(tsArbitrator, nodeInfo.nodeFqdn, &nodeInfo.nodePort); + if (-1 == ret) { + nodeInfo.nodePort = tsArbitratorPort; + } + if (pPeer) { if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) { return; @@ -454,13 +449,11 @@ static void syncDecNodeRef(SSyncNode *pNode) } } -void syncAddPeerRef(SSyncPeer *pPeer) -{ +void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } -int syncDecPeerRef(SSyncPeer *pPeer) -{ +int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { syncDecNodeRef(pPeer->pSyncNode); @@ -473,18 +466,16 @@ int syncDecPeerRef(SSyncPeer *pPeer) return 1; } -static void syncClosePeerConn(SSyncPeer *pPeer) -{ +static void syncClosePeerConn(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); taosClose(pPeer->syncFd); - if (pPeer->peerFd >=0) { + if (pPeer->peerFd >= 0) { pPeer->peerFd = -1; taosFreeTcpConn(pPeer->pConn); } } -static void syncRemovePeer(SSyncPeer *pPeer) -{ +static void syncRemovePeer(SSyncPeer *pPeer) { sInfo("%s, it is removed", pPeer->id); pPeer->ip = 0; @@ -492,8 +483,7 @@ static void syncRemovePeer(SSyncPeer *pPeer) syncDecPeerRef(pPeer); } -static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) -{ +static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); if (ip == -1) return NULL; @@ -523,25 +513,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) return pPeer; } -void syncBroadcastStatus(SSyncNode *pNode) -{ +void syncBroadcastStatus(SSyncNode *pNode) { SSyncPeer *pPeer; for (int i = 0; i < pNode->replica; ++i) { - if ( i == pNode->selfIndex ) continue; + if (i == pNode->selfIndex) continue; pPeer = pNode->peerInfo[i]; syncSendPeersStatusMsgToPeer(pPeer, 1); } -} +} static void syncResetFlowCtrl(SSyncNode *pNode) { - for (int i = 0; i < pNode->replica; ++i) { pNode->peerInfo[i]->numOfRetrieves = 0; } - if (pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + if (pNode->notifyFlowCtrl) { + (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + } } static void syncChooseMaster(SSyncNode *pNode) { @@ -598,9 +587,9 @@ static void syncChooseMaster(SSyncNode *pNode) { } else { sDebug("vgId:%d, failed to choose master", pNode->vgId); } -} - -static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { +} + +static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { int onlineNum = 0; int index = -1; int replica = pNode->replica; @@ -617,7 +606,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { replica = pNode->replica + 1; } - if (onlineNum <= replica*0.5) { + if (onlineNum <= replica * 0.5) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { nodeRole = TAOS_SYNC_ROLE_UNSYNCED; pNode->peerInfo[pNode->selfIndex]->role = nodeRole; @@ -625,13 +614,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); } } else { - for (int i=0; ireplica; ++i) { + for (int i = 0; i < pNode->replica; ++i) { SSyncPeer *pTemp = pNode->peerInfo[i]; - if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue; - if ( index < 0 ) { + if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue; + if (index < 0) { index = i; - } else { // multiple masters, it shall not happen - if ( i == pNode->selfIndex ) { + } else { // multiple masters, it shall not happen + if (i == pNode->selfIndex) { sError("%s, peer is master, work as slave instead", pTemp->id); nodeRole = TAOS_SYNC_ROLE_SLAVE; (*pNode->notifyRole)(pNode->ahandle, nodeRole); @@ -640,7 +629,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { } } - SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL; + SSyncPeer *pMaster = (index >= 0) ? pNode->peerInfo[index] : NULL; return pMaster; } @@ -649,7 +638,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { int code = 0; if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { - sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); + sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); nodeRole = TAOS_SYNC_ROLE_UNSYNCED; (*pNode->notifyRole)(pNode->ahandle, nodeRole); code = -1; @@ -658,13 +647,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) { if ( i == pNode->selfIndex ) continue; syncRestartPeer(pNode->peerInfo[i]); } - } + } return code; } -static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) -{ +static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) { SSyncNode *pNode = pPeer->pSyncNode; int8_t peerOldRole = pPeer->role; int8_t selfOldRole = nodeRole; @@ -686,14 +674,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne if (syncValidateMaster(pPeer) < 0) return; if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { - if ( nodeVersion < pMaster->version) { + if (nodeVersion < pMaster->version) { syncRequired = 1; } else { sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); nodeRole = TAOS_SYNC_ROLE_SLAVE; (*pNode->notifyRole)(pNode->ahandle, nodeRole); } - } else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { + } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { // nodeVersion = pMaster->version; } } else { @@ -734,20 +722,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) { pPeer->sstatus = TAOS_SYNC_STATUS_INIT; int ret = strcmp(pPeer->fqdn, tsNodeFqdn); - if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) ) - taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); + if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); } -void syncRestartConnection(SSyncPeer *pPeer) -{ +void syncRestartConnection(SSyncPeer *pPeer) { if (pPeer->ip == 0) return; syncRestartPeer(pPeer); syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE); } -static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) -{ +static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; sDebug("%s, sync-req is received", pPeer->id); @@ -782,8 +768,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) } } -static void syncNotStarted(void *param, void *tmrId) -{ +static void syncNotStarted(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; @@ -803,14 +788,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { pthread_mutex_unlock(&(pNode->mutex)); } -static void syncRecoverFromMaster(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncRecoverFromMaster(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; - if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) { + if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); return; - } + } taosTmrStopA(&pPeer->timer); if (tsSyncNum >= tsMaxSyncNum) { @@ -840,9 +824,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) return; } -static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SFwdRsp *pFwdRsp = (SFwdRsp *) cont; SSyncFwds *pSyncFwds = pNode->pSyncFwds; SFwdInfo *pFwdInfo; @@ -862,10 +845,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) } } - -static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SWalHead *pHead = (SWalHead *)cont; sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); @@ -884,9 +865,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) return; } -static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus *pPeersStatus = (SPeersStatus *)cont; sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, @@ -909,10 +889,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { } // head.len = htonl(head.len); - if (pHead->len <0) { + if (pHead->len < 0) { sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); return -1; - } + } int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); if (bytes != pHead->len) { @@ -923,9 +903,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { return 0; } -static int syncProcessPeerMsg(void *param, void *buffer) -{ - SSyncPeer *pPeer = param; +static int syncProcessPeerMsg(void *param, void *buffer) { + SSyncPeer * pPeer = param; SSyncHead head; char *cont = (char *)buffer; @@ -953,8 +932,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) #define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA -static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) -{ +static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { SSyncNode *pNode = pPeer->pSyncNode; char msg[statusMsgLen] = {0}; @@ -1011,7 +989,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { firstPkt.port = tsSyncPort; firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId - if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { + if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { sDebug("%s, connection to peer server is setup", pPeer->id); pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; @@ -1024,8 +1002,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { } } -static void syncCheckPeerConnection(void *param, void *tmrId) -{ +static void syncCheckPeerConnection(void *param, void *tmrId) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; @@ -1037,8 +1014,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) pthread_mutex_unlock(&(pNode->mutex)); } -static void syncCreateRestoreDataThread(SSyncPeer *pPeer) -{ +static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); pthread_attr_t thattr; @@ -1059,8 +1035,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) } } -static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) -{ +static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { char ipstr[24]; int i; @@ -1137,8 +1112,7 @@ static void syncProcessBrokenLink(void *param) { syncDecNodeRef(pNode); } -static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) -{ +static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; uint64_t time = taosGetTimestampMs(); @@ -1160,8 +1134,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); } -static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) -{ +static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; int fwds = pSyncFwds->fwds; @@ -1178,8 +1151,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) } } -static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) -{ +static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) { int confirm = 0; if (pFwdInfo->code == 0) pFwdInfo->code = code; @@ -1200,8 +1172,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } } -static void syncMonitorFwdInfos(void *param, void *tmrId) -{ +static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; SSyncFwds *pSyncFwds = pNode->pSyncFwds; uint64_t time = taosGetTimestampMs(); @@ -1220,6 +1191,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } - - - diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 505882b20943e47322a81da6491e59dd550d6577..2a0bee3726d939cc9da436af36ce02d45dad34d5 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -28,7 +28,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind char fname[TSDB_FILENAME_LEN*3] = {0}; uint32_t magic; uint64_t fversion; - int32_t size; + int64_t size; uint32_t index = sindex; SSyncNode *pNode = pPeer->pSyncNode; @@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind } } -static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) -{ +static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info @@ -113,7 +112,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) close(dfd); if (ret<0) break; - sDebug("%s, %s is received, size:%d", pPeer->id, minfo.name, minfo.size); + sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); } @@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) return code; } -static int syncRestoreWal(SSyncPeer *pPeer) -{ +static int syncRestoreWal(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; int ret, code = -1; @@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) return offset; } -static int syncProcessBufferedFwd(SSyncPeer *pPeer) -{ +static int syncProcessBufferedFwd(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; int forwards = 0; @@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) return pRecv->code; } -int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) -{ +int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { SSyncNode *pNode = pPeer->pSyncNode; SRecvBuffer *pRecv = pNode->pRecv; @@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) return pRecv->code; } -static void syncCloseRecvBuffer(SSyncNode *pNode) -{ +static void syncCloseRecvBuffer(SSyncNode *pNode) { if (pNode->pRecv) { taosTFree(pNode->pRecv->buffer); } @@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) taosTFree(pNode->pRecv); } -static int syncOpenRecvBuffer(SSyncNode *pNode) -{ +static int syncOpenRecvBuffer(SSyncNode *pNode) { syncCloseRecvBuffer(pNode); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); @@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) return 0; } -static int syncRestoreDataStepByStep(SSyncPeer *pPeer) -{ +static int syncRestoreDataStepByStep(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; nodeSStatus = TAOS_SYNC_STATUS_FILE; uint64_t fversion = 0; @@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) return 0; } -void *syncRestoreData(void *param) -{ - SSyncPeer *pPeer = (SSyncPeer *)param; - SSyncNode *pNode = pPeer->pSyncNode; +void *syncRestoreData(void *param) { + SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); __sync_fetch_and_add(&tsSyncNum, 1); @@ -326,4 +318,3 @@ void *syncRestoreData(void *param) return NULL; } - diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 1dd1cda343e3d507e1f251eedb427791bd812e63..8aa317b1ac7c770b6b102545716656541b8bb67e 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -27,11 +27,10 @@ #include "tsync.h" #include "syncInt.h" -static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) -{ +static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { sDebug("%s, start to monitor:%s", pPeer->id, name); - if (pPeer->notifyFd <=0) { + if (pPeer->notifyFd <= 0) { pPeer->watchNum = 0; pPeer->notifyFd = inotify_init1(IN_NONBLOCK); if (pPeer->notifyFd < 0) { @@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) return 0; } -static int syncAreFilesModified(SSyncPeer *pPeer) -{ - if (pPeer->notifyFd <=0) return 0; +static int syncAreFilesModified(SSyncPeer *pPeer) { + if (pPeer->notifyFd <= 0) return 0; char buf[2048]; int len = read(pPeer->notifyFd, buf, sizeof(buf)); @@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) } } - return code; + return code; } -static int syncRetrieveFile(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveFile(SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; SFileInfo fileInfo; SFileAck fileAck; int code = -1; @@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) // wait for the ack from peer ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); - if (ret <0) break; + if (ret < 0) break; // set the peer sync version pPeer->sversion = fileInfo.fversion; @@ -148,13 +145,13 @@ static int syncRetrieveFile(SSyncPeer *pPeer) // send the file to peer int sfd = open(name, O_RDONLY); - if ( sfd < 0 ) break; + if (sfd < 0) break; ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); - close(sfd); - if (ret <0) break; + close(sfd); + if (ret < 0) break; - sDebug("%s, %s is sent, size:%d", pPeer->id, name, fileInfo.size); + sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); fileInfo.index++; // check if processed files are modified @@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) /* if only a partial record is read out, set the IN_MODIFY flag in event, so upper layer will reload the file to get a complete record */ -static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) -{ +static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) { int ret; ret = read(sfd, pHead, sizeof(SWalHead)); @@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) } ret = read(sfd, pHead->cont, pHead->len); - if (ret <0) return -1; + if (ret < 0) return -1; if (ret != pHead->len) { // file is not at end yet, it shall be reloaded @@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) } return sizeof(SWalHead) + pHead->len; -} +} -static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) -{ +static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) { pPeer->watchNum = 0; taosClose(pPeer->notifyFd); pPeer->notifyFd = inotify_init1(IN_NONBLOCK); @@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) return -1; } - return 0; + return 0; } -static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) -{ +static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { char buf[2048]; int len = read(pPeer->notifyFd, buf, sizeof(buf)); - if (len <0 && errno != EAGAIN) { - sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + if (len < 0 && errno != EAGAIN) { + sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); return -1; } - + if (len == 0) return 0; struct inotify_event *event; @@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) return 0; } -static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) -{ +static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { SWalHead *pHead = (SWalHead *) malloc(640000); int code = -1; int32_t bytes = 0; @@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); while (1) { - int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); - if (wsize <0) break; - if (wsize == 0) { code = 0; break; } + int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); + if (wsize < 0) break; + if (wsize == 0) { + code = 0; + break; + } sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); @@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, return -1; } -static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) -{ +static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) { SSyncNode *pNode = pPeer->pSyncNode; int code = -1; char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file @@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) } if (code < 0) break; - if (pPeer->sversion >= fversion && fversion > 0) break; + if (pPeer->sversion >= fversion && fversion > 0) break; - index++; wname[0] = 0; + index++; + wname[0] = 0; code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); - if ( code < 0) break; - if ( wname[0] == 0 ) {code = 0; break;} + if (code < 0) break; + if (wname[0] == 0) { + code = 0; + break; + } // current last wal is closed, there is a new one sDebug("%s, last wal is closed, try new one", pPeer->id); @@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) return code; } -static int syncRetrieveWal(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveWal(SSyncPeer *pPeer) { + SSyncNode * pNode = pPeer->pSyncNode; char fname[TSDB_FILENAME_LEN * 3]; char wname[TSDB_FILENAME_LEN * 2]; int32_t size; @@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) // send wal file, // inotify is not required, old wal file won't be modified, even remove is ok - if ( stat(fname, &fstat) < 0 ) break; + if (stat(fname, &fstat) < 0) break; size = fstat.st_size; sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); @@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer) return code; } -static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) -{ - SSyncNode *pNode = pPeer->pSyncNode; +static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) { + SSyncNode *pNode = pPeer->pSyncNode; SFirstPkt firstPkt; memset(&firstPkt, 0, sizeof(firstPkt)); @@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) return 0; } -void *syncRetrieveData(void *param) -{ - SSyncPeer *pPeer = (SSyncPeer *)param; +void *syncRetrieveData(void *param) { + SSyncPeer * pPeer = (SSyncPeer *)param; SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index b523728bf9357e17602c75e3822284c2871e8511..2f064ceb3690e4e8111d67fd2fce224add74717c 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param); static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static void taosStopPoolThread(SThreadObj* pThread); -void *taosOpenTcpThreadPool(SPoolInfo *pInfo) -{ +void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { pthread_attr_t thattr; SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); @@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) return pPool; } -void taosCloseTcpThreadPool(void *param) -{ +void taosCloseTcpThreadPool(void *param) { SPoolObj *pPool = (SPoolObj *)param; SThreadObj *pThread; @@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param) uDebug("%p TCP pool is closed", pPool); } -void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) -{ +void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) { struct epoll_event event; SPoolObj *pPool = (SPoolObj *)param; @@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) return pConn; } -void taosFreeTcpConn(void *param) -{ - SConnObj *pConn = (SConnObj *)param; +void taosFreeTcpConn(void *param) { + SConnObj * pConn = (SConnObj *)param; SThreadObj *pThread = pConn->pThread; uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index 625c0d68386535297ffb1e92e112e1a18d58a48b..b704b1ecaec45577c52665a6c857bfbe49b05e72 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -40,13 +40,9 @@ typedef struct { void *pConn; } SNodeConn; -uint16_t tsArbitratorPort = 0; - int main(int argc, char *argv[]) { char arbLogPath[TSDB_FILENAME_LEN + 16] = {0}; - tsArbitratorPort = tsServerPort + TSDB_PORT_ARBITRATOR; - for (int i=1; itsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h index 3b4e8e375717f71d7305aba765ed7bc3685a46d1..b2b0ff05f58478e3778d3abab72ae3511f683aca 100644 --- a/src/util/inc/tkvstore.h +++ b/src/util/inc/tkvstore.h @@ -58,7 +58,7 @@ int tdKVStoreStartCommit(SKVStore *pStore); int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); int tdKVStoreEndCommit(SKVStore *pStore); -void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size); +void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size); #ifdef __cplusplus } diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index 95645882547110cb36fdc16eafe8cc7d1fe09bcf..ba711ced8ff074e639a046c5dd9908865a3e9468 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -220,8 +220,14 @@ static int32_t compareStrPatternComp(const void* pLeft, const void* pRight) { char pattern[128] = {0}; memcpy(pattern, varDataVal(pRight), varDataLen(pRight)); assert(varDataLen(pRight) < 128); - - int32_t ret = patternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft), &pInfo); + + size_t sz = varDataLen(pLeft); + char *buf = malloc(sz + 1); + memcpy(buf, varDataVal(pLeft), sz); + buf[sz] = 0; + + int32_t ret = patternMatch(pattern, buf, sz, &pInfo); + free(buf); return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; } diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index 696819e5d0c9c806512ba14040c5a2a997bbe390..6ba1d87d92ecf216bfde346f9d3ff1563515d34d 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -332,7 +332,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) { return 0; } -void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) { +void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size) { char buf[TD_KVSTORE_HEADER_SIZE] = "\0"; SStoreInfo info = {0}; @@ -349,7 +349,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) { close(fd); *magic = info.magic; - *size = (int32_t)offset; + *size = offset; return; diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c index cf7f5c10d4c369211baddcf43e1ec0d10fea82b1..f014dd0fab5494bd85f197e2c79fac53359e8edf 100644 --- a/src/util/src/tsched.c +++ b/src/util/src/tsched.c @@ -123,11 +123,6 @@ void *taosProcessSchedQueue(void *param) { while (1) { if (tsem_wait(&pSched->fullSem) != 0) { - if (errno == EINTR) { - /* sem_wait is interrupted by interrupt, ignore and continue */ - uDebug("wait %s fullSem was interrupted", pSched->label); - continue; - } uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); } if (pSched->stop) { @@ -163,12 +158,8 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { return 0; } - while (tsem_wait(&pSched->emptySem) != 0) { - if (errno != EINTR) { - uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); - break; - } - uDebug("wait %s emptySem was interrupted", pSched->label); + if (tsem_wait(&pSched->emptySem) != 0) { + uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); } if (pthread_mutex_lock(&pSched->queueMutex) != 0) diff --git a/src/util/src/version.c b/src/util/src/version.c deleted file mode 100644 index 6f7b29ebb9596435e9b7e34da3ec5d97740ac38e..0000000000000000000000000000000000000000 --- a/src/util/src/version.c +++ /dev/null @@ -1,7 +0,0 @@ -char version[12] = "2.0.2.0"; -char compatible_version[12] = "2.0.0.0"; -char gitinfo[48] = "d711657139620f6c50f362597020705b8ad26bd2"; -char gitinfoOfInternal[48] = "1d74ae24c541ffbb280e8630883c0236cd45f8c7"; -char buildinfo[64] = "Built by root at 2020-08-24 16:31"; - -void libtaos_2_0_2_0_Linux_x64_beta() {}; diff --git a/src/util/src/version.c.in b/src/util/src/version.c.in new file mode 100644 index 0000000000000000000000000000000000000000..c7aea2afb1fb40ea3ce1902cd47b8d2b76fee650 --- /dev/null +++ b/src/util/src/version.c.in @@ -0,0 +1,7 @@ +char version[12] = "${TD_VER_1}.${TD_VER_2}.${TD_VER_3}.${TD_VER_4}"; +char compatible_version[12] = "${TD_VER_COMPATIBLE}"; +char gitinfo[48] = "${TD_VER_GIT}"; +char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}"; +char buildinfo[64] = "Built at ${TD_VER_DATE}"; + +void libtaos_${TD_VER_1}_${TD_VER_2}_${TD_VER_3}_${TD_VER_4}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {}; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 3c9da7fd233d744ded167bb716b68b90dcf01c78..176a45116837eeb9c62be490de1d871659f741ce 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -41,7 +41,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeCtrlFlow(void *handle, int32_t mseconds); @@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->sync = syncStart(&syncInfo); if (pVnode->sync == NULL) { + vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, + tstrerror(terrno)); vnodeCleanUp(pVnode); return terrno; } @@ -536,7 +538,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { return 0; } -static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion) { +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { SVnodeObj *pVnode = ahandle; *fversion = pVnode->fversion; return tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size); @@ -549,7 +551,7 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; - vInfo("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role); + vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]); pVnode->role = role; dnodeSendStatusMsgToMnode(); diff --git a/tests/examples/C#/TDengineDriver.cs b/tests/examples/C#/TDengineDriver.cs index 2797c362c156633148dbd8154052e04df4fde039..b6f143e1813d60c1ac4ae8356efdca4929c51345 100644 --- a/tests/examples/C#/TDengineDriver.cs +++ b/tests/examples/C#/TDengineDriver.cs @@ -20,16 +20,17 @@ using System.Runtime.InteropServices; namespace TDengineDriver { enum TDengineDataType { - TSDB_DATA_TYPE_BOOL = 1, - TSDB_DATA_TYPE_TINYINT = 2, - TSDB_DATA_TYPE_SMALLINT = 3, - TSDB_DATA_TYPE_INT = 4, - TSDB_DATA_TYPE_BIGINT = 5, - TSDB_DATA_TYPE_FLOAT = 6, - TSDB_DATA_TYPE_DOUBLE = 7, - TSDB_DATA_TYPE_BINARY = 8, - TSDB_DATA_TYPE_TIMESTAMP = 9, - TSDB_DATA_TYPE_NCHAR = 10 + TSDB_DATA_TYPE_NULL = 0, // 1 bytes + TSDB_DATA_TYPE_BOOL = 1, // 1 bytes + TSDB_DATA_TYPE_TINYINT = 2, // 1 bytes + TSDB_DATA_TYPE_SMALLINT = 3, // 2 bytes + TSDB_DATA_TYPE_INT = 4, // 4 bytes + TSDB_DATA_TYPE_BIGINT = 5, // 8 bytes + TSDB_DATA_TYPE_FLOAT = 6, // 4 bytes + TSDB_DATA_TYPE_DOUBLE = 7, // 8 bytes + TSDB_DATA_TYPE_BINARY = 8, // string + TSDB_DATA_TYPE_TIMESTAMP = 9,// 8 bytes + TSDB_DATA_TYPE_NCHAR = 10 // unicode string } enum TDengineInitOption @@ -79,54 +80,53 @@ namespace TDengineDriver class TDengine { public const int TSDB_CODE_SUCCESS = 0; - - [DllImport("taos.dll", EntryPoint = "taos_init", CallingConvention = CallingConvention.StdCall)] + + [DllImport("taos.dll", EntryPoint = "taos_init", CallingConvention = CallingConvention.Cdecl)] static extern public void Init(); - [DllImport("taos.dll", EntryPoint = "taos_options", CallingConvention = CallingConvention.StdCall)] + [DllImport("taos.dll", EntryPoint = "taos_cleanup", CallingConvention = CallingConvention.Cdecl)] + static extern public void Cleanup(); + + [DllImport("taos.dll", EntryPoint = "taos_options", CallingConvention = CallingConvention.Cdecl)] static extern public void Options(int option, string value); - [DllImport("taos.dll", EntryPoint = "taos_connect", CallingConvention = CallingConvention.StdCall)] - static extern public long Connect(string ip, string user, string password, string db, int port); + [DllImport("taos.dll", EntryPoint = "taos_connect", CallingConvention = CallingConvention.Cdecl)] + static extern public IntPtr Connect(string ip, string user, string password, string db, short port); - [DllImport("taos.dll", EntryPoint = "taos_errstr", CallingConvention = CallingConvention.StdCall)] - static extern private IntPtr taos_errstr(long taos); - static public string Error(long conn) + [DllImport("taos.dll", EntryPoint = "taos_errstr", CallingConvention = CallingConvention.Cdecl)] + static extern private IntPtr taos_errstr(IntPtr res); + static public string Error(IntPtr res) { - IntPtr errPtr = taos_errstr(conn); + IntPtr errPtr = taos_errstr(res); return Marshal.PtrToStringAnsi(errPtr); } - [DllImport("taos.dll", EntryPoint = "taos_errno", CallingConvention = CallingConvention.StdCall)] - static extern public int ErrorNo(long taos); - - [DllImport("taos.dll", EntryPoint = "taos_query", CallingConvention = CallingConvention.StdCall)] - static extern public int Query(long taos, string sqlstr); + [DllImport("taos.dll", EntryPoint = "taos_errno", CallingConvention = CallingConvention.Cdecl)] + static extern public int ErrorNo(IntPtr res); - [DllImport("taos.dll", EntryPoint = "taos_affected_rows", CallingConvention = CallingConvention.StdCall)] - static extern public int AffectRows(long taos); + [DllImport("taos.dll", EntryPoint = "taos_query", CallingConvention = CallingConvention.Cdecl)] + static extern public IntPtr Query(IntPtr conn, string sqlstr); - [DllImport("taos.dll", EntryPoint = "taos_use_result", CallingConvention = CallingConvention.StdCall)] - static extern public long UseResult(long taos); + [DllImport("taos.dll", EntryPoint = "taos_affected_rows", CallingConvention = CallingConvention.Cdecl)] + static extern public int AffectRows(IntPtr res); - [DllImport("taos.dll", EntryPoint = "taos_field_count", CallingConvention = CallingConvention.StdCall)] - static extern public int FieldCount(long taos); + [DllImport("taos.dll", EntryPoint = "taos_field_count", CallingConvention = CallingConvention.Cdecl)] + static extern public int FieldCount(IntPtr res); - [DllImport("taos.dll", EntryPoint = "taos_fetch_fields", CallingConvention = CallingConvention.StdCall)] - static extern private IntPtr taos_fetch_fields(long res); - static public List FetchFields(long taos) + [DllImport("taos.dll", EntryPoint = "taos_fetch_fields", CallingConvention = CallingConvention.Cdecl)] + static extern private IntPtr taos_fetch_fields(IntPtr res); + static public List FetchFields(IntPtr res) { const int fieldSize = 68; List metas = new List(); - long result = TDengine.UseResult(taos); - if (result == 0) + if (res == IntPtr.Zero) { return metas; } - int fieldCount = FieldCount(taos); - IntPtr fieldsPtr = taos_fetch_fields(result); + int fieldCount = FieldCount(res); + IntPtr fieldsPtr = taos_fetch_fields(res); for (int i = 0; i < fieldCount; ++i) { @@ -134,21 +134,21 @@ namespace TDengineDriver TDengineMeta meta = new TDengineMeta(); meta.name = Marshal.PtrToStringAnsi(fieldsPtr + offset); - meta.size = Marshal.ReadInt16(fieldsPtr + offset + 64); - meta.type = Marshal.ReadByte(fieldsPtr + offset + 66); + meta.type = Marshal.ReadByte(fieldsPtr + offset + 65); + meta.size = Marshal.ReadInt16(fieldsPtr + offset + 66); metas.Add(meta); } return metas; } - [DllImport("taos.dll", EntryPoint = "taos_fetch_row", CallingConvention = CallingConvention.StdCall)] - static extern public IntPtr FetchRows(long res); + [DllImport("taos.dll", EntryPoint = "taos_fetch_row", CallingConvention = CallingConvention.Cdecl)] + static extern public IntPtr FetchRows(IntPtr res); - [DllImport("taos.dll", EntryPoint = "taos_free_result", CallingConvention = CallingConvention.StdCall)] - static extern public IntPtr FreeResult(long res); + [DllImport("taos.dll", EntryPoint = "taos_free_result", CallingConvention = CallingConvention.Cdecl)] + static extern public IntPtr FreeResult(IntPtr res); - [DllImport("taos.dll", EntryPoint = "taos_close", CallingConvention = CallingConvention.StdCall)] - static extern public int Close(long taos); + [DllImport("taos.dll", EntryPoint = "taos_close", CallingConvention = CallingConvention.Cdecl)] + static extern public int Close(IntPtr taos); } } \ No newline at end of file diff --git a/tests/examples/C#/TDengineTest.cs b/tests/examples/C#/TDengineTest.cs index 235775f68fcb7280b491f347a0962010af535e29..6b3f1160adc08fd18a16a1292b08a8b798ce389d 100644 --- a/tests/examples/C#/TDengineTest.cs +++ b/tests/examples/C#/TDengineTest.cs @@ -28,7 +28,7 @@ namespace TDengineDriver private string configDir; private string user; private string password; - private int port = 0; + private short port = 0; //sql parameters private string dbName; @@ -43,7 +43,7 @@ namespace TDengineDriver private long batchRows; private long beginTimestamp = 1551369600000L; - private long conn = 0; + private IntPtr conn = IntPtr.Zero; private long rowsInserted = 0; static void Main(string[] args) @@ -191,7 +191,7 @@ namespace TDengineDriver { string db = ""; this.conn = TDengine.Connect(this.host, this.user, this.password, db, this.port); - if (this.conn == 0) + if (this.conn == IntPtr.Zero) { Console.WriteLine("Connect to TDengine failed"); ExitProgram(); @@ -211,58 +211,62 @@ namespace TDengineDriver StringBuilder sql = new StringBuilder(); sql.Append("create database if not exists ").Append(this.dbName); - int code = TDengine.Query(this.conn, sql.ToString()); - if (code == TDengine.TSDB_CODE_SUCCESS) + IntPtr res = TDengine.Query(this.conn, sql.ToString()); + if (res != IntPtr.Zero) { Console.WriteLine(sql.ToString() + " success"); } else { - Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(conn)); + Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); ExitProgram(); } + TDengine.FreeResult(res); sql.Clear(); sql.Append("use ").Append(this.dbName); - code = TDengine.Query(this.conn, sql.ToString()); - if (code == TDengine.TSDB_CODE_SUCCESS) + res = TDengine.Query(this.conn, sql.ToString()); + if (res != IntPtr.Zero) { Console.WriteLine(sql.ToString() + " success"); } else { - Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(this.conn)); + Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); ExitProgram(); } + TDengine.FreeResult(res); sql.Clear(); - sql.Append("create table if not exists ").Append(this.stableName).Append("(ts timestamp, v1 int) tags(t1 int)"); - code = TDengine.Query(this.conn, sql.ToString()); - if (code == TDengine.TSDB_CODE_SUCCESS) + sql.Append("create table if not exists ").Append(this.stableName).Append("(ts timestamp, v1 bool, v2 tinyint, v3 smallint, v4 int, v5 bigint, v6 float, v7 double, v8 binary(10), v9 nchar(10)) tags(t1 int)"); + res = TDengine.Query(this.conn, sql.ToString()); + if (res != IntPtr.Zero) { Console.WriteLine(sql.ToString() + " success"); } else { - Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(this.conn)); + Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); ExitProgram(); } + TDengine.FreeResult(res); for (int i = 0; i < this.tableCount; i++) { sql.Clear(); sql = sql.Append("create table if not exists ").Append(this.tablePrefix).Append(i) .Append(" using ").Append(this.stableName).Append(" tags(").Append(i).Append(")"); - code = TDengine.Query(this.conn, sql.ToString()); - if (code == TDengine.TSDB_CODE_SUCCESS) + res = TDengine.Query(this.conn, sql.ToString()); + if (res != IntPtr.Zero) { Console.WriteLine(sql.ToString() + " success"); } else { - Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(this.conn)); + Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); ExitProgram(); } + TDengine.FreeResult(res); } Console.WriteLine("create db and table success"); @@ -287,16 +291,22 @@ namespace TDengineDriver for (int batch = 0; batch < this.batchRows; ++batch) { long rows = loop * this.batchRows + batch; - sql.Append("(").Append(this.beginTimestamp + rows).Append(",").Append(rows).Append(")"); + sql.Append("(") + .Append(this.beginTimestamp + rows) + .Append(", 1, 2, 3,") + .Append(rows) + .Append(", 5, 6, 7, 'abc', 'def')"); } - int code = TDengine.Query(conn, sql.ToString()); - if (code != TDengine.TSDB_CODE_SUCCESS) + IntPtr res = TDengine.Query(this.conn, sql.ToString()); + if (res == IntPtr.Zero) { - Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(conn)); + Console.WriteLine(sql.ToString() + " failure, reason: " + TDengine.Error(res)); } - int affectRows = TDengine.AffectRows(conn); + int affectRows = TDengine.AffectRows(res); this.rowsInserted += affectRows; + + TDengine.FreeResult(res); } } @@ -316,51 +326,45 @@ namespace TDengineDriver System.DateTime start = new System.DateTime(); long queryRows = 0; - - for (int i = 0; i < this.tableCount; ++i) + + for (int i = 0; i < 1/*this.tableCount*/; ++i) { String sql = "select * from " + this.dbName + "." + tablePrefix + i; Console.WriteLine(sql); - int code = TDengine.Query(conn, sql); - if (code != TDengine.TSDB_CODE_SUCCESS) + IntPtr res = TDengine.Query(conn, sql); + if (res == IntPtr.Zero) { - Console.WriteLine(sql + " failure, reason: " + TDengine.Error(conn)); + Console.WriteLine(sql + " failure, reason: " + TDengine.Error(res)); ExitProgram(); } - int fieldCount = TDengine.FieldCount(conn); - //Console.WriteLine("field count: " + fieldCount); + int fieldCount = TDengine.FieldCount(res); + Console.WriteLine("field count: " + fieldCount); - List metas = TDengine.FetchFields(conn); + List metas = TDengine.FetchFields(res); for (int j = 0; j < metas.Count; j++) { TDengineMeta meta = (TDengineMeta)metas[j]; - //Console.WriteLine("index:" + j + ", type:" + meta.type + ", typename:" + meta.TypeName() + ", name:" + meta.name + ", size:" + meta.size); - } - - long result = TDengine.UseResult(conn); - if (result == 0) - { - Console.WriteLine(sql + " result set is null"); - return; + Console.WriteLine("index:" + j + ", type:" + meta.type + ", typename:" + meta.TypeName() + ", name:" + meta.name + ", size:" + meta.size); } IntPtr rowdata; - while ((rowdata = TDengine.FetchRows(result)) != IntPtr.Zero) + StringBuilder builder = new StringBuilder(); + while ((rowdata = TDengine.FetchRows(res)) != IntPtr.Zero) { queryRows++; for (int fields = 0; fields < fieldCount; ++fields) { TDengineMeta meta = metas[fields]; - int offset = 8 * fields; + int offset = IntPtr.Size * fields; IntPtr data = Marshal.ReadIntPtr(rowdata, offset); - //Console.Write("---"); + builder.Append("---"); if (data == IntPtr.Zero) { - //Console.Write("NULL"); + builder.Append("NULL"); continue; } @@ -368,55 +372,61 @@ namespace TDengineDriver { case TDengineDataType.TSDB_DATA_TYPE_BOOL: bool v1 = Marshal.ReadByte(data) == 0 ? false : true; - //Console.Write(v1); + builder.Append(v1); break; case TDengineDataType.TSDB_DATA_TYPE_TINYINT: byte v2 = Marshal.ReadByte(data); - //Console.Write(v2); + builder.Append(v2); break; case TDengineDataType.TSDB_DATA_TYPE_SMALLINT: short v3 = Marshal.ReadInt16(data); - //Console.Write(v3); + builder.Append(v3); break; case TDengineDataType.TSDB_DATA_TYPE_INT: int v4 = Marshal.ReadInt32(data); - //Console.Write(v4); + builder.Append(v4); break; case TDengineDataType.TSDB_DATA_TYPE_BIGINT: long v5 = Marshal.ReadInt64(data); - //Console.Write(v5); + builder.Append(v5); break; case TDengineDataType.TSDB_DATA_TYPE_FLOAT: float v6 = (float)Marshal.PtrToStructure(data, typeof(float)); - //Console.Write(v6); + builder.Append(v6); break; case TDengineDataType.TSDB_DATA_TYPE_DOUBLE: double v7 = (double)Marshal.PtrToStructure(data, typeof(double)); - //Console.Write(v7); + builder.Append(v7); break; case TDengineDataType.TSDB_DATA_TYPE_BINARY: string v8 = Marshal.PtrToStringAnsi(data); - //Console.Write(v8); + builder.Append(v8); break; case TDengineDataType.TSDB_DATA_TYPE_TIMESTAMP: long v9 = Marshal.ReadInt64(data); - //Console.Write(v9); + builder.Append(v9); break; case TDengineDataType.TSDB_DATA_TYPE_NCHAR: string v10 = Marshal.PtrToStringAnsi(data); - //Console.Write(v10); + builder.Append(v10); break; } } - //Console.WriteLine("---"); + builder.Append("---"); + + if (queryRows <= 10) + { + Console.WriteLine(builder.ToString()); + } + builder.Clear(); } - if (TDengine.ErrorNo(conn) != 0) + if (TDengine.ErrorNo(res) != 0) { - Console.Write("Query is not completeļ¼Œ Error {0:G}", TDengine.ErrorNo(conn), TDengine.Error(conn)); + Console.Write("Query is not complete, Error {0:G}", TDengine.ErrorNo(res), TDengine.Error(res)); } - TDengine.FreeResult(result); + TDengine.FreeResult(res); } System.DateTime end = new System.DateTime(); @@ -428,14 +438,15 @@ namespace TDengineDriver public void CloseConnection() { - if (conn != 0) + if (this.conn != IntPtr.Zero) { - TDengine.Close(conn); + TDengine.Close(this.conn); } } static void ExitProgram() { + TDengine.Cleanup(); System.Environment.Exit(0); } } diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index f59eebede02311351978ce419ae8c4afa3e62308..ab2ad01c32f752ae1e1e0dd3afab3c7a221160df 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -32,10 +32,16 @@ system sh/cfg.sh -n dnode2 -c http -v 1 system sh/cfg.sh -n dnode3 -c http -v 1 system sh/cfg.sh -n dnode4 -c http -v 1 - +# for crash_gen +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2 +system sh/cfg.sh -n dnode1 -c rpcMaxTime -v 101 +system sh/cfg.sh -n dnode1 -c cache -v 2 +system sh/cfg.sh -n dnode1 -c keep -v 36500 system sh/cfg.sh -n dnode1 -c walLevel -v 2 +# for windows + system sh/cfg.sh -n dnode1 -c firstEp -v 152.136.17.116:6030 system sh/cfg.sh -n dnode1 -c secondEp -v 152.136.17.116:6030 system sh/cfg.sh -n dnode1 -c serverPort -v 6030 diff --git a/tests/tsim/inc/sim.h b/tests/tsim/inc/sim.h index 18af21a506e41acc1139e187c498ee9cde71946c..6f3bc7099d306598da774fb9eeb6712e415ac1ce 100644 --- a/tests/tsim/inc/sim.h +++ b/tests/tsim/inc/sim.h @@ -96,20 +96,20 @@ enum { struct _script_t; typedef struct _cmd_t { - short cmdno; - short nlen; - char name[MAX_SIM_CMD_NAME_LEN]; - bool (*parseCmd)(char *, struct _cmd_t *, int); - bool (*executeCmd)(struct _script_t *script, char *option); + int16_t cmdno; + int16_t nlen; + char name[MAX_SIM_CMD_NAME_LEN]; + bool (*parseCmd)(char *, struct _cmd_t *, int); + bool (*executeCmd)(struct _script_t *script, char *option); struct _cmd_t *next; } SCommand; typedef struct { - short cmdno; - short jump; // jump position - short errorJump; // sql jump flag, while '-x' exist in sql cmd, this flag - // will be SQL_JUMP_TRUE, otherwise is SQL_JUMP_FALSE */ - short lineNum; // correspodning line number in original file + int16_t cmdno; + int16_t jump; // jump position + int16_t errorJump; // sql jump flag, while '-x' exist in sql cmd, this flag + // will be SQL_JUMP_TRUE, otherwise is SQL_JUMP_FALSE */ + int16_t lineNum; // correspodning line number in original file int optionOffset; // relative option offset } SCmdLine; @@ -120,7 +120,7 @@ typedef struct _var_t { } SVariable; typedef struct _script_t { - int type; + int type; bool killed; void *taos; @@ -130,10 +130,10 @@ typedef struct _script_t { char system_exit_code[12]; char system_ret_content[MAX_SYSTEM_RESULT_LEN]; - int varLen; - int linePos; // current cmd position - int numOfLines; // number of lines in the script - int bgScriptLen; + int varLen; + int linePos; // current cmd position + int numOfLines; // number of lines in the script + int bgScriptLen; char fileName[MAX_FILE_NAME_LEN]; // script file name char error[MAX_ERROR_LEN]; char *optionBuffer; diff --git a/tests/tsim/inc/simParse.h b/tests/tsim/inc/simParse.h index cff66e6c4f50e4f5d2573eed16422004cc520bf3..d3f92add71f3dc6ddb205c1810e1a3caace47e36 100644 --- a/tests/tsim/inc/simParse.h +++ b/tests/tsim/inc/simParse.h @@ -33,21 +33,21 @@ enum { /* label stack */ typedef struct { - char top; /* number of labels */ - short pos[MAX_NUM_LABLES]; /* the position of the label */ - char label[MAX_NUM_LABLES][MAX_LABEL_LEN]; /* name of the label */ + char top; /* number of labels */ + int16_t pos[MAX_NUM_LABLES]; /* the position of the label */ + char label[MAX_NUM_LABLES][MAX_LABEL_LEN]; /* name of the label */ } SLabel; /* block definition */ typedef struct { - char top; /* the number of blocks stacked */ - char type[MAX_NUM_BLOCK]; /* the block type */ - short *pos[MAX_NUM_BLOCK]; /* position of the jump for if/elif/case */ - short back[MAX_NUM_BLOCK]; /* go back, endw and continue */ - char numJump[MAX_NUM_BLOCK]; - short *jump[MAX_NUM_BLOCK][MAX_NUM_JUMP]; /* break or elif */ - char sexp[MAX_NUM_BLOCK][40]; /*switch expression */ - char sexpLen[MAX_NUM_BLOCK]; /*switch expression length */ + char top; /* the number of blocks stacked */ + char type[MAX_NUM_BLOCK]; /* the block type */ + int16_t *pos[MAX_NUM_BLOCK]; /* position of the jump for if/elif/case */ + int16_t back[MAX_NUM_BLOCK]; /* go back, endw and continue */ + char numJump[MAX_NUM_BLOCK]; + int16_t *jump[MAX_NUM_BLOCK][MAX_NUM_JUMP]; /* break or elif */ + char sexp[MAX_NUM_BLOCK][40]; /*switch expression */ + char sexpLen[MAX_NUM_BLOCK]; /*switch expression length */ } SBlock; bool simParseExpression(char *token, int lineNum);