提交 584ad689 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/release/s103' into release/s103

##########################################################################################
# Customize file classifications. #
# Results from files under any classifier will be excluded from LGTM #
# statistics. #
##########################################################################################
##########################################################################################
# Use the `path_classifiers` block to define changes to the default classification of #
# files. #
##########################################################################################
path_classifiers:
# docs:
# Identify the top-level file called `generate_javadoc.py` as documentation-related.
test:
# Override LGTM's default classification of test files by excluding all files.
- exclude: /
# Classify all files in the top-level directories tests/ and testsuites/ as test code.
- tests
# - testsuites
# Classify all files with suffix `.test` as test code.
# Note: use only forward slash / as a path separator.
# Use ** to indicate an arbitrary parent path.
# Use * to indicate any sequence of characters excluding /.
# Always enclose the expression in double quotes if it includes *.
# - "**/*.test"
# Refine the classifications above by excluding files in test/util/.
# - exclude: test/util
# The default behavior is to tag all files created during the
# build as `generated`. Results are hidden for generated code. You can tag
# further files as being generated by adding them to the `generated` section.
generated:
# Exclude all `*.c` files under the `ui/` directory from classification as
# generated code.
# - exclude: ui/**/*.c
# By default, all files not checked into the repository are considered to be
# 'generated'.
# The default behavior is to tag library code as `library`. Results are hidden
# for library code. You can tag further files as being library code by adding them
# to the `library` section.
library:
- exclude: deps/
# The default behavior is to tag template files as `template`. Results are hidden
# for template files. You can tag further files as being template files by adding
# them to the `template` section.
template:
#- exclude: path/to/template/code/**/*.c
# Define your own category, for example: 'some_custom_category'.
some_custom_category:
# Classify all files in the top-level directory tools/ (or the top-level file
# called tools).
# - tools
#########################################################################################
# Use the `queries` block to change the default display of query results. #
#########################################################################################
# queries:
# Start by hiding the results of all queries.
# - exclude: "*"
# Then include all queries tagged 'security' and 'correctness', and with a severity of
# 'error'.
# - include:
# tags:
# - "security"
# - "correctness"
# severity: "error"
# Specifically hide the results of two queries.
# - exclude: cpp/use-of-goto
# - exclude: java/equals-on-unrelated-types
# Refine by including the `java/command-line-injection` query.
# - include: java/command-line-injection
#########################################################################################
# Define changes to the default code extraction process. #
# Each block configures the extraction of a single language, and modifies actions in a #
# named step. Every named step includes automatic default actions, #
# except for the 'prepare' step. The steps are performed in the following sequence: #
# prepare #
# after_prepare #
# configure (C/C++ only) #
# python_setup (Python only) #
# before_index #
# index #
##########################################################################################
#########################################################################################
# Environment variables available to the steps: #
#########################################################################################
# LGTM_SRC
# The root of the source tree.
# LGTM_WORKSPACE
# An existing (initially empty) folder outside the source tree.
# Used for temporary download and setup commands.
#########################################################################################
# Use the extraction block to define changes to the default code extraction process #
# for one or more languages. The settings for each language are defined in a child #
# block, with one or more steps. #
#########################################################################################
extraction:
# Define settings for C/C++ analysis
#####################################
cpp:
# The `prepare` step exists for customization on LGTM.com only.
prepare:
# # The `packages` section is valid for LGTM.com only. It names Ubuntu packages to
# # be installed.
packages:
- cmake
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# This step is useful for C/C++ analysis where you want to prepare the environment
# for the `configure` step without changing the default behavior for that step.
# after_prepare:
#- export GNU_MAKE=make
#- export GIT=true
# The `configure` step generates build configuration files which the `index` step
# then uses to build the codebase.
configure:
command:
- mkdir build
- cd build
- cmake ..
# - ./prepare_deps
# Optional step. You should add a `before_index` step if you need to run commands
# before the `index` step.
# before_index:
# - export BOOST_DIR=$LGTM_SRC/boost
# - export GTEST_DIR=$LGTM_SRC/googletest
# - export HUNSPELL_DIR=$LGTM_SRC/hunspell
# - export CRYPTOPP_DIR=$LGTM_SRC/cryptopp
# The `index` step builds the code and extracts information during the build
# process.
index:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
build_command:
- cd build
- make
# - $GNU_MAKE -j2 -s
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Specify MSBuild settings
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /p:Platform=x64 /p:Configuration=Release
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration:
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform:
# Specify the MSBuild target. Default: rebuild.
# target:
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files.
# vstools_version: 10
# Define settings for C# analysis
##################################
# csharp:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
#index:
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./example-compile-all.sh
# By default, LGTM analyzes the code by building it. You can override this,
# and tell LGTM not to build the code. Beware that this can lead
# to less accurate results.
# buildless: true
# Specify .NET Core settings.
# dotnet:
# Specify additional arguments to `dotnet build`.
# Default: empty.
# arguments: "example_arg"
# Specify the version of .NET Core SDK to use.
# Default: The version installed on the build machine.
# version: 2.1
# Specify MSBuild settings.
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /P:WarningLevel=2
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration: release
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform: x86
# Specify the MSBuild target. Default: rebuild.
# target: notest
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files
# vstools_version: 10
# Specify additional options for the extractor,
# for example --fast to perform a faster extraction that produces a smaller
# database.
# extractor: "--fast"
# Define settings for Go analysis
##################################
# go:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
# index:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./compile-all.sh
# Define settings for Java analysis
####################################
# java:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify Gradle settings.
# gradle:
# Specify the required Gradle version.
# Default: determined automatically.
# version: 4.4
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command: ./compile-all.sh
# Specify the Java version required to build the project.
# java_version: 11
# Specify whether to extract Java .properties files
# Default: false
# properties_files: true
# Specify Maven settings.
# maven:
# Specify the path (absolute or relative) of a Maven settings file to use.
# Default: Maven uses a settings file in the default location, if it exists.
# settings_file: /opt/share/settings.xml
# Specify the path of a Maven toolchains file.
# Default: Maven uses a toolchains file in the default location, if it exists.
# toolchains_file: /opt/share/toolchains.xml
# Specify the required Maven version.
# Default: the Maven version is determined automatically, where feasible.
# version: 3.5.2
# Specify how XML files should be extracted:
# all = extract all XML files.
# default = only extract XML files named `AndroidManifest.xml`, `pom.xml`, and `web.xml`.
# disabled = do not extract any XML files.
# xml_mode: all
# Define settings for JavaScript analysis
##########################################
# javascript:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify a list of files and folders to extract.
# Default: The project root directory.
# include:
# - src/js
# Specify a list of files and folders to exclude from extraction.
# exclude:
# - thirdparty/lib
# You can add additional file types for LGTM to extract, by mapping file
# extensions (including the leading dot) to file types. The usual
# include/exclude patterns apply, so, for example, `.jsm` files under
# `thirdparty/lib` will not be extracted.
# filetypes:
# ".jsm": "js"
# ".tmpl": "html"
# Specify a list of glob patterns to include/exclude files from extraction; this
# is applied on top of the include/exclude paths from above; patterns are
# processed in the same way as for path classifiers above.
# Default: include all files with known extensions (such as .js, .ts and .html),
# but exclude files ending in `-min.js` or `.min.js` and folders named `node_modules`
# or `bower_components`
# filters:
# exclude any *.ts files anywhere.
# - exclude: "**/*.ts"
# but include *.ts files under src/js/typescript.
# - include: "src/js/typescript/**/*.ts"
# Specify how TypeScript files should be extracted:
# none = exclude all TypeScript files.
# basic = extract syntactic information from TypeScript files.
# full = extract syntactic and type information from TypeScript files.
# Default: full.
# typescript: basic
# By default, LGTM doesn't extract any XML files. You can override this by
# using the `xml_mode` property and setting it to `all`.
# xml_mode: all
# Define settings for Python analysis
######################################
# python:
# # The `prepare` step exists for customization on LGTM.com only.
# # prepare:
# # # The `packages` section is valid for LGTM.com only. It names packages to
# # # be installed.
# # packages: libpng-dev
# # This step is useful for Python analysis where you want to prepare the
# # environment for the `python_setup` step without changing the default behavior
# # for that step.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# # This sets up the Python interpreter and virtual environment, ready for the
# # `index` step to extract the codebase.
# python_setup:
# # Specify packages that should NOT be installed despite being mentioned in the
# # requirements.txt file.
# # Default: no package marked for exclusion.
# exclude_requirements:
# - pywin32
# # Specify a list of pip packages to install.
# # If any of these packages cannot be installed, the extraction will fail.
# requirements:
# - Pillow
# # Specify a list of requirements text files to use to set up the environment,
# # or false for none. Default: any requirements.txt, test-requirements.txt,
# # and similarly named files identified in the codebase are used.
# requirements_files:
# - required-packages.txt
# # Specify a setup.py file to use to set up the environment, or false for none.
# # Default: any setup.py files identified in the codebase are used in preference
# # to any requirements text files.
# setup_py: new-setup.py
# # Override the version of the Python interpreter used for setup and extraction
# # Default: Python 3.
# version: 2
# # Optional step. You should add a `before_index` step if you need to run commands
# # before the `index` step.
# before_index:
# - antlr4 -Dlanguage=Python3 Grammar.g4
# # The `index` step extracts information from the files in the codebase.
# index:
# # Specify a list of files and folders to exclude from extraction.
# # Default: Git submodules and Subversion externals.
# exclude:
# - legacy-implementation
# - thirdparty/libs
# filters:
# - exclude: "**/documentation/examples/snippets/*.py"
# - include: "**/documentation/examples/test_application/*"
# include:
# - example/to/include
......@@ -83,12 +83,18 @@ sudo dnf install -y maven
## Get the source codes
- github:
First of all, you may clone the source codes from github:
```bash
git clone https://github.com/taosdata/TDengine.git
cd TDengine
```
The connectors for go & grafana have been moved to separated repositories,
so you should run this command in the TDengine directory to install them:
```bash
git submodule update --init --recursive
```
## Build TDengine
### On Linux platform
......
......@@ -229,7 +229,6 @@ typedef struct SQueryInfo {
// TODO refactor
STimeWindow window; // query time window
SInterval interval;
int32_t tz; // query client timezone
SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*>
......
......@@ -509,7 +509,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......
......@@ -430,7 +430,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) {
SSqlObj* pSql = builder->pInterSql;
if (row == NULL) {
return TSDB_CODE_MND_INVALID_TABLE_NAME;
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
int32_t* lengths = taos_fetch_lengths(pSql);
......@@ -458,7 +458,7 @@ static int32_t tscGetTableTagValue(SCreateBuilder *builder, char *result) {
}
if (0 == strlen(result)) {
return TSDB_CODE_MND_INVALID_TABLE_NAME;
return TSDB_CODE_TSC_INVALID_TABLE_NAME;
}
return TSDB_CODE_SUCCESS;
}
......@@ -554,7 +554,7 @@ int32_t tscRebuildCreateTableStatement(void *param,char *result) {
static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
TAOS_ROW row = tscFetchRow(builder);
if (row == NULL) {
return TSDB_CODE_MND_DB_NOT_SELECTED;
return TSDB_CODE_TSC_DB_NOT_SELECTED;
}
const char *showColumns[] = {"REPLICA", "QUORUM", "DAYS", "KEEP", "BLOCKS", NULL};
......@@ -586,7 +586,7 @@ static int32_t tscGetDBInfo(SCreateBuilder *builder, char *result) {
} while (row != NULL);
if (0 == strlen(result)) {
return TSDB_CODE_MND_DB_NOT_SELECTED;
return TSDB_CODE_TSC_DB_NOT_SELECTED;
}
return TSDB_CODE_SUCCESS;
......
......@@ -362,8 +362,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
// additional msg has been attached already
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
code = tscSetTableFullName(pTableMetaInfo, pToken, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -381,14 +382,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
code = tscSetTableFullName(pTableMetaInfo, pToken, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return tscGetTableMeta(pSql, pTableMetaInfo);
}
case TSDB_SQL_SHOW_CREATE_DATABASE: {
const char* msg1 = "invalid database name";
const char* msg2 = "table name is too long";
SStrToken* pToken = &pInfo->pDCLInfo->a[0];
if (tscValidateName(pToken) != TSDB_CODE_SUCCESS) {
......@@ -397,11 +399,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pToken->n > TSDB_DB_NAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return TSDB_CODE_SUCCESS;
}
return tscSetTableFullName(pTableMetaInfo, pToken, pSql);
}
case TSDB_SQL_CFG_DNODE: {
const char* msg2 = "invalid configure options or values, such as resetlog / debugFlag 135 / balance 'vnode:2-dnode:2' / monitor 1 ";
const char* msg3 = "invalid dnode ep";
......@@ -805,55 +805,44 @@ int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql) {
const char* msg1 = "name too long";
const char* msg2 = "current database or database name invalid";
SSqlCmd* pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
// backup the old name in pTableMetaInfo
size_t size = strlen(pTableMetaInfo->name);
char* oldName = NULL;
if (size > 0) {
oldName = strdup(pTableMetaInfo->name);
}
char oldName[TSDB_TABLE_FNAME_LEN] = {0};
tstrncpy(oldName, pTableMetaInfo->name, tListLen(oldName));
if (hasSpecifyDB(pzTableName)) {
// db has been specified in sql string so we ignore current db path
if (hasSpecifyDB(pzTableName)) { // db has been specified in sql string so we ignore current db path
code = setObjFullName(pTableMetaInfo->name, getAccountId(pSql), NULL, pzTableName, NULL);
if (code != TSDB_CODE_SUCCESS) {
if (code != 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
} else { // get current DB name first, then set it into path
} else { // get current DB name first, and then set it into path
SStrToken t = {0};
getCurrentDBName(pSql, &t);
if (t.n == 0) {
code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
if (t.n == 0) { // current database not available or not specified
code = TSDB_CODE_TSC_DB_NOT_SELECTED;
} else {
code = setObjFullName(pTableMetaInfo->name, NULL, &t, pzTableName, NULL);
if (code != TSDB_CODE_SUCCESS) {
code = invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
if (code != 0) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
}
}
if (code != TSDB_CODE_SUCCESS) {
taosTFree(oldName);
return code;
}
/*
* the old name exists and is not equalled to the new name. Release the metermeta/metricmeta
* the old name exists and is not equalled to the new name. Release the table meta
* that are corresponding to the old name for the new table name.
*/
if (size > 0) {
if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
tscClearTableMetaInfo(pTableMetaInfo, false);
}
} else {
assert(pTableMetaInfo->pTableMeta == NULL);
if (strlen(oldName) > 0 && strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) {
tscClearTableMetaInfo(pTableMetaInfo, false);
}
taosTFree(oldName);
return TSDB_CODE_SUCCESS;
}
......@@ -4566,6 +4555,8 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char* msg18 = "primary timestamp column cannot be dropped";
const char* msg19 = "invalid new tag name";
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
SAlterTableSQL* pAlterSQL = pInfo->pAlterInfo;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
......@@ -4576,13 +4567,14 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
code = tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -5880,8 +5872,9 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
int32_t code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql);
if(code != TSDB_CODE_SUCCESS) {
return code;
}
if (!validateTableColumnInfo(pFieldList, pCmd) ||
......@@ -5935,15 +5928,16 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
int32_t code = tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// get meter meta from mnode
tstrncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, sizeof(pCreateTable->usingInfo.tagdata.name));
tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals;
int32_t code = tscGetTableMeta(pSql, pStableMeterMetaInfo);
code = tscGetTableMeta(pSql, pStableMeterMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -6020,7 +6014,6 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg2 = "table name too long";
const char* msg3 = "fill only available for interval query";
const char* msg4 = "fill option not supported in stream computing";
const char* msg5 = "sql too long"; // todo ADD support
......@@ -6052,11 +6045,12 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableFullName(pTableMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
int32_t code = tscSetTableFullName(pTableMetaInfo, &srcToken, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t code = tscGetTableMeta(pSql, pTableMetaInfo);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -6083,8 +6077,9 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
}
// set the created table[stream] name
if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
code = tscSetTableFullName(pTableMetaInfo, pzTableName, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pQuerySql->selectToken.n > TSDB_MAX_SAVED_SQL_LEN) {
......@@ -6128,7 +6123,6 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
assert(pQuerySql != NULL && (pQuerySql->from == NULL || pQuerySql->from->nExpr > 0));
const char* msg0 = "invalid table name";
//const char* msg1 = "table name too long";
const char* msg2 = "point interpolation query needs timestamp";
const char* msg5 = "fill only available for interval query";
const char* msg6 = "start(end) time of query range required or time range too large";
......@@ -6200,11 +6194,16 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo, i/2);
SStrToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
if (tscSetTableFullName(pTableMetaInfo1, &t, pSql) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
code = tscSetTableFullName(pTableMetaInfo1, &t, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tVariant* pTableItem1 = &pQuerySql->from->a[i + 1].pVar;
if (pTableItem1->nType != TSDB_DATA_TYPE_BINARY) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
SStrToken aliasName = {.z = pTableItem1->pz, .n = pTableItem1->nLen, .type = TK_STRING};
if (tscValidateName(&aliasName) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg11);
......
......@@ -53,7 +53,10 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
SRpcEpSet* pEpSet = &pSql->epSet;
pEpSet->inUse = 0;
// Issue the query to one of the vnode among a vgroup randomly.
// change the inUse property would not affect the isUse attribute of STableMeta
pEpSet->inUse = rand() % pVgroupInfo->numOfEps;
// apply the FQDN string length check here
bool hasFqdn = false;
......@@ -144,12 +147,13 @@ void tscPrintMgmtEp() {
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return;
if (pObj != pObj->signature) {
tscError("heart beat msg, pObj:%p, signature:%p invalid", pObj, pObj->signature);
return;
}
SSqlObj *pSql = pObj->pHb;
SSqlObj *pSql = tres;
SSqlRes *pRes = &pSql->res;
if (code == 0) {
......@@ -170,10 +174,17 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
}
} else {
tscDebug("heart beat failed, code:%s", tstrerror(code));
tscDebug("heartbeat failed, code:%s", tstrerror(code));
}
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
if (pObj->pHb != NULL) {
int32_t waitingDuring = tsShellActivityTimer * 500;
tscDebug("%p start heartbeat in %dms", pSql, waitingDuring);
taosTmrReset(tscProcessActivityTimer, waitingDuring, pObj, tscTmr, &pObj->pTimer);
} else {
tscDebug("%p start to close tscObj:%p, not send heartbeat again", pSql, pObj);
}
}
void tscProcessActivityTimer(void *handle, void *tmrId) {
......@@ -249,6 +260,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pSql->cmd;
assert(*pSql->self == pSql);
pSql->pRpcCtx = NULL;
if (pObj->signature != pObj) {
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
......@@ -258,8 +270,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return;
}
pSql->pRpcCtx = NULL; // clear the rpcCtx
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
......@@ -474,6 +484,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSub->pRpcCtx != NULL) {
rpcCancelRequest(pSub->pRpcCtx);
pSub->pRpcCtx = NULL;
}
tscQueueAsyncRes(pSub); // async res? not other functions?
......
......@@ -698,6 +698,7 @@ void taos_stop_query(TAOS_RES *res) {
tscKillSTableQuery(pSql);
} else {
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
assert(pSql->pRpcCtx != NULL);
rpcCancelRequest(pSql->pRpcCtx);
}
}
......
Subproject commit 06ec30a0f1762e8169bf6b9045c82bcaa52bcdf0
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
......@@ -468,8 +468,24 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
if (!mnodeIsRunning()) {
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
dInfo("mnode index:%d %s:%u should work as master", i, pEpSet->fqdn[i], pEpSet->port[i]);
sdbUpdateSync();
dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
bool find = false;
for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
dInfo("localEp found in mnode infos");
find = true;
break;
}
}
if (!find) {
dInfo("localEp not found in mnode infos, will set into mnode infos");
tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN);
tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId();
tsDMnodeInfos.nodeNum++;
}
dnodeStartMnode();
}
}
}
......
......@@ -146,19 +146,16 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
}
}
bool dnodeCheckMnodeStarting() {
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) return false;
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
if (node->nodeId == dnodeGetDnodeId()) {
uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
dnodeProcessModuleStatus(moduleStatus);
return true;
}
bool dnodeStartMnode() {
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
dDebug("mnode module is already started, module status:%d", tsModuleStatus);
return false;
}
return false;
uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
dnodeProcessModuleStatus(moduleStatus);
sdbUpdateSync();
return true;
}
......@@ -187,6 +187,7 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
}
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
return;
}
......
......@@ -43,7 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId();
bool dnodeCheckMnodeStarting();
bool dnodeStartMnode();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
......
......@@ -98,7 +98,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax errr in SQL")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, 0, 0x0216, "Syntax error in SQL")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, 0, 0x0217, "Database not specified or available")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, 0, 0x0218, "Table does not exist")
// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed")
......
......@@ -91,7 +91,6 @@ typedef struct {
} SSdbWriteWorkerPool;
extern void * tsMnodeTmr;
static void * tsUpdateSyncTmr;
static SSdbObject tsSdbObj = {0};
static taos_qset tsSdbWriteQset;
static taos_qall tsSdbWriteQall;
......@@ -298,16 +297,12 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
taosFreeQitem(pOper);
}
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(); }
void sdbUpdateSync() {
if (!mnodeIsRunning()) {
mDebug("mnode not start yet, update sync info later");
if (dnodeCheckMnodeStarting()) {
taosTmrReset(sdbUpdateSyncTmrFp, 1000, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
}
return;
}
mDebug("update sync info in sdb");
SSyncCfg syncCfg = {0};
......
......@@ -63,9 +63,10 @@ static FORCE_INLINE int64_t taosGetTimestamp(int32_t precision) {
typedef struct SInterval {
char intervalUnit;
char slidingUnit;
char offsetUnit;
int32_t tz; // query client timezone
char intervalUnit;
char slidingUnit;
char offsetUnit;
int64_t interval;
int64_t sliding;
int64_t offset;
......
......@@ -481,7 +481,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
start = (int64_t)(mktime(&tm) * TSDB_TICK_PER_SECOND(precision));
} else {
int64_t delta = t - pInterval->interval;
int32_t factor = delta > 0 ? 1 : -1;
int32_t factor = (delta >= 0) ? 1 : -1;
start = (delta / pInterval->sliding + factor) * pInterval->sliding;
......
......@@ -2225,10 +2225,11 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
return false;
}
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
SQuery *pQuery = pRuntimeEnv->pQuery;
*status = 0;
*status = BLK_DATA_NO_NEEDED;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) {
*status = BLK_DATA_ALL_NEEDED;
} else { // check if this data block is required to load
......@@ -2240,12 +2241,26 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
}
if ((*status) != BLK_DATA_ALL_NEEDED) {
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool hasTimeWindow = false;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey;
STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery);
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo->tid, &win, masterScan, &hasTimeWindow) !=
TSDB_CODE_SUCCESS) {
// todo handle error in set result for timewindow
}
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId;
(*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
break;
......@@ -2476,7 +2491,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SArray * pDataBlock = NULL;
uint32_t status = 0;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
......@@ -4667,18 +4682,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
}
SDataStatis *pStatis = NULL;
SArray * pDataBlock = NULL;
uint32_t status = 0;
SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
if (status == BLK_DATA_DISCARD) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
}
......
......@@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
static void rpcSendReqHead(SRpcConn *pConn);
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead);
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext);
static void rpcProcessConnError(void *param, void *id);
static void rpcProcessRetryTimer(void *, void *);
static void rpcProcessIdleTimer(void *param, void *tmrId);
......@@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) {
tError("failed to malloc msg, size:%d", size);
return NULL;
} else {
tDebug("malloc msg: %p", start);
tTrace("malloc mem: %p", start);
}
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
......@@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) {
if (cont) {
char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
free(temp);
tDebug("free mem: %p", temp);
tTrace("free mem: %p", temp);
}
}
......@@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) {
if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext);
free(temp);
tDebug("free msg: %p", temp);
tTrace("free mem: %p", temp);
}
}
......@@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
}
if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
tDebug("%s, message body is empty, ignore", pConn->info);
return TSDB_CODE_RPC_APP_ERROR;
}
pConn->inTranId = pHead->tranId;
pConn->inType = pHead->msgType;
// start the progress timer to monitor the response from server app
if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
return 0;
}
......@@ -881,17 +890,32 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
pConn->outType = 0;
pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0;
SRpcReqContext *pContext = pConn->pContext;
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY;
} else {
pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
tWarn("%s, too many redirects, quit", pConn->info);
}
}
}
return TSDB_CODE_SUCCESS;
}
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) {
int32_t sid;
SRpcConn *pConn = NULL;
SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sid = htonl(pHead->destId);
*ppContext = NULL;
if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) {
tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType);
......@@ -945,6 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
} else {
terrno = rpcProcessRspHead(pConn, pHead);
*ppContext = pConn->pContext;
}
}
......@@ -1009,7 +1034,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
}
terrno = 0;
pConn = rpcProcessMsgHead(pRpc, pRecv);
SRpcReqContext *pContext;
pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext);
if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) {
tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label,
......@@ -1029,7 +1055,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
}
} else { // msg is passed to app only parsing is ok
rpcProcessIncomingMsg(pConn, pHead);
rpcProcessIncomingMsg(pConn, pHead, pContext);
}
}
......@@ -1060,7 +1086,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
rpcFreeCont(pContext->pCont);
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
SRpcInfo *pRpc = pConn->pRpc;
SRpcMsg rpcMsg;
......@@ -1070,29 +1096,18 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
if ( rpcIsReq(pHead->msgType) ) {
if (rpcMsg.contLen > 0) {
rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests
rpcMsg.ahandle = pConn->ahandle;
rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests
// start the progress timer to monitor the response from server app
if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
// notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL);
} else {
tDebug("%s, message body is empty, ignore", pConn->info);
rpcFreeCont(rpcMsg.pCont);
}
// notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL);
} else {
// it's a response
SRpcReqContext *pContext = pConn->pContext;
rpcMsg.handle = pContext;
pConn->pContext = NULL;
pConn->pReqMsg = NULL;
rpcMsg.ahandle = pContext->ahandle;
// for UDP, port may be changed by server, the port in epSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
......@@ -1101,19 +1116,6 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcCloseConn(pConn);
}
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcMsg.contLen < sizeof(SRpcEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY;
} else {
pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
tWarn("%s, too many redirects, quit", pConn->info);
}
}
}
if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
pContext->numOfTry = 0;
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
......@@ -1445,7 +1447,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
pNewHead->msgLen = rpcMsgLenFromCont(origLen);
rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead;
//tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
tTrace("decomp malloc mem: %p", temp);
} else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
}
......
......@@ -62,7 +62,7 @@ typedef struct {
char label[TSDB_LABEL_LEN];
int numOfThreads;
void * shandle;
SThreadObj *pThreadObj;
SThreadObj **pThreadObj;
pthread_t thread;
} SServerObj;
......@@ -90,7 +90,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
pServerObj->numOfThreads = numOfThreads;
pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads);
if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -104,19 +104,28 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
// initialize parameters in case it may encounter error later
pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) {
pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1);
if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]);
free(pServerObj->pThreadObj);
free(pServerObj);
return NULL;
}
pServerObj->pThreadObj[i] = pThreadObj;
pThreadObj->pollFd = -1;
taosResetPthread(&pThreadObj->thread);
pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle;
pThreadObj++;
}
// initialize mutex, thread, fd which may fail
pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj[i];
code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
if (code < 0) {
tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
......@@ -137,7 +146,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
}
pThreadObj->threadId = i;
pThreadObj++;
}
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
......@@ -166,6 +174,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
pThreadObj->stop = true;
eventfd_t fd = -1;
if (taosComparePthread(pThreadObj->thread, pthread_self())) {
pthread_detach(pthread_self());
return;
}
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
......@@ -183,15 +196,11 @@ static void taosStopTcpThread(SThreadObj* pThreadObj) {
}
}
if (taosCheckPthreadValid(pThreadObj->thread)) pthread_join(pThreadObj->thread, NULL);
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
if (fd != -1) taosCloseSocket(fd);
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
if (taosCheckPthreadValid(pThreadObj->thread) && pThreadObj->pollFd >= 0) {
pthread_join(pThreadObj->thread, NULL);
}
if (fd != -1) taosCloseSocket(fd);
}
void taosStopTcpServer(void *handle) {
......@@ -199,7 +208,14 @@ void taosStopTcpServer(void *handle) {
if (pServerObj == NULL) return;
if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
if (taosCheckPthreadValid(pServerObj->thread)) pthread_join(pServerObj->thread, NULL);
if (taosCheckPthreadValid(pServerObj->thread)) {
if (taosComparePthread(pServerObj->thread, pthread_self())) {
pthread_detach(pthread_self());
} else {
pthread_join(pServerObj->thread, NULL);
}
}
tDebug("%s TCP server is stopped", pServerObj->label);
}
......@@ -210,9 +226,8 @@ void taosCleanUpTcpServer(void *handle) {
if (pServerObj == NULL) return;
for (int i = 0; i < pServerObj->numOfThreads; ++i) {
pThreadObj = pServerObj->pThreadObj + i;
pThreadObj = pServerObj->pThreadObj[i];
taosStopTcpThread(pThreadObj);
pthread_mutex_destroy(&(pThreadObj->mutex));
}
tDebug("%s TCP server is cleaned up", pServerObj->label);
......@@ -249,7 +264,7 @@ static void *taosAcceptTcpConnection(void *arg) {
taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
// pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj + threadId;
pThreadObj = pServerObj->pThreadObj[threadId];
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
if (pFdObj) {
......@@ -327,10 +342,8 @@ void taosCleanUpTcpClient(void *chandle) {
SThreadObj *pThreadObj = chandle;
if (pThreadObj == NULL) return;
tDebug ("%s TCP client will be cleaned up", pThreadObj->label);
taosStopTcpThread(pThreadObj);
tDebug ("%s TCP client is cleaned up", pThreadObj->label);
taosTFree(pThreadObj);
}
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
......@@ -365,7 +378,7 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void taosCloseTcpConnection(void *chandle) {
SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return;
if (pFdObj == NULL || pFdObj->signature != pFdObj) return;
SThreadObj *pThreadObj = pFdObj->pThreadObj;
tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj);
......@@ -378,7 +391,7 @@ void taosCloseTcpConnection(void *chandle) {
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
SFdObj *pFdObj = chandle;
if (chandle == NULL) return -1;
if (pFdObj == NULL || pFdObj->signature != pFdObj) return -1;
return taosWriteMsg(pFdObj->fd, data, len);
}
......@@ -425,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1;
} else {
tDebug("TCP malloc mem: %p", buffer);
tTrace("TCP malloc mem: %p", buffer);
}
msg = buffer + tsRpcOverhead;
......@@ -503,8 +516,22 @@ static void *taosProcessTcpData(void *param) {
pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
}
if (pThreadObj->stop) break;
}
if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd);
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
pthread_mutex_destroy(&(pThreadObj->mutex));
tDebug("%s TCP thread exits ...", pThreadObj->label);
taosTFree(pThreadObj);
return NULL;
}
......
......@@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) {
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue;
} else {
tDebug("UDP malloc mem: %p", tmsg);
tTrace("UDP malloc mem: %p", tmsg);
}
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
......
......@@ -219,7 +219,10 @@ static void *taosProcessTcpData(void *param) {
continue;
}
}
}
if (pThread->stop) break;
}
uDebug("%p TCP epoll thread exits", pThread);
......@@ -321,5 +324,5 @@ static void taosStopPoolThread(SThreadObj *pThread) {
}
pthread_join(thread, NULL);
taosClose(fd);
if (fd >= 0) taosClose(fd);
}
......@@ -1266,6 +1266,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
......
......@@ -1516,6 +1516,8 @@ class Task():
if errno in [
0x05, # TSDB_CODE_RPC_NOT_READY
# 0x200, # invalid SQL, TODO: re-examine with TD-934
0x217, # "db not selected", client side defined error code
0x218, # "Table does not exist" client side defined error code
0x360, 0x362,
0x369, # tag already exists
0x36A, 0x36B, 0x36D,
......
......@@ -119,7 +119,7 @@ endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d ' used1' 127.0.0.1:7111/rest/sql
print 17-> $system_content
if $system_content != @{"status":"error","code":534,"desc":"Syntax errr in SQL"}@ then
if $system_content != @{"status":"error","code":534,"desc":"Syntax error in SQL"}@ then
return -1
endi
......
......@@ -213,4 +213,53 @@ if $data01 != 5195.000000000 then
return -1
endi
print =======================>td-1596
sql create table t2(ts timestamp, k int)
sql insert into t2 values('2020-1-2 1:1:1', 1);
sql insert into t2 values('2020-2-2 1:1:1', 1);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sql connect
sleep 1000
sql use db
sql select count(*), first(ts), last(ts) from t2 interval(1d);
if $rows != 2 then
return -1
endi
if $data00 != @20-01-02 00:00:00.000@ then
print expect 20-01-02 00:00:00.000, actual: $data00
return -1
endi
if $data10 != @20-02-02 00:00:00.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data02 != @20-01-02 01:01:01.000@ then
return -1
endi
if $data12 != @20-02-02 01:01:01.000@ then
return -1
endi
if $data03 != @20-01-02 01:01:01.000@ then
return -1
endi
if $data13 != @20-02-02 01:01:01.000@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -238,6 +238,13 @@ if $data11 != @19-01-01 09:10:00.000@ then
endi
sql create table tb_where_NULL (ts timestamp, c1 float, c2 binary(10))
print ===================>td-1604
sql_error insert into tb_where_NULL values(?, ?, ?)
sql_error insert into tb_where_NULL values(now, 1, ?)
sql_error insert into tb_where_NULL values(?, 1, '')
sql_error insert into tb_where_NULL values(now, ?, '12')
sql insert into tb_where_NULL values ('2019-01-01 09:00:00.000', 1, 'val1')
sql insert into tb_where_NULL values ('2019-01-01 09:00:01.000', NULL, NULL)
sql insert into tb_where_NULL values ('2019-01-01 09:00:02.000', 2, 'val2')
......@@ -334,4 +341,5 @@ if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -65,7 +65,7 @@ endi
print ============== step4
sql drop dnode $hostname2
sleep 16000
sleep 10000
sql show mnodes
$dnode1Role = $data2_1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册