diff --git a/.lgtm.yml b/.lgtm.yml new file mode 100644 index 0000000000000000000000000000000000000000..fbcedead432f57b7f9ed2161a4a1e500446b489f --- /dev/null +++ b/.lgtm.yml @@ -0,0 +1,402 @@ +########################################################################################## +# Customize file classifications. # +# Results from files under any classifier will be excluded from LGTM # +# statistics. # +########################################################################################## + +########################################################################################## +# Use the `path_classifiers` block to define changes to the default classification of # +# files. # +########################################################################################## + +path_classifiers: + # docs: + # Identify the top-level file called `generate_javadoc.py` as documentation-related. + test: + # Override LGTM's default classification of test files by excluding all files. + - exclude: / + # Classify all files in the top-level directories tests/ and testsuites/ as test code. + - tests + # - testsuites + # Classify all files with suffix `.test` as test code. + # Note: use only forward slash / as a path separator. + # Use ** to indicate an arbitrary parent path. + # Use * to indicate any sequence of characters excluding /. + # Always enclose the expression in double quotes if it includes *. + # - "**/*.test" + # Refine the classifications above by excluding files in test/util/. + # - exclude: test/util + # The default behavior is to tag all files created during the + # build as `generated`. Results are hidden for generated code. You can tag + # further files as being generated by adding them to the `generated` section. + generated: + # Exclude all `*.c` files under the `ui/` directory from classification as + # generated code. + # - exclude: ui/**/*.c + # By default, all files not checked into the repository are considered to be + # 'generated'. + # The default behavior is to tag library code as `library`. Results are hidden + # for library code. You can tag further files as being library code by adding them + # to the `library` section. + library: + - exclude: deps/ + # The default behavior is to tag template files as `template`. Results are hidden + # for template files. You can tag further files as being template files by adding + # them to the `template` section. + template: + #- exclude: path/to/template/code/**/*.c + # Define your own category, for example: 'some_custom_category'. + some_custom_category: + # Classify all files in the top-level directory tools/ (or the top-level file + # called tools). + # - tools + +######################################################################################### +# Use the `queries` block to change the default display of query results. # +######################################################################################### + + # queries: + # Start by hiding the results of all queries. + # - exclude: "*" + # Then include all queries tagged 'security' and 'correctness', and with a severity of + # 'error'. + # - include: + # tags: + # - "security" + # - "correctness" + # severity: "error" + # Specifically hide the results of two queries. + # - exclude: cpp/use-of-goto + # - exclude: java/equals-on-unrelated-types + # Refine by including the `java/command-line-injection` query. 
+ # - include: java/command-line-injection + +######################################################################################### +# Define changes to the default code extraction process. # +# Each block configures the extraction of a single language, and modifies actions in a # +# named step. Every named step includes automatic default actions, # +# except for the 'prepare' step. The steps are performed in the following sequence: # +# prepare # +# after_prepare # +# configure (C/C++ only) # +# python_setup (Python only) # +# before_index # +# index # +########################################################################################## + +######################################################################################### +# Environment variables available to the steps: # +######################################################################################### + +# LGTM_SRC +# The root of the source tree. +# LGTM_WORKSPACE +# An existing (initially empty) folder outside the source tree. +# Used for temporary download and setup commands. + +######################################################################################### +# Use the extraction block to define changes to the default code extraction process # +# for one or more languages. The settings for each language are defined in a child # +# block, with one or more steps. # +######################################################################################### + +extraction: + # Define settings for C/C++ analysis + ##################################### + cpp: + # The `prepare` step exists for customization on LGTM.com only. + prepare: + # # The `packages` section is valid for LGTM.com only. It names Ubuntu packages to + # # be installed. + packages: + - cmake + # Add an `after-prepare` step if you need to run commands after the prepare step. + # Each command should be listed on a separate line. + # This step is useful for C/C++ analysis where you want to prepare the environment + # for the `configure` step without changing the default behavior for that step. + # after_prepare: + #- export GNU_MAKE=make + #- export GIT=true + # The `configure` step generates build configuration files which the `index` step + # then uses to build the codebase. + configure: + command: + - mkdir build + - cd build + - cmake .. + # - ./prepare_deps + # Optional step. You should add a `before_index` step if you need to run commands + # before the `index` step. + # before_index: + # - export BOOST_DIR=$LGTM_SRC/boost + # - export GTEST_DIR=$LGTM_SRC/googletest + # - export HUNSPELL_DIR=$LGTM_SRC/hunspell + # - export CRYPTOPP_DIR=$LGTM_SRC/cryptopp + # The `index` step builds the code and extracts information during the build + # process. + index: + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + build_command: + - cd build + - make + # - $GNU_MAKE -j2 -s + # Specify that all project or solution files should be used for extraction. + # Default: false. + # all_solutions: true + # Specify a list of one or more project or solution files for extraction. + # Default: LGTM chooses the file closest to the root of the repository (this may + # fail if there are multiple candidates). + # solution: + # - myProject.sln + # Specify MSBuild settings + # msbuild: + # Specify a list of additional arguments to MSBuild. Default: empty. + # arguments: /p:Platform=x64 /p:Configuration=Release + # Specify the MSBuild configuration to use, for example, debug or release. 
+ # Default: read from the solution file or files. + # configuration: + # Specify the platform to target, for example: x86, x64, or Any CPU. + # Default: read from the solution file or files. + # platform: + # Specify the MSBuild target. Default: rebuild. + # target: + # Specify whether or not to perform a NuGet restore for extraction. Default: true. + # nuget_restore: false + # Specify a version of Microsoft Visual Studio to use for MSBuild or any custom + # build commands (build_command). For example: + # 10 for Visual Studio 2010 + # 12 for Visual Studio 2012 + # 14 for Visual Studio 2015 + # 15 for Visual Studio 2017 + # Default: read from project files. + # vstools_version: 10 + + # Define settings for C# analysis + ################################## + # csharp: + # The `prepare` step exists for customization on LGTM.com only. + # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the `prepare` step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step builds the code and extracts information during the build + # process. + #index: + # Specify that all project or solution files should be used for extraction. + # Default: false. + # all_solutions: true + # Specify a list of one or more project or solution files for extraction. + # Default: LGTM chooses the file closest to the root of the repository (this may + # fail if there are multiple candidates). + # solution: + # - myProject.sln + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + # build_command: + # - ./example-compile-all.sh + # By default, LGTM analyzes the code by building it. You can override this, + # and tell LGTM not to build the code. Beware that this can lead + # to less accurate results. + # buildless: true + # Specify .NET Core settings. + # dotnet: + # Specify additional arguments to `dotnet build`. + # Default: empty. + # arguments: "example_arg" + # Specify the version of .NET Core SDK to use. + # Default: The version installed on the build machine. + # version: 2.1 + # Specify MSBuild settings. + # msbuild: + # Specify a list of additional arguments to MSBuild. Default: empty. + # arguments: /P:WarningLevel=2 + # Specify the MSBuild configuration to use, for example, debug or release. + # Default: read from the solution file or files. + # configuration: release + # Specify the platform to target, for example: x86, x64, or Any CPU. + # Default: read from the solution file or files. + # platform: x86 + # Specify the MSBuild target. Default: rebuild. + # target: notest + # Specify whether or not to perform a NuGet restore for extraction. Default: true. + # nuget_restore: false + # Specify a version of Microsoft Visual Studio to use for MSBuild or any custom + # build commands (build_command). For example: + # 10 for Visual Studio 2010 + # 12 for Visual Studio 2012 + # 14 for Visual Studio 2015 + # 15 for Visual Studio 2017 + # Default: read from project files + # vstools_version: 10 + # Specify additional options for the extractor, + # for example --fast to perform a faster extraction that produces a smaller + # database. + # extractor: "--fast" + + # Define settings for Go analysis + ################################## + # go: + # The `prepare` step exists for customization on LGTM.com only. 
+ # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the `prepare` step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step builds the code and extracts information during the build + # process. + # index: + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + # build_command: + # - ./compile-all.sh + + # Define settings for Java analysis + #################################### + # java: + # The `prepare` step exists for customization on LGTM.com only. + # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the prepare step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step extracts information from the files in the codebase. + # index: + # Specify Gradle settings. + # gradle: + # Specify the required Gradle version. + # Default: determined automatically. + # version: 4.4 + # Override the autobuild process by specifying a list of custom build commands + # to use instead. + # build_command: ./compile-all.sh + # Specify the Java version required to build the project. + # java_version: 11 + # Specify whether to extract Java .properties files + # Default: false + # properties_files: true + # Specify Maven settings. + # maven: + # Specify the path (absolute or relative) of a Maven settings file to use. + # Default: Maven uses a settings file in the default location, if it exists. + # settings_file: /opt/share/settings.xml + # Specify the path of a Maven toolchains file. + # Default: Maven uses a toolchains file in the default location, if it exists. + # toolchains_file: /opt/share/toolchains.xml + # Specify the required Maven version. + # Default: the Maven version is determined automatically, where feasible. + # version: 3.5.2 + # Specify how XML files should be extracted: + # all = extract all XML files. + # default = only extract XML files named `AndroidManifest.xml`, `pom.xml`, and `web.xml`. + # disabled = do not extract any XML files. + # xml_mode: all + + # Define settings for JavaScript analysis + ########################################## + # javascript: + # The `prepare` step exists for customization on LGTM.com only. + # prepare: + # packages: + # - example_package + # Add an `after-prepare` step if you need to run commands after the prepare step. + # Each command should be listed on a separate line. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # The `index` step extracts information from the files in the codebase. + # index: + # Specify a list of files and folders to extract. + # Default: The project root directory. + # include: + # - src/js + # Specify a list of files and folders to exclude from extraction. + # exclude: + # - thirdparty/lib + # You can add additional file types for LGTM to extract, by mapping file + # extensions (including the leading dot) to file types. The usual + # include/exclude patterns apply, so, for example, `.jsm` files under + # `thirdparty/lib` will not be extracted. + # filetypes: + # ".jsm": "js" + # ".tmpl": "html" + # Specify a list of glob patterns to include/exclude files from extraction; this + # is applied on top of the include/exclude paths from above; patterns are + # processed in the same way as for path classifiers above. 
+ # Default: include all files with known extensions (such as .js, .ts and .html), + # but exclude files ending in `-min.js` or `.min.js` and folders named `node_modules` + # or `bower_components` + # filters: + # exclude any *.ts files anywhere. + # - exclude: "**/*.ts" + # but include *.ts files under src/js/typescript. + # - include: "src/js/typescript/**/*.ts" + # Specify how TypeScript files should be extracted: + # none = exclude all TypeScript files. + # basic = extract syntactic information from TypeScript files. + # full = extract syntactic and type information from TypeScript files. + # Default: full. + # typescript: basic + # By default, LGTM doesn't extract any XML files. You can override this by + # using the `xml_mode` property and setting it to `all`. + # xml_mode: all + + # Define settings for Python analysis + ###################################### + # python: + # # The `prepare` step exists for customization on LGTM.com only. + # # prepare: + # # # The `packages` section is valid for LGTM.com only. It names packages to + # # # be installed. + # # packages: libpng-dev + # # This step is useful for Python analysis where you want to prepare the + # # environment for the `python_setup` step without changing the default behavior + # # for that step. + # after_prepare: + # - export PATH=$LGTM_WORKSPACE/tools:$PATH + # # This sets up the Python interpreter and virtual environment, ready for the + # # `index` step to extract the codebase. + # python_setup: + # # Specify packages that should NOT be installed despite being mentioned in the + # # requirements.txt file. + # # Default: no package marked for exclusion. + # exclude_requirements: + # - pywin32 + # # Specify a list of pip packages to install. + # # If any of these packages cannot be installed, the extraction will fail. + # requirements: + # - Pillow + # # Specify a list of requirements text files to use to set up the environment, + # # or false for none. Default: any requirements.txt, test-requirements.txt, + # # and similarly named files identified in the codebase are used. + # requirements_files: + # - required-packages.txt + # # Specify a setup.py file to use to set up the environment, or false for none. + # # Default: any setup.py files identified in the codebase are used in preference + # # to any requirements text files. + # setup_py: new-setup.py + # # Override the version of the Python interpreter used for setup and extraction + # # Default: Python 3. + # version: 2 + # # Optional step. You should add a `before_index` step if you need to run commands + # # before the `index` step. + # before_index: + # - antlr4 -Dlanguage=Python3 Grammar.g4 + # # The `index` step extracts information from the files in the codebase. + # index: + # # Specify a list of files and folders to exclude from extraction. + # # Default: Git submodules and Subversion externals. 
+ # exclude: + # - legacy-implementation + # - thirdparty/libs + # filters: + # - exclude: "**/documentation/examples/snippets/*.py" + # - include: "**/documentation/examples/test_application/*" + # include: + # - example/to/include diff --git a/README.md b/README.md index 522fc0ebc1277684b77589c5c62d061f7b33ce32..36436dd549e22f58caefd82c4a9be4e8aed8d869 100644 --- a/README.md +++ b/README.md @@ -83,12 +83,18 @@ sudo dnf install -y maven ## Get the source codes -- github: +First of all, you may clone the source codes from github: ```bash git clone https://github.com/taosdata/TDengine.git cd TDengine ``` +The connectors for go & grafana have been moved to separated repositories, +so you should run this command in the TDengine directory to install them: +```bash +git submodule update --init --recursive +``` + ## Build TDengine ### On Linux platform diff --git a/src/connector/go b/src/connector/go index 567b7b12f3fd2775c718d284beffc8c38dd6c219..8c58c512b6acda8bcdfa48fdc7140227b5221766 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f0b8c996c5aec747e69785c469946f80856e05e5..414d37d8b8c3141784a22bb117a8ec6d92b4096a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) { tError("failed to malloc msg, size:%d", size); return NULL; } else { - tDebug("malloc msg: %p", start); + tTrace("malloc mem: %p", start); } return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); @@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) { if (cont) { char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); free(temp); - tDebug("free mem: %p", temp); + tTrace("free mem: %p", temp); } } @@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); free(temp); - tDebug("free msg: %p", temp); + tTrace("free mem: %p", temp); } } @@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; } + if (rpcContLenFromMsg(pHead->msgLen) <= 0) { + tDebug("%s, message body is empty, ignore", pConn->info); + return TSDB_CODE_RPC_APP_ERROR; + } + pConn->inTranId = pHead->tranId; pConn->inType = pHead->msgType; + // start the progress timer to monitor the response from server app + if (pConn->connType != RPC_CONN_TCPS) + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); + return 0; } @@ -881,17 +890,32 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { pConn->outType = 0; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; + SRpcReqContext *pContext = pConn->pContext; + + if (pHead->code == TSDB_CODE_RPC_REDIRECT) { + if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { + // if EpSet is not included in the msg, treat it as NOT_READY + pHead->code = 
TSDB_CODE_RPC_NOT_READY; + } else { + pContext->redirect++; + if (pContext->redirect > TSDB_MAX_REPLICA) { + pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + tWarn("%s, too many redirects, quit", pConn->info); + } + } + } return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); + *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -945,6 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); + *ppContext = pConn->pContext; } } @@ -1009,7 +1034,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - pConn = rpcProcessMsgHead(pRpc, pRecv); + SRpcReqContext *pContext; + pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1029,7 +1055,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead); + rpcProcessIncomingMsg(pConn, pHead, pContext); } } @@ -1060,7 +1086,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1070,29 +1096,18 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; - rpcMsg.ahandle = pConn->ahandle; if ( rpcIsReq(pHead->msgType) ) { - if (rpcMsg.contLen > 0) { - rpcMsg.handle = pConn; - rpcAddRef(pRpc); // add the refCount for requests + rpcMsg.ahandle = pConn->ahandle; + rpcMsg.handle = pConn; + rpcAddRef(pRpc); // add the refCount for requests - // start the progress timer to monitor the response from server app - if (pConn->connType != RPC_CONN_TCPS) - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); - - // notify the server app - (*(pRpc->cfp))(&rpcMsg, NULL); - } else { - tDebug("%s, message body is empty, ignore", pConn->info); - rpcFreeCont(rpcMsg.pCont); - } + // notify the server app + (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response - SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; + rpcMsg.ahandle = pContext->ahandle; // for UDP, port may be changed by server, the port in epSet shall be used for cache if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { @@ -1101,19 +1116,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcCloseConn(pConn); } - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { - if (rpcMsg.contLen < sizeof(SRpcEpSet)) { - // if EpSet is not included in the msg, treat it as NOT_READY - pHead->code 
= TSDB_CODE_RPC_NOT_READY; - } else { - pContext->redirect++; - if (pContext->redirect > TSDB_MAX_REPLICA) { - pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - tWarn("%s, too many redirects, quit", pConn->info); - } - } - - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; @@ -1445,7 +1447,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { pNewHead->msgLen = rpcMsgLenFromCont(origLen); rpcFreeMsg(pHead); // free the compressed message buffer pHead = pNewHead; - //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); + tTrace("decomp malloc mem: %p", temp); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 2a3facdb36c0b5ea71aa4b3140245f06404f8bea..dd9e7684e03e5cfc8c44dc3555a4ad1d144b90b6 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -62,7 +62,7 @@ typedef struct { char label[TSDB_LABEL_LEN]; int numOfThreads; void * shandle; - SThreadObj *pThreadObj; + SThreadObj **pThreadObj; pthread_t thread; } SServerObj; @@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); pServerObj->numOfThreads = numOfThreads; - pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads); + pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); terrno = TAOS_SYSTEM_ERROR(errno); @@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); // initialize parameters in case it may encounter error later - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1); + if (pThreadObj == NULL) { + tError("TCP:%s no enough memory", label); + terrno = TAOS_SYSTEM_ERROR(errno); + for (int j = 0; j < i; ++j) free(pServerObj->pThreadObj[j]); + free(pServerObj->pThreadObj); + free(pServerObj); + return NULL; + } + + pServerObj->pThreadObj[i] = pThreadObj; pThreadObj->pollFd = -1; taosResetPthread(&pThreadObj->thread); pThreadObj->processData = fp; tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); pThreadObj->shandle = shandle; - pThreadObj++; } // initialize mutex, thread, fd which may fail - pThreadObj = pServerObj->pThreadObj; for (int i = 0; i < numOfThreads; ++i) { + pThreadObj = pServerObj->pThreadObj[i]; code = pthread_mutex_init(&(pThreadObj->mutex), NULL); if (code < 0) { tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno)); @@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pThreadObj->threadId = i; - pThreadObj++; } pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); @@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { pThreadObj->stop = true; eventfd_t fd = -1; + if (taosComparePthread(pThreadObj->thread, pthread_self())) { + pthread_detach(pthread_self()); + return; + } + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { // signal the thread to stop, try graceful method first, // and use pthread_cancel when failed @@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) { } } - if (taosCheckPthreadValid(pThreadObj->thread)) 
pthread_join(pThreadObj->thread, NULL); - if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); - if (fd != -1) taosCloseSocket(fd); - - while (pThreadObj->pHead) { - SFdObj *pFdObj = pThreadObj->pHead; - pThreadObj->pHead = pFdObj->next; - taosFreeFdObj(pFdObj); + if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) { + pthread_join(pThreadObj->thread, NULL); } + + if (fd != -1) taosCloseSocket(fd); } void taosStopTcpServer(void *handle) { @@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) { if (pServerObj == NULL) return; if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD); - if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL); + + if (taosCheckPthreadValid(pServerObj->thread)) { + if (taosComparePthread(pServerObj->thread, pthread_self())) { + pthread_detach(pthread_self()); + } else { + pthread_join(pServerObj->thread, NULL); + } + } tDebug("%s TCP server is stopped", pServerObj->label); } @@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) { if (pServerObj == NULL) return; for (int i = 0; i < pServerObj->numOfThreads; ++i) { - pThreadObj = pServerObj->pThreadObj + i; + pThreadObj = pServerObj->pThreadObj[i]; taosStopTcpThread(pThreadObj); - pthread_mutex_destroy(&(pThreadObj->mutex)); } tDebug("%s TCP server is cleaned up", pServerObj->label); @@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) { taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); // pick up the thread to handle this connection - pThreadObj = pServerObj->pThreadObj + threadId; + pThreadObj = pServerObj->pThreadObj[threadId]; SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd); if (pFdObj) { @@ -327,10 +342,8 @@ void taosCleanUpTcpClient(void *chandle) { SThreadObj *pThreadObj = chandle; if (pThreadObj == NULL) return; + tDebug ("%s TCP client will be cleaned up", pThreadObj->label); taosStopTcpThread(pThreadObj); - tDebug ("%s TCP client is cleaned up", pThreadObj->label); - - taosTFree(pThreadObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { @@ -365,7 +378,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin void taosCloseTcpConnection(void *chandle) { SFdObj *pFdObj = chandle; - if (pFdObj == NULL) return; + if (pFdObj == NULL || pFdObj->signature != pFdObj) return; SThreadObj *pThreadObj = pFdObj->pThreadObj; tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); @@ -378,7 +391,7 @@ void taosCloseTcpConnection(void *chandle) { int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) { SFdObj *pFdObj = chandle; - if (chandle == NULL) return -1; + if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1; return taosWriteMsg(pFdObj->fd, data, len); } @@ -425,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; } else { - tDebug("TCP malloc mem: %p", buffer); + tTrace("TCP malloc mem: %p", buffer); } msg = buffer + tsRpcOverhead; @@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) { pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } + + if (pThreadObj->stop) break; + } + + if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); + + while (pThreadObj->pHead) { + SFdObj *pFdObj = pThreadObj->pHead; + pThreadObj->pHead = pFdObj->next; + 
taosFreeFdObj(pFdObj); } + pthread_mutex_destroy(&(pThreadObj->mutex)); + tDebug("%s TCP thread exits ...", pThreadObj->label); + taosTFree(pThreadObj); + return NULL; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4ea47582b9c93fbd483454a50e1bf6aed5237870..6f653046615f162c516b5eebf08995d30c6214d7 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) { tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); continue; } else { - tDebug("UDP malloc mem: %p", tmsg); + tTrace("UDP malloc mem: %p", tmsg); } tmsg += tsRpcOverhead; // overhead for SRpcReqContext diff --git a/src/sync/src/taosTcpPool.c b/src/sync/src/taosTcpPool.c index eda822b1ec0b66ac984106566e03d5a1131ede8a..6a210a136ffe67b2e1394d26bac4cb5083452c8c 100644 --- a/src/sync/src/taosTcpPool.c +++ b/src/sync/src/taosTcpPool.c @@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) { continue; } } + } + + if (pThread->stop) break; } uDebug("%p TCP epoll thread exits", pThread); @@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) { } pthread_join(thread, NULL); - taosClose(fd); + if (fd >= 0) taosClose(fd); }
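Note on the taosInitTcpServer change in src/rpc/src/rpcTcp.c above: the server object now holds an array of per-thread pointers (`SThreadObj **`), so a failed allocation partway through the loop has to free the objects already created before bailing out. Below is a minimal, standalone sketch of that allocate-or-unwind pattern; the `WorkerPool`/`Worker` names are illustrative stand-ins, not part of the TDengine API.

```c
#include <stdio.h>
#include <stdlib.h>

typedef struct { int id; } Worker;                       /* stand-in for SThreadObj */
typedef struct { int n; Worker **workers; } WorkerPool;  /* stand-in for SServerObj */

/* Allocate a pool of n workers; on any failure, free whatever was already
 * allocated and return NULL, mirroring the unwind loop in the diff. */
static WorkerPool *poolCreate(int n) {
  WorkerPool *pool = calloc(1, sizeof(WorkerPool));
  if (pool == NULL) return NULL;

  pool->n = n;
  pool->workers = calloc((size_t)n, sizeof(Worker *));
  if (pool->workers == NULL) { free(pool); return NULL; }

  for (int i = 0; i < n; ++i) {
    pool->workers[i] = calloc(1, sizeof(Worker));
    if (pool->workers[i] == NULL) {
      for (int j = 0; j < i; ++j) free(pool->workers[j]);  /* unwind partial work */
      free(pool->workers);
      free(pool);
      return NULL;
    }
    pool->workers[i]->id = i;
  }
  return pool;
}

int main(void) {
  WorkerPool *pool = poolCreate(4);
  if (pool == NULL) return 1;
  printf("created %d workers\n", pool->n);
  for (int i = 0; i < pool->n; ++i) free(pool->workers[i]);
  free(pool->workers);
  free(pool);
  return 0;
}
```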
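The taosStopTcpThread/taosStopTcpServer changes guard against a thread joining itself (which deadlocks or fails with EDEADLK) by detaching instead, while the worker thread now releases its own resources when its loop ends. A small sketch of that shutdown pattern follows, using raw POSIX calls (`pthread_equal`, `pthread_detach`) in place of the taosComparePthread/taosCheckPthreadValid wrappers; `WorkerCtx`, `workerLoop`, and `workerStop` are made-up names for the example.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  pthread_t   thread;
  atomic_bool stop;
} WorkerCtx;

/* The worker frees its own context when it leaves the loop, the way
 * taosProcessTcpData now frees pThreadObj on exit in the diff. */
static void *workerLoop(void *arg) {
  WorkerCtx *ctx = arg;
  while (!atomic_load(&ctx->stop)) { /* stand-in for the epoll/recv loop */ }
  printf("worker exiting, releasing its own context\n");
  free(ctx);
  return NULL;
}

/* Never pthread_join() the current thread: detach and return instead. */
static void workerStop(WorkerCtx *ctx) {
  pthread_t t = ctx->thread;          /* copy before the worker can free ctx */
  atomic_store(&ctx->stop, true);
  if (pthread_equal(t, pthread_self())) {
    pthread_detach(pthread_self());
    return;                           /* the worker frees ctx on its way out */
  }
  pthread_join(t, NULL);
}

int main(void) {
  WorkerCtx *ctx = calloc(1, sizeof(WorkerCtx));
  if (ctx == NULL) return 1;
  atomic_init(&ctx->stop, false);
  if (pthread_create(&ctx->thread, NULL, workerLoop, ctx) != 0) { free(ctx); return 1; }
  workerStop(ctx);                    /* called from main, so this joins */
  return 0;
}
```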
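taosCloseTcpConnection and taosSendTcpData now reject handles whose `signature` field no longer points back at the object itself; the self-referential signature is a cheap guard against acting on an SFdObj that has already been torn down. A tiny illustration of the idea, with made-up `Conn`/`connOpen`/`connClose`/`connSend` names; note the check only helps while the memory is still mapped and unmodified, it is not a substitute for real lifetime management.

```c
#include <stdio.h>
#include <stdlib.h>

typedef struct Conn {
  struct Conn *signature;   /* points to itself while the object is valid */
  int          fd;
} Conn;

static Conn *connOpen(int fd) {
  Conn *c = calloc(1, sizeof(Conn));
  if (c == NULL) return NULL;
  c->signature = c;         /* mark as live */
  c->fd = fd;
  return c;
}

static void connClose(Conn *c) {
  if (c == NULL || c->signature != c) return;  /* already closed or bogus handle */
  c->signature = NULL;      /* invalidate before releasing */
  free(c);
}

static int connSend(Conn *c, const void *data, int len) {
  (void)data;
  if (c == NULL || c->signature != c) return -1;  /* refuse stale handles */
  printf("would send %d bytes on fd %d\n", len, c->fd);
  return len;
}

int main(void) {
  Conn *c = connOpen(3);
  if (c == NULL) return 1;
  connSend(c, "hi", 2);
  connClose(c);
  /* Calling connSend(c, ...) here would still be a use-after-free; the
     signature check is a best-effort guard, not a safety guarantee. */
  return 0;
}
```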