Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
0aee0028
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0aee0028
编写于
10月 08, 2020
作者:
F
freemine
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'upstream/develop' into odbc
上级
ac16f203
7df888c1
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
507 addition
and
67 deletion
+507
-67
.lgtm.yml
.lgtm.yml
+402
-0
README.md
README.md
+7
-1
src/connector/go
src/connector/go
+1
-1
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+41
-39
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+51
-24
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+1
-1
src/sync/src/taosTcpPool.c
src/sync/src/taosTcpPool.c
+4
-1
未找到文件。
.lgtm.yml
0 → 100644
浏览文件 @
0aee0028
##########################################################################################
# Customize file classifications. #
# Results from files under any classifier will be excluded from LGTM #
# statistics. #
##########################################################################################
##########################################################################################
# Use the `path_classifiers` block to define changes to the default classification of #
# files. #
##########################################################################################
path_classifiers
:
# docs:
# Identify the top-level file called `generate_javadoc.py` as documentation-related.
test
:
# Override LGTM's default classification of test files by excluding all files.
-
exclude
:
/
# Classify all files in the top-level directories tests/ and testsuites/ as test code.
-
tests
# - testsuites
# Classify all files with suffix `.test` as test code.
# Note: use only forward slash / as a path separator.
# Use ** to indicate an arbitrary parent path.
# Use * to indicate any sequence of characters excluding /.
# Always enclose the expression in double quotes if it includes *.
# - "**/*.test"
# Refine the classifications above by excluding files in test/util/.
# - exclude: test/util
# The default behavior is to tag all files created during the
# build as `generated`. Results are hidden for generated code. You can tag
# further files as being generated by adding them to the `generated` section.
generated
:
# Exclude all `*.c` files under the `ui/` directory from classification as
# generated code.
# - exclude: ui/**/*.c
# By default, all files not checked into the repository are considered to be
# 'generated'.
# The default behavior is to tag library code as `library`. Results are hidden
# for library code. You can tag further files as being library code by adding them
# to the `library` section.
library
:
-
exclude
:
deps/
# The default behavior is to tag template files as `template`. Results are hidden
# for template files. You can tag further files as being template files by adding
# them to the `template` section.
template
:
#- exclude: path/to/template/code/**/*.c
# Define your own category, for example: 'some_custom_category'.
some_custom_category
:
# Classify all files in the top-level directory tools/ (or the top-level file
# called tools).
# - tools
#########################################################################################
# Use the `queries` block to change the default display of query results. #
#########################################################################################
# queries:
# Start by hiding the results of all queries.
# - exclude: "*"
# Then include all queries tagged 'security' and 'correctness', and with a severity of
# 'error'.
# - include:
# tags:
# - "security"
# - "correctness"
# severity: "error"
# Specifically hide the results of two queries.
# - exclude: cpp/use-of-goto
# - exclude: java/equals-on-unrelated-types
# Refine by including the `java/command-line-injection` query.
# - include: java/command-line-injection
#########################################################################################
# Define changes to the default code extraction process. #
# Each block configures the extraction of a single language, and modifies actions in a #
# named step. Every named step includes automatic default actions, #
# except for the 'prepare' step. The steps are performed in the following sequence: #
# prepare #
# after_prepare #
# configure (C/C++ only) #
# python_setup (Python only) #
# before_index #
# index #
##########################################################################################
#########################################################################################
# Environment variables available to the steps: #
#########################################################################################
# LGTM_SRC
# The root of the source tree.
# LGTM_WORKSPACE
# An existing (initially empty) folder outside the source tree.
# Used for temporary download and setup commands.
#########################################################################################
# Use the extraction block to define changes to the default code extraction process #
# for one or more languages. The settings for each language are defined in a child #
# block, with one or more steps. #
#########################################################################################
extraction
:
# Define settings for C/C++ analysis
#####################################
cpp
:
# The `prepare` step exists for customization on LGTM.com only.
prepare
:
# # The `packages` section is valid for LGTM.com only. It names Ubuntu packages to
# # be installed.
packages
:
-
cmake
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# This step is useful for C/C++ analysis where you want to prepare the environment
# for the `configure` step without changing the default behavior for that step.
# after_prepare:
#- export GNU_MAKE=make
#- export GIT=true
# The `configure` step generates build configuration files which the `index` step
# then uses to build the codebase.
configure
:
command
:
-
mkdir build
-
cd build
-
cmake ..
# - ./prepare_deps
# Optional step. You should add a `before_index` step if you need to run commands
# before the `index` step.
# before_index:
# - export BOOST_DIR=$LGTM_SRC/boost
# - export GTEST_DIR=$LGTM_SRC/googletest
# - export HUNSPELL_DIR=$LGTM_SRC/hunspell
# - export CRYPTOPP_DIR=$LGTM_SRC/cryptopp
# The `index` step builds the code and extracts information during the build
# process.
index
:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
build_command
:
-
cd build
-
make
# - $GNU_MAKE -j2 -s
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Specify MSBuild settings
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /p:Platform=x64 /p:Configuration=Release
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration:
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform:
# Specify the MSBuild target. Default: rebuild.
# target:
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files.
# vstools_version: 10
# Define settings for C# analysis
##################################
# csharp:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
#index:
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./example-compile-all.sh
# By default, LGTM analyzes the code by building it. You can override this,
# and tell LGTM not to build the code. Beware that this can lead
# to less accurate results.
# buildless: true
# Specify .NET Core settings.
# dotnet:
# Specify additional arguments to `dotnet build`.
# Default: empty.
# arguments: "example_arg"
# Specify the version of .NET Core SDK to use.
# Default: The version installed on the build machine.
# version: 2.1
# Specify MSBuild settings.
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /P:WarningLevel=2
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration: release
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform: x86
# Specify the MSBuild target. Default: rebuild.
# target: notest
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files
# vstools_version: 10
# Specify additional options for the extractor,
# for example --fast to perform a faster extraction that produces a smaller
# database.
# extractor: "--fast"
# Define settings for Go analysis
##################################
# go:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
# index:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./compile-all.sh
# Define settings for Java analysis
####################################
# java:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify Gradle settings.
# gradle:
# Specify the required Gradle version.
# Default: determined automatically.
# version: 4.4
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command: ./compile-all.sh
# Specify the Java version required to build the project.
# java_version: 11
# Specify whether to extract Java .properties files
# Default: false
# properties_files: true
# Specify Maven settings.
# maven:
# Specify the path (absolute or relative) of a Maven settings file to use.
# Default: Maven uses a settings file in the default location, if it exists.
# settings_file: /opt/share/settings.xml
# Specify the path of a Maven toolchains file.
# Default: Maven uses a toolchains file in the default location, if it exists.
# toolchains_file: /opt/share/toolchains.xml
# Specify the required Maven version.
# Default: the Maven version is determined automatically, where feasible.
# version: 3.5.2
# Specify how XML files should be extracted:
# all = extract all XML files.
# default = only extract XML files named `AndroidManifest.xml`, `pom.xml`, and `web.xml`.
# disabled = do not extract any XML files.
# xml_mode: all
# Define settings for JavaScript analysis
##########################################
# javascript:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify a list of files and folders to extract.
# Default: The project root directory.
# include:
# - src/js
# Specify a list of files and folders to exclude from extraction.
# exclude:
# - thirdparty/lib
# You can add additional file types for LGTM to extract, by mapping file
# extensions (including the leading dot) to file types. The usual
# include/exclude patterns apply, so, for example, `.jsm` files under
# `thirdparty/lib` will not be extracted.
# filetypes:
# ".jsm": "js"
# ".tmpl": "html"
# Specify a list of glob patterns to include/exclude files from extraction; this
# is applied on top of the include/exclude paths from above; patterns are
# processed in the same way as for path classifiers above.
# Default: include all files with known extensions (such as .js, .ts and .html),
# but exclude files ending in `-min.js` or `.min.js` and folders named `node_modules`
# or `bower_components`
# filters:
# exclude any *.ts files anywhere.
# - exclude: "**/*.ts"
# but include *.ts files under src/js/typescript.
# - include: "src/js/typescript/**/*.ts"
# Specify how TypeScript files should be extracted:
# none = exclude all TypeScript files.
# basic = extract syntactic information from TypeScript files.
# full = extract syntactic and type information from TypeScript files.
# Default: full.
# typescript: basic
# By default, LGTM doesn't extract any XML files. You can override this by
# using the `xml_mode` property and setting it to `all`.
# xml_mode: all
# Define settings for Python analysis
######################################
# python:
# # The `prepare` step exists for customization on LGTM.com only.
# # prepare:
# # # The `packages` section is valid for LGTM.com only. It names packages to
# # # be installed.
# # packages: libpng-dev
# # This step is useful for Python analysis where you want to prepare the
# # environment for the `python_setup` step without changing the default behavior
# # for that step.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# # This sets up the Python interpreter and virtual environment, ready for the
# # `index` step to extract the codebase.
# python_setup:
# # Specify packages that should NOT be installed despite being mentioned in the
# # requirements.txt file.
# # Default: no package marked for exclusion.
# exclude_requirements:
# - pywin32
# # Specify a list of pip packages to install.
# # If any of these packages cannot be installed, the extraction will fail.
# requirements:
# - Pillow
# # Specify a list of requirements text files to use to set up the environment,
# # or false for none. Default: any requirements.txt, test-requirements.txt,
# # and similarly named files identified in the codebase are used.
# requirements_files:
# - required-packages.txt
# # Specify a setup.py file to use to set up the environment, or false for none.
# # Default: any setup.py files identified in the codebase are used in preference
# # to any requirements text files.
# setup_py: new-setup.py
# # Override the version of the Python interpreter used for setup and extraction
# # Default: Python 3.
# version: 2
# # Optional step. You should add a `before_index` step if you need to run commands
# # before the `index` step.
# before_index:
# - antlr4 -Dlanguage=Python3 Grammar.g4
# # The `index` step extracts information from the files in the codebase.
# index:
# # Specify a list of files and folders to exclude from extraction.
# # Default: Git submodules and Subversion externals.
# exclude:
# - legacy-implementation
# - thirdparty/libs
# filters:
# - exclude: "**/documentation/examples/snippets/*.py"
# - include: "**/documentation/examples/test_application/*"
# include:
# - example/to/include
README.md
浏览文件 @
0aee0028
...
...
@@ -83,12 +83,18 @@ sudo dnf install -y maven
## Get the source codes
-
github:
First of all, you may clone the source codes from
github:
```
bash
git clone https://github.com/taosdata/TDengine.git
cd
TDengine
```
The connectors for go & grafana have been moved to separated repositories,
so you should run this command in the TDengine directory to install them:
```
bash
git submodule update
--init
--recursive
```
## Build TDengine
### On Linux platform
...
...
go
@
8c58c512
比较
567b7b12
...
8c58c512
Subproject commit
567b7b12f3fd2775c718d284beffc8c38dd6c219
Subproject commit
8c58c512b6acda8bcdfa48fdc7140227b5221766
src/rpc/src/rpcMain.c
浏览文件 @
0aee0028
...
...
@@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static
void
rpcSendReqHead
(
SRpcConn
*
pConn
);
static
void
*
rpcProcessMsgFromPeer
(
SRecvInfo
*
pRecv
);
static
void
rpcProcessIncomingMsg
(
SRpcConn
*
pConn
,
SRpcHead
*
pHead
);
static
void
rpcProcessIncomingMsg
(
SRpcConn
*
pConn
,
SRpcHead
*
pHead
,
SRpcReqContext
*
pContext
);
static
void
rpcProcessConnError
(
void
*
param
,
void
*
id
);
static
void
rpcProcessRetryTimer
(
void
*
,
void
*
);
static
void
rpcProcessIdleTimer
(
void
*
param
,
void
*
tmrId
);
...
...
@@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) {
tError
(
"failed to malloc msg, size:%d"
,
size
);
return
NULL
;
}
else
{
t
Debug
(
"malloc msg
: %p"
,
start
);
t
Trace
(
"malloc mem
: %p"
,
start
);
}
return
start
+
sizeof
(
SRpcReqContext
)
+
sizeof
(
SRpcHead
);
...
...
@@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) {
if
(
cont
)
{
char
*
temp
=
((
char
*
)
cont
)
-
sizeof
(
SRpcHead
)
-
sizeof
(
SRpcReqContext
);
free
(
temp
);
t
Debug
(
"free mem: %p"
,
temp
);
t
Trace
(
"free mem: %p"
,
temp
);
}
}
...
...
@@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) {
if
(
msg
)
{
char
*
temp
=
(
char
*
)
msg
-
sizeof
(
SRpcReqContext
);
free
(
temp
);
t
Debug
(
"free msg
: %p"
,
temp
);
t
Trace
(
"free mem
: %p"
,
temp
);
}
}
...
...
@@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
return
TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED
;
}
if
(
rpcContLenFromMsg
(
pHead
->
msgLen
)
<=
0
)
{
tDebug
(
"%s, message body is empty, ignore"
,
pConn
->
info
);
return
TSDB_CODE_RPC_APP_ERROR
;
}
pConn
->
inTranId
=
pHead
->
tranId
;
pConn
->
inType
=
pHead
->
msgType
;
// start the progress timer to monitor the response from server app
if
(
pConn
->
connType
!=
RPC_CONN_TCPS
)
pConn
->
pTimer
=
taosTmrStart
(
rpcProcessProgressTimer
,
tsProgressTimer
,
pConn
,
pConn
->
pRpc
->
tmrCtrl
);
return
0
;
}
...
...
@@ -881,17 +890,32 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
pConn
->
outType
=
0
;
pConn
->
pReqMsg
=
NULL
;
pConn
->
reqMsgLen
=
0
;
SRpcReqContext
*
pContext
=
pConn
->
pContext
;
if
(
pHead
->
code
==
TSDB_CODE_RPC_REDIRECT
)
{
if
(
rpcContLenFromMsg
(
pHead
->
msgLen
)
<
sizeof
(
SRpcEpSet
))
{
// if EpSet is not included in the msg, treat it as NOT_READY
pHead
->
code
=
TSDB_CODE_RPC_NOT_READY
;
}
else
{
pContext
->
redirect
++
;
if
(
pContext
->
redirect
>
TSDB_MAX_REPLICA
)
{
pHead
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
tWarn
(
"%s, too many redirects, quit"
,
pConn
->
info
);
}
}
}
return
TSDB_CODE_SUCCESS
;
}
static
SRpcConn
*
rpcProcessMsgHead
(
SRpcInfo
*
pRpc
,
SRecvInfo
*
pRecv
)
{
static
SRpcConn
*
rpcProcessMsgHead
(
SRpcInfo
*
pRpc
,
SRecvInfo
*
pRecv
,
SRpcReqContext
**
ppContext
)
{
int32_t
sid
;
SRpcConn
*
pConn
=
NULL
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pRecv
->
msg
;
sid
=
htonl
(
pHead
->
destId
);
*
ppContext
=
NULL
;
if
(
pHead
->
msgType
>=
TSDB_MSG_TYPE_MAX
||
pHead
->
msgType
<=
0
)
{
tDebug
(
"%s sid:%d, invalid message type:%d"
,
pRpc
->
label
,
sid
,
pHead
->
msgType
);
...
...
@@ -945,6 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn
->
pIdleTimer
=
taosTmrStart
(
rpcProcessIdleTimer
,
tsRpcTimer
*
2
,
pConn
,
pRpc
->
tmrCtrl
);
}
else
{
terrno
=
rpcProcessRspHead
(
pConn
,
pHead
);
*
ppContext
=
pConn
->
pContext
;
}
}
...
...
@@ -1009,7 +1034,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
}
terrno
=
0
;
pConn
=
rpcProcessMsgHead
(
pRpc
,
pRecv
);
SRpcReqContext
*
pContext
;
pConn
=
rpcProcessMsgHead
(
pRpc
,
pRecv
,
&
pContext
);
if
(
pHead
->
msgType
>=
1
&&
pHead
->
msgType
<
TSDB_MSG_TYPE_MAX
)
{
tDebug
(
"%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x"
,
pRpc
->
label
,
...
...
@@ -1029,7 +1055,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
tDebug
(
"%s %p %p, %s is sent with error code:0x%x"
,
pRpc
->
label
,
pConn
,
(
void
*
)
pHead
->
ahandle
,
taosMsg
[
pHead
->
msgType
+
1
],
code
);
}
}
else
{
// msg is passed to app only parsing is ok
rpcProcessIncomingMsg
(
pConn
,
pHead
);
rpcProcessIncomingMsg
(
pConn
,
pHead
,
pContext
);
}
}
...
...
@@ -1060,7 +1086,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
rpcFreeCont
(
pContext
->
pCont
);
}
static
void
rpcProcessIncomingMsg
(
SRpcConn
*
pConn
,
SRpcHead
*
pHead
)
{
static
void
rpcProcessIncomingMsg
(
SRpcConn
*
pConn
,
SRpcHead
*
pHead
,
SRpcReqContext
*
pContext
)
{
SRpcInfo
*
pRpc
=
pConn
->
pRpc
;
SRpcMsg
rpcMsg
;
...
...
@@ -1070,29 +1096,18 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg
.
pCont
=
pHead
->
content
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
ahandle
=
pConn
->
ahandle
;
if
(
rpcIsReq
(
pHead
->
msgType
)
)
{
if
(
rpcMsg
.
contLen
>
0
)
{
rpcMsg
.
handle
=
pConn
;
rpcAddRef
(
pRpc
);
// add the refCount for requests
rpcMsg
.
ahandle
=
pConn
->
ahandle
;
rpcMsg
.
handle
=
pConn
;
rpcAddRef
(
pRpc
);
// add the refCount for requests
// start the progress timer to monitor the response from server app
if
(
pConn
->
connType
!=
RPC_CONN_TCPS
)
pConn
->
pTimer
=
taosTmrStart
(
rpcProcessProgressTimer
,
tsProgressTimer
,
pConn
,
pRpc
->
tmrCtrl
);
// notify the server app
(
*
(
pRpc
->
cfp
))(
&
rpcMsg
,
NULL
);
}
else
{
tDebug
(
"%s, message body is empty, ignore"
,
pConn
->
info
);
rpcFreeCont
(
rpcMsg
.
pCont
);
}
// notify the server app
(
*
(
pRpc
->
cfp
))(
&
rpcMsg
,
NULL
);
}
else
{
// it's a response
SRpcReqContext
*
pContext
=
pConn
->
pContext
;
rpcMsg
.
handle
=
pContext
;
pConn
->
pContext
=
NULL
;
pConn
->
pReqMsg
=
NULL
;
rpcMsg
.
ahandle
=
pContext
->
ahandle
;
// for UDP, port may be changed by server, the port in epSet shall be used for cache
if
(
pHead
->
code
!=
TSDB_CODE_RPC_TOO_SLOW
)
{
...
...
@@ -1101,19 +1116,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcCloseConn
(
pConn
);
}
if
(
pHead
->
code
==
TSDB_CODE_RPC_REDIRECT
)
{
if
(
rpcMsg
.
contLen
<
sizeof
(
SRpcEpSet
))
{
// if EpSet is not included in the msg, treat it as NOT_READY
pHead
->
code
=
TSDB_CODE_RPC_NOT_READY
;
}
else
{
pContext
->
redirect
++
;
if
(
pContext
->
redirect
>
TSDB_MAX_REPLICA
)
{
pHead
->
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
tWarn
(
"%s, too many redirects, quit"
,
pConn
->
info
);
}
}
}
if
(
pHead
->
code
==
TSDB_CODE_RPC_REDIRECT
)
{
pContext
->
numOfTry
=
0
;
SRpcEpSet
*
pEpSet
=
(
SRpcEpSet
*
)
pHead
->
content
;
...
...
@@ -1445,7 +1447,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
pNewHead
->
msgLen
=
rpcMsgLenFromCont
(
origLen
);
rpcFreeMsg
(
pHead
);
// free the compressed message buffer
pHead
=
pNewHead
;
//tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen
);
tTrace
(
"decomp malloc mem: %p"
,
temp
);
}
else
{
tError
(
"failed to allocate memory to decompress msg, contLen:%d"
,
contLen
);
}
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
0aee0028
...
...
@@ -62,7 +62,7 @@ typedef struct {
char
label
[
TSDB_LABEL_LEN
];
int
numOfThreads
;
void
*
shandle
;
SThreadObj
*
pThreadObj
;
SThreadObj
*
*
pThreadObj
;
pthread_t
thread
;
}
SServerObj
;
...
...
@@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
tstrncpy
(
pServerObj
->
label
,
label
,
sizeof
(
pServerObj
->
label
));
pServerObj
->
numOfThreads
=
numOfThreads
;
pServerObj
->
pThreadObj
=
(
SThreadObj
*
)
calloc
(
sizeof
(
SThreadObj
),
numOfThreads
);
pServerObj
->
pThreadObj
=
(
SThreadObj
*
*
)
calloc
(
sizeof
(
SThreadObj
*
),
numOfThreads
);
if
(
pServerObj
->
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
// initialize parameters in case it may encounter error later
pThreadObj
=
pServerObj
->
pThreadObj
;
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pThreadObj
=
(
SThreadObj
*
)
calloc
(
sizeof
(
SThreadObj
),
1
);
if
(
pThreadObj
==
NULL
)
{
tError
(
"TCP:%s no enough memory"
,
label
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
for
(
int
j
=
0
;
j
<
i
;
++
j
)
free
(
pServerObj
->
pThreadObj
[
j
]);
free
(
pServerObj
->
pThreadObj
);
free
(
pServerObj
);
return
NULL
;
}
pServerObj
->
pThreadObj
[
i
]
=
pThreadObj
;
pThreadObj
->
pollFd
=
-
1
;
taosResetPthread
(
&
pThreadObj
->
thread
);
pThreadObj
->
processData
=
fp
;
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
));
pThreadObj
->
shandle
=
shandle
;
pThreadObj
++
;
}
// initialize mutex, thread, fd which may fail
pThreadObj
=
pServerObj
->
pThreadObj
;
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pThreadObj
=
pServerObj
->
pThreadObj
[
i
];
code
=
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
);
if
(
code
<
0
)
{
tError
(
"%s failed to init TCP process data mutex(%s)"
,
label
,
strerror
(
errno
));
...
...
@@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
pThreadObj
->
threadId
=
i
;
pThreadObj
++
;
}
pServerObj
->
fd
=
taosOpenTcpServerSocket
(
pServerObj
->
ip
,
pServerObj
->
port
);
...
...
@@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj
->
stop
=
true
;
eventfd_t
fd
=
-
1
;
if
(
taosComparePthread
(
pThreadObj
->
thread
,
pthread_self
()))
{
pthread_detach
(
pthread_self
());
return
;
}
if
(
taosCheckPthreadValid
(
pThreadObj
->
thread
)
&&
pThreadObj
->
pollFd
>=
0
)
{
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
...
...
@@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
}
}
if
(
taosCheckPthreadValid
(
pThreadObj
->
thread
))
pthread_join
(
pThreadObj
->
thread
,
NULL
);
if
(
pThreadObj
->
pollFd
>=
0
)
taosCloseSocket
(
pThreadObj
->
pollFd
);
if
(
fd
!=
-
1
)
taosCloseSocket
(
fd
);
while
(
pThreadObj
->
pHead
)
{
SFdObj
*
pFdObj
=
pThreadObj
->
pHead
;
pThreadObj
->
pHead
=
pFdObj
->
next
;
taosFreeFdObj
(
pFdObj
);
if
(
taosCheckPthreadValid
(
pThreadObj
->
thread
)
&&
pThreadObj
->
pollFd
>=
0
)
{
pthread_join
(
pThreadObj
->
thread
,
NULL
);
}
if
(
fd
!=
-
1
)
taosCloseSocket
(
fd
);
}
void
taosStopTcpServer
(
void
*
handle
)
{
...
...
@@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) {
if
(
pServerObj
==
NULL
)
return
;
if
(
pServerObj
->
fd
>=
0
)
shutdown
(
pServerObj
->
fd
,
SHUT_RD
);
if
(
taosCheckPthreadValid
(
pServerObj
->
thread
))
pthread_join
(
pServerObj
->
thread
,
NULL
);
if
(
taosCheckPthreadValid
(
pServerObj
->
thread
))
{
if
(
taosComparePthread
(
pServerObj
->
thread
,
pthread_self
()))
{
pthread_detach
(
pthread_self
());
}
else
{
pthread_join
(
pServerObj
->
thread
,
NULL
);
}
}
tDebug
(
"%s TCP server is stopped"
,
pServerObj
->
label
);
}
...
...
@@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) {
if
(
pServerObj
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
pServerObj
->
numOfThreads
;
++
i
)
{
pThreadObj
=
pServerObj
->
pThreadObj
+
i
;
pThreadObj
=
pServerObj
->
pThreadObj
[
i
]
;
taosStopTcpThread
(
pThreadObj
);
pthread_mutex_destroy
(
&
(
pThreadObj
->
mutex
));
}
tDebug
(
"%s TCP server is cleaned up"
,
pServerObj
->
label
);
...
...
@@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) {
taosSetSockOpt
(
connFd
,
SOL_SOCKET
,
SO_RCVTIMEO
,
&
to
,
sizeof
(
to
));
// pick up the thread to handle this connection
pThreadObj
=
pServerObj
->
pThreadObj
+
threadId
;
pThreadObj
=
pServerObj
->
pThreadObj
[
threadId
]
;
SFdObj
*
pFdObj
=
taosMallocFdObj
(
pThreadObj
,
connFd
);
if
(
pFdObj
)
{
...
...
@@ -327,10 +342,8 @@ void taosCleanUpTcpClient(void *chandle) {
SThreadObj
*
pThreadObj
=
chandle
;
if
(
pThreadObj
==
NULL
)
return
;
tDebug
(
"%s TCP client will be cleaned up"
,
pThreadObj
->
label
);
taosStopTcpThread
(
pThreadObj
);
tDebug
(
"%s TCP client is cleaned up"
,
pThreadObj
->
label
);
taosTFree
(
pThreadObj
);
}
void
*
taosOpenTcpClientConnection
(
void
*
shandle
,
void
*
thandle
,
uint32_t
ip
,
uint16_t
port
)
{
...
...
@@ -365,7 +378,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void
taosCloseTcpConnection
(
void
*
chandle
)
{
SFdObj
*
pFdObj
=
chandle
;
if
(
pFdObj
==
NULL
)
return
;
if
(
pFdObj
==
NULL
||
pFdObj
->
signature
!=
pFdObj
)
return
;
SThreadObj
*
pThreadObj
=
pFdObj
->
pThreadObj
;
tDebug
(
"%s %p TCP connection will be closed, FD:%p"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
pFdObj
);
...
...
@@ -378,7 +391,7 @@ void taosCloseTcpConnection(void *chandle) {
int
taosSendTcpData
(
uint32_t
ip
,
uint16_t
port
,
void
*
data
,
int
len
,
void
*
chandle
)
{
SFdObj
*
pFdObj
=
chandle
;
if
(
chandle
==
NULL
)
return
-
1
;
if
(
pFdObj
==
NULL
||
pFdObj
->
signature
!=
pFdObj
)
return
-
1
;
return
taosWriteMsg
(
pFdObj
->
fd
,
data
,
len
);
}
...
...
@@ -425,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError
(
"%s %p TCP malloc(size:%d) fail"
,
pThreadObj
->
label
,
pFdObj
->
thandle
,
msgLen
);
return
-
1
;
}
else
{
t
Debug
(
"TCP malloc mem: %p"
,
buffer
);
t
Trace
(
"TCP malloc mem: %p"
,
buffer
);
}
msg
=
buffer
+
tsRpcOverhead
;
...
...
@@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) {
pFdObj
->
thandle
=
(
*
(
pThreadObj
->
processData
))(
&
recvInfo
);
if
(
pFdObj
->
thandle
==
NULL
)
taosFreeFdObj
(
pFdObj
);
}
if
(
pThreadObj
->
stop
)
break
;
}
if
(
pThreadObj
->
pollFd
>=
0
)
taosCloseSocket
(
pThreadObj
->
pollFd
);
while
(
pThreadObj
->
pHead
)
{
SFdObj
*
pFdObj
=
pThreadObj
->
pHead
;
pThreadObj
->
pHead
=
pFdObj
->
next
;
taosFreeFdObj
(
pFdObj
);
}
pthread_mutex_destroy
(
&
(
pThreadObj
->
mutex
));
tDebug
(
"%s TCP thread exits ..."
,
pThreadObj
->
label
);
taosTFree
(
pThreadObj
);
return
NULL
;
}
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
0aee0028
...
...
@@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) {
tError
(
"%s failed to allocate memory, size:%"
PRId64
,
pConn
->
label
,
(
int64_t
)
dataLen
);
continue
;
}
else
{
t
Debug
(
"UDP malloc mem: %p"
,
tmsg
);
t
Trace
(
"UDP malloc mem: %p"
,
tmsg
);
}
tmsg
+=
tsRpcOverhead
;
// overhead for SRpcReqContext
...
...
src/sync/src/taosTcpPool.c
浏览文件 @
0aee0028
...
...
@@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) {
continue
;
}
}
}
if
(
pThread
->
stop
)
break
;
}
uDebug
(
"%p TCP epoll thread exits"
,
pThread
);
...
...
@@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) {
}
pthread_join
(
thread
,
NULL
);
taosClose
(
fd
);
if
(
fd
>=
0
)
taosClose
(
fd
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录