提交 9d45a352 编写于 作者: L liuyq-617

Merge branch 'develop' into test/jenkins

......@@ -86,6 +86,13 @@ pipeline {
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh --valgrind -p -t 10 -s 100 -b 4
./handle_crash_gen_val_log.sh
'''
}
sh '''
date
cd ${WKC}/tests
......@@ -131,14 +138,33 @@ pipeline {
sh'''
cd ${WORKSPACE}
git checkout develop
cd tests/gotest
bash batchtest.sh
cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single >/dev/null
java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/gotest
bash batchtest.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single >/dev/null
java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${JENKINS_HOME}/workspace/C#NET/src/CheckC#
dotnet run
'''
}
}
}
......@@ -146,5 +172,82 @@ pipeline {
}
}
post {
success {
emailext (
subject: "SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
failure {
emailext (
subject: "FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
}
}
\ No newline at end of file
......@@ -87,6 +87,7 @@ TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修
- httpPort: RESTful服务使用的端口号,所有的HTTP请求(TCP)都需要向该接口发起查询/写入请求。
- dataDir: 数据文件目录,所有的数据文件都将写入该目录。默认值:/var/lib/taos。
- logDir:日志文件目录,客户端和服务器的运行日志文件将写入该目录。默认值:/var/log/taos。
- tempDir:临时文件目录,客户端和服务器的临时文件(主要是查询时用于保存中间结果的问题)将写入该目录。 默认值:Linux下为 /tmp/,Windows下为环境变量 tmp 或 temp 指向的目录。
- arbitrator:系统中裁决器的end point, 缺省值为空。
- role:dnode的可选角色。0-any; 既可作为mnode,也可分配vnode;1-mgmt;只能作为mnode,不能分配vnode;2-dnode;不能作为mnode,只能分配vnode
- debugFlag:运行日志开关。131(输出错误和警告日志),135( 输出错误、警告和调试日志),143( 输出错误、警告、调试和跟踪日志)。默认值:131或135(不同模块有不同的默认值)。
......
......@@ -844,7 +844,7 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
- **PERCENTILE**
```mysql
SELECT PERCENTILE(field_name, P) FROM { tb_name | stb_name } [WHERE clause];
SELECT PERCENTILE(field_name, P) FROM { tb_name } [WHERE clause];
```
功能说明:统计表中某列的值百分比分位数。
返回结果数据类型: 双精度浮点数Double。
......
......@@ -35,7 +35,7 @@ TDengine相对于通用数据库,有超高的压缩比,在绝大多数场景
Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
```
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44851T。TDengine大概需要消耗44851/5=8970T, 8.9P空间。
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44.8512T。TDengine大概需要消耗44.851/5=8.97024T空间。
用户可以通过参数keep,设置数据在磁盘中的最大保存时长。为进一步减少存储成本,TDengine还提供多级存储,最冷的数据可以存放在最廉价的存储介质上,应用的访问不用做任何调整,只是读取速度降低了。
......
......@@ -616,6 +616,43 @@ HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间
- httpEnableCompress: 是否支持压缩,默认不支持,目前TDengine仅支持gzip压缩格式
- httpDebugFlag: 日志开关,131:仅错误和报警信息,135:调试信息,143:非常详细的调试信息,默认131
## CSharp Connector
在Windows系统上,C#应用程序可以使用TDengine的原生C接口来执行所有数据库操作,后续版本将提供ORM(dapper)框架驱动。
#### 安装TDengine客户端
C#连接器需要使用`libtaos.so``taos.h`。因此,在使用C#连接器之前,需在程序运行的Windows环境安装TDengine的Windows客户端,以便获得相关驱动文件。
安装完成后,在文件夹`C:/TDengine/examples/C#`中,将会看到两个文件
- TDengineDriver.cs 调用taos.dll文件的Native C方法
- TDengineTest.cs 参考程序示例
在文件夹`C:\Windows\System32`,将会看到`taos.dll`文件
#### 使用方法
- 将C#接口文件TDengineDriver.cs加入到应用程序所在.NET项目中
- 参考TDengineTest.cs来定义数据库连接参数,及执行数据插入、查询等操作的方法
- 因为C#接口需要用到`taos.dll`文件,用户可以将`taos.dll`文件加入.NET解决方案中
#### 注意事项
- `taos.dll`文件使用x64平台编译,所以.NET项目在生成.exe文件时,“解决方案”/“项目”的“平台”请均选择“x64”。
- 此.NET接口目前已经在Visual Studio 2013/2015/2017中验证过,其它VS版本尚待验证。
#### 第三方驱动
Maikebing.Data.Taos是一个TDengine的ADO.Net提供器,支持linux,windows。该开发包由热心贡献者`麦壳饼@@maikebing`提供,具体请参考
```
//接口下载
https://github.com/maikebing/Maikebing.EntityFrameworkCore.Taos
//用法说明
https://www.taosdata.com/blog/2020/11/02/1901.html
```
## Go Connector
......
......@@ -38,9 +38,9 @@
6. 检查防火墙设置,确认TCP/UDP 端口6030-6042 是打开的
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/lib/taos*里, 并且*/usr/local/lib/taos*在系统库函数搜索路径*LD_LIBRARY_PATH*
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/taos/driver*里, 并且*/usr/local/taos/driver*在系统库函数搜索路径*LD_LIBRARY_PATH*
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*driver/c/taos.dll*在你的系统搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*C:\TDengine\driver\taos.dll*在你的系统库函数搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
9. 如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作:`nc -vuz {hostIP} {port} `
......
......@@ -20,6 +20,9 @@
# data file's directory
# dataDir /var/lib/taos
# temporary file's directory
# tempDir /tmp/
# the arbitrator's fully qualified domain name (FQDN) for TDengine system, for cluster only
# arbitrator arbitrator_hostname:6042
......@@ -256,3 +259,5 @@
# maximum display width of binary and nchar fields in the shell. The parts exceeding this limit will be hidden
# maxBinaryDisplayWidth 30
# enable/disable telemetry reporting
# telemetryReporting 1
\ No newline at end of file
......@@ -2,7 +2,7 @@
%define cfg_install_dir /etc/taos
%define __strip /bin/true
Name: TDengine
Name: tdengine
Version: %{_version}
Release: 3%{?dist}
Summary: tdengine from taosdata
......
......@@ -121,8 +121,11 @@ function install_config() {
echo -e -n "${GREEN}Enter FQDN:port (like h1.taosdata.com:6030) of an existing TDengine cluster node to join${NC}"
echo
echo -e -n "${GREEN}OR leave it blank to build one${NC}:"
read firstEp
while true; do
#read firstEp
if exec < /dev/tty; then
read firstEp;
fi
while true; do
if [ ! -z "$firstEp" ]; then
# check the format of the firstEp
#if [[ $firstEp == $FQDN_PATTERN ]]; then
......
......@@ -490,7 +490,7 @@ static bool balanceMontiorDropping() {
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
if (pDnode->lastAccess + tsOfflineThreshold > tsAccessSquence) continue;
if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) continue;
if (dnodeIsMasterEp(pDnode->dnodeEp)) continue;
if (mnodeGetDnodesNum() <= 1) continue;
mLInfo("dnode:%d, set to removing state for it offline:%d seconds", pDnode->dnodeId,
......
......@@ -64,9 +64,8 @@ typedef struct SLocalReducer {
SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo
tFilePage * discardData;
SResultInfo * pResInfo;
char* pFinalRes; // result data after interpo
tFilePage* discardData;
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
......
......@@ -23,7 +23,7 @@ extern "C" {
#include "tscUtil.h"
#include "tsclient.h"
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscFetchDatablockForSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
......
......@@ -75,6 +75,7 @@ typedef struct SJoinSupporter {
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
......@@ -86,8 +87,8 @@ typedef struct SJoinSupporter {
} SJoinSupporter;
typedef struct SVgroupTableInfo {
SCMVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
SVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
......@@ -225,8 +226,9 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
......@@ -237,7 +239,7 @@ void tscDoQuery(SSqlObj* pSql);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src);
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
/**
* The create object function must be successful expect for the out of memory issue.
*
......@@ -265,6 +267,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t sub
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
......
......@@ -77,7 +77,7 @@ SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
* @param colId
* @return
*/
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
/**
* check if the schema is valid or not, including following aspects:
......@@ -107,9 +107,6 @@ SSchema tscGetTbnameColumnSchema();
*/
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size);
//todo tags value as well as the table id structure needs refactor
char *tsGetTagsValue(STableMeta *pMeta);
#ifdef __cplusplus
}
#endif
......
......@@ -90,12 +90,12 @@ typedef struct STableComInfo {
int32_t rowSize;
} STableComInfo;
typedef struct SCMCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t numOfEps;
SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo;
typedef struct SCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t numOfEps;
SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SCorVgroupInfo;
typedef struct STableMeta {
STableComInfo tableInfo;
......@@ -103,8 +103,8 @@ typedef struct STableMeta {
int16_t sversion;
int16_t tversion;
char sTableId[TSDB_TABLE_FNAME_LEN];
SCMVgroupInfo vgroupInfo;
SCMCorVgroupInfo corVgroupInfo;
SVgroupInfo vgroupInfo;
SCorVgroupInfo corVgroupInfo;
STableId id;
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
......@@ -128,7 +128,7 @@ typedef struct STableMetaInfo {
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
int64_t uid; // refactor use the pointer
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
......@@ -339,9 +339,9 @@ typedef struct STscObj {
} STscObj;
typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
typedef struct SSqlObj {
......@@ -431,14 +431,6 @@ void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
*/
void tscFreeSqlResult(SSqlObj *pSql);
/**
* only free part of resources allocated during query.
* TODO remove it later
* Note: this function is multi-thread safe.
* @param pObj
*/
void tscPartiallyFreeSqlObj(SSqlObj *pSql);
/**
* free sql object, release allocated resource
* @param pObj
......@@ -523,7 +515,6 @@ extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus
......
......@@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
tscFetchDatablockForSubquery(pSql);
} else {
tscProcessSql(pSql);
}
......@@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
// handle the sub queries of join query
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
......@@ -405,7 +405,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlRes *pRes = &pSql->res;
pRes->code = code;
const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
SSqlObj *sub = (SSqlObj*) res;
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
if (code != TSDB_CODE_SUCCESS) {
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
goto _error;
......@@ -427,8 +428,11 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else {
assert(code == TSDB_CODE_SUCCESS);
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL);
// param already freed by other routine and pSql in tscCache when ctrl + c
if (atomic_load_ptr(&pSql->param) == NULL) {
return;
}
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSql;
......
此差异已折叠。
......@@ -49,82 +49,6 @@ typedef struct SCreateBuilder {
} SCreateBuilder;
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength);
static int32_t getToStringLength(const char *pData, int32_t length, int32_t type) {
char buf[512] = {0};
int32_t len = 0;
int32_t MAX_BOOL_TYPE_LENGTH = 5; // max(strlen("true"), strlen("false"));
switch (type) {
case TSDB_DATA_TYPE_BINARY:
return length;
case TSDB_DATA_TYPE_NCHAR:
return length;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(pData);
len = sprintf(buf, "%lf", dv);
if (strncasecmp("nan", buf, 3) == 0) {
len = 4;
}
} break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(pData);
len = sprintf(buf, "%f", fv);
if (strncasecmp("nan", buf, 3) == 0) {
len = 4;
}
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
len = sprintf(buf, "%" PRId64, *(int64_t *)pData);
break;
case TSDB_DATA_TYPE_BOOL:
len = MAX_BOOL_TYPE_LENGTH;
break;
default:
len = sprintf(buf, "%d", *(int32_t *)pData);
break;
};
return len;
}
/*
* we need to convert all data into string, so we need to sprintf all kinds of
* non-string data into string, and record its length to get the right
* maximum length. The length may be less or greater than its original binary length:
* For example:
* length((short) 1) == 1, less than sizeof(short)
* length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t)
*/
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
return 0;
}
char * pTagValue = tsGetTagsValue(pMeta);
SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
int32_t len = getToStringLength(pTagValue, pTagsSchema[0].bytes, pTagsSchema[0].type);
pTagValue += pTagsSchema[0].bytes;
int32_t numOfTags = tscGetNumOfTags(pMeta);
for (int32_t i = 1; i < numOfTags; ++i) {
int32_t tLen = getToStringLength(pTagValue, pTagsSchema[i].bytes, pTagsSchema[i].type);
if (len < tLen) {
len = tLen;
}
pTagValue += pTagsSchema[i].bytes;
}
return len;
}
static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
......@@ -186,8 +110,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
return 0;
}
// the following is handle display tags value for meters created according to metric
char *pTagValue = tsGetTagsValue(pMeta);
// the following is handle display tags for table created according to super table
for (int32_t i = numOfRows; i < totalNumOfRows; ++i) {
// field name
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
......@@ -219,8 +142,6 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
const char *src = "TAG";
STR_WITH_MAXSIZE_TO_VARSTR(target, src, pField->bytes);
pTagValue += pSchema[i].bytes;
}
return 0;
......@@ -286,10 +207,10 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
const int32_t TYPE_COLUMN_LENGTH = 16;
const int32_t NOTE_COLUMN_MIN_LENGTH = 8;
int32_t noteFieldLen = tscMaxLengthOfTagsFields(pSql);
if (noteFieldLen == 0) {
noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
}
int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH;//tscMaxLengthOfTagsFields(pSql);
// if (noteFieldLen == 0) {
// noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
// }
int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen);
tscFieldInfoUpdateOffset(pQueryInfo);
......
......@@ -99,12 +99,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
}
SResultInfo *pResInfo = &pReducer->pResInfo[i];
pResInfo->bufLen = pExpr->interBytes;
pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen);
pCtx->resultInfo = &pReducer->pResInfo[i];
pCtx->resultInfo->superTableQ = true;
pCtx->interBufBytes = pExpr->interBytes;
pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo));
pCtx->stableQuery = true;
}
int16_t n = 0;
......@@ -345,7 +342,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->num = 0;
pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pReducer, pDesc);
......@@ -489,13 +485,15 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscDebug("%p waiting for delete procedure, status: %d", pSql, status);
}
pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo);
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
tVariantDestroy(&pCtx->tag);
taosTFree(pCtx->resultInfo);
if (pCtx->tagInfo.pTagCtxList != NULL) {
taosTFree(pCtx->tagInfo.pTagCtxList);
}
......@@ -509,15 +507,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
taosTFree(pLocalReducer->pTempBuffer);
taosTFree(pLocalReducer->pResultBuf);
if (pLocalReducer->pResInfo != NULL) {
size_t num = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < num; ++i) {
taosTFree(pLocalReducer->pResInfo[i].interResultBuf);
}
taosTFree(pLocalReducer->pResInfo);
}
if (pLocalReducer->pLoserTree) {
taosTFree(pLocalReducer->pLoserTree->param);
taosTFree(pLocalReducer->pLoserTree);
......@@ -1072,7 +1061,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
continue;
}
SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes;
}
......@@ -1253,10 +1242,11 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur
return true;
}
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
pLocalReducer->pCtx[i].aOutputBuf =
pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity;
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {// reset output buffer to the beginning
size_t t = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < t; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
pLocalReducer->pCtx[i].aOutputBuf = pLocalReducer->pResultBuf->data + pExpr->offset * pLocalReducer->resColModel->capacity;
}
memset(pLocalReducer->pResultBuf, 0, pLocalReducer->nResultBufSize + sizeof(tFilePage));
......@@ -1501,8 +1491,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
if (pLocalReducer->discard && sameGroup) {
pLocalReducer->hasUnprocessedRow = false;
tmpBuffer->num = 0;
} else {
// current row does not belongs to the previous group, so it is not be handled yet.
} else { // current row does not belongs to the previous group, so it is not be handled yet.
pLocalReducer->hasUnprocessedRow = true;
}
......
......@@ -702,7 +702,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
}
int32_t code = TSDB_CODE_TSC_INVALID_SQL;
char * tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \"
char * tmpTokenBuf = calloc(1, 16*1024); // used for deleting Escape character: \\, \', \"
if (NULL == tmpTokenBuf) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......
......@@ -222,7 +222,7 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
}
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SCMHeartBeatMsg *pHeartbeat = pMsg;
SHeartBeatMsg *pHeartbeat = pMsg;
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
......@@ -277,7 +277,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
}
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
sizeof(SCMHeartBeatMsg);
sizeof(SHeartBeatMsg);
pHeartbeat->connId = htonl(pObj->connId);
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
......
此差异已折叠。
......@@ -118,7 +118,7 @@ SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex)
}
// TODO for large number of columns, employ the binary search method
SSchema* tscGetTableColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
for(int32_t i = 0; i < tinfo.numOfColumns + tinfo.numOfTags; ++i) {
......@@ -140,7 +140,7 @@ struct SSchema tscGetTbnameColumnSchema() {
strcpy(s.name, TSQL_TBNAME_L);
return s;
}
static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) {
static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupInfo *vgroupInfo) {
corVgroupInfo->version = 0;
corVgroupInfo->inUse = 0;
corVgroupInfo->numOfEps = vgroupInfo->numOfEps;
......@@ -166,7 +166,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->id.tid = pTableMetaMsg->tid;
pTableMeta->id.uid = pTableMetaMsg->uid;
SCMVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo;
SVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo;
pVgroupInfo->numOfEps = pTableMetaMsg->vgroup.numOfEps;
pVgroupInfo->vgId = pTableMetaMsg->vgroup.vgId;
......@@ -197,28 +197,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
return pTableMeta;
}
/**
* the TableMeta data format in memory is as follows:
*
* +--------------------+
* |STableMeta Body data| sizeof(STableMeta)
* +--------------------+
* |Schema data | numOfTotalColumns * sizeof(SSchema)
* +--------------------+
* |Tags data | tag_col_1.bytes + tag_col_2.bytes + ....
* +--------------------+
*
* @param pTableMeta
* @return
*/
char* tsGetTagsValue(STableMeta* pTableMeta) {
int32_t offset = 0;
// int32_t numOfTotalCols = pTableMeta->numOfColumns + pTableMeta->numOfTags;
// uint32_t offset = sizeof(STableMeta) + numOfTotalCols * sizeof(SSchema);
return ((char*)pTableMeta + offset);
}
// todo refactor
UNUSED_FUNC static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
......
此差异已折叠。
此差异已折叠。
......@@ -80,7 +80,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon
void taos_init_imp(void) {
char temp[128];
char temp[128] = {0};
errno = TSDB_CODE_SUCCESS;
srand(taosGetTimestampSec());
......@@ -104,7 +104,6 @@ void taos_init_imp(void) {
taosReadGlobalCfg();
taosCheckGlobalCfg();
taosPrintGlobalCfg();
tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
......@@ -149,30 +148,42 @@ void taos_init_imp(void) {
tscRefId = taosOpenRef(200, tscCloseTscObj);
// in other language APIs, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated
// resource to suppress the valgrind warning.
atexit(taos_cleanup);
tscDebug("client is initialized successfully");
}
void taos_init() { pthread_once(&tscinit, taos_init_imp); }
void taos_cleanup() {
if (tscMetaCache != NULL) {
taosCacheCleanup(tscMetaCache);
tscMetaCache = NULL;
// this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) {
tscDebug("start to cleanup client environment");
taosCacheCleanup(tscObjCache);
tscObjCache = NULL;
void* m = tscMetaCache;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscMetaCache, m, 0) == m) {
taosCacheCleanup(m);
}
if (tscQhandle != NULL) {
taosCleanUpScheduler(tscQhandle);
tscQhandle = NULL;
m = tscObjCache;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) {
taosCacheCleanup(m);
}
m = tscQhandle;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) {
taosCleanUpScheduler(m);
}
taosCloseRef(tscRefId);
taosCleanupKeywordsTable();
taosCloseLog();
taosTmrCleanUp(tscTmr);
m = tscTmr;
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
taosTmrCleanUp(m);
}
}
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
......
......@@ -338,34 +338,6 @@ void tscFreeSqlResult(SSqlObj* pSql) {
memset(&pSql->res, 0, sizeof(SSqlRes));
}
void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
}
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscRemoveFromSqlList(pSql);
}
// pSql->sqlstr will be used by tscBuildQueryStreamDesc
// if (pObj->signature == pObj) {
//pthread_mutex_lock(&pObj->mutex);
taosTFree(pSql->sqlstr);
//pthread_mutex_unlock(&pObj->mutex);
// }
tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscResetSqlCmdObj(pCmd, false);
}
static void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) {
return;
......@@ -434,22 +406,32 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("%p start to free sqlObj", pSql);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql);
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscRemoveFromSqlList(pSql);
}
pSql->signature = NULL;
pSql->fp = NULL;
SSqlCmd* pCmd = &pSql->cmd;
taosTFree(pSql->sqlstr);
taosTFree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscFreeSqlResult(pSql);
tscResetSqlCmdObj(pCmd, false);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
taosTFree(pCmd->payload);
pCmd->allocSize = 0;
taosTFree(pSql->sqlstr);
tsem_destroy(&pSql->rspSem);
free(pSql);
}
......@@ -1665,6 +1647,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
pQueryInfo->groupbyExpr.columnInfo = NULL;
pQueryInfo->groupbyExpr.numOfGroupCols = 0;
}
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
......@@ -1713,7 +1696,18 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
taosArrayRemove(pVgroupTable, index);
}
SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
memset(info, 0, sizeof(SVgroupTableInfo));
info->vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
info->itemList = taosArrayClone(pInfo->itemList);
}
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return NULL;
}
......@@ -1724,14 +1718,8 @@ SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
SVgroupTableInfo info;
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
memset(&info, 0, sizeof(SVgroupTableInfo));
info.vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info.vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
tscVgroupTableCopy(&info, pInfo);
info.itemList = taosArrayClone(pInfo->itemList);
taosArrayPush(pa, &info);
}
......@@ -1739,7 +1727,7 @@ SArray* tscCloneVgroupTableInfo(SArray* pVgroupTables) {
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscDebug("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
tscDebug("%p unref %d tables in the tableMeta cache", address, pQueryInfo->numOfTables);
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
......@@ -1779,6 +1767,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList);
}
// TODO handle malloc failure
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
if (pTableMetaInfo->tagColList == NULL) {
return NULL;
......@@ -1788,7 +1777,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
tscColumnListCopy(pTableMetaInfo->tagColList, pTagCols, -1);
}
pTableMetaInfo->pVgroupTables = tscCloneVgroupTableInfo(pVgroupTables);
pTableMetaInfo->pVgroupTables = tscVgroupTableInfoClone(pVgroupTables);
pQueryInfo->numOfTables += 1;
return pTableMetaInfo;
......@@ -2155,6 +2144,21 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
}
}
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) {
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
SSchema* pSchema = tscGetTableTagSchema(pTableMeta);
for(int32_t i = 0; i < numOfTags; ++i) {
if (pSchema[i].colId == colId) {
return i;
}
}
// can not reach here
assert(0);
return INT16_MIN;
}
bool tscIsUpdateQuery(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
......@@ -2424,7 +2428,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
return NULL;
}
size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupList->numOfVgroups;
SVgroupsInfo* pNew = calloc(1, size);
if (pNew == NULL) {
return NULL;
......@@ -2433,9 +2437,9 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
pNew->numOfVgroups = vgroupList->numOfVgroups;
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pNewVInfo = &pNew->vgroups[i];
SVgroupInfo* pNewVInfo = &pNew->vgroups[i];
SCMVgroupInfo* pvInfo = &vgroupList->vgroups[i];
SVgroupInfo* pvInfo = &vgroupList->vgroups[i];
pNewVInfo->vgId = pvInfo->vgId;
pNewVInfo->numOfEps = pvInfo->numOfEps;
......@@ -2454,7 +2458,7 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
}
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
taosTFree(pVgroupInfo->epAddr[j].fqdn);
......@@ -2465,10 +2469,11 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
return NULL;
}
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) {
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
for(int32_t i = 0; i < dst->numOfEps; ++i) {
taosTFree(dst->epAddr[i].fqdn);
dst->epAddr[i].port = src->epAddr[i].port;
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
}
......
......@@ -51,6 +51,7 @@ extern char tsLocale[];
extern char tsCharset[]; // default encode string
extern int32_t tsEnableCoreFile;
extern int32_t tsCompressMsgSize;
extern char tsTempDir[];
//query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
......@@ -183,13 +184,13 @@ extern int32_t debugFlag;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
void taosInitGlobalCfg();
bool taosCheckGlobalCfg();
void taosSetAllDebugFlag();
bool taosCfgDynamicOptions(char *msg);
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
void taosInitGlobalCfg();
int32_t taosCheckGlobalCfg();
void taosSetAllDebugFlag();
bool taosCfgDynamicOptions(char *msg);
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
#ifdef __cplusplus
}
#endif
......
......@@ -58,6 +58,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
int32_t tsEnableCoreFile = 0;
int32_t tsMaxBinaryDisplayWidth = 30;
char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/";
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
......@@ -1310,13 +1311,23 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "tempDir";
cfg.ptr = tsTempDir;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsTempDir);
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
}
void taosInitGlobalCfg() {
pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig);
}
bool taosCheckGlobalCfg() {
int32_t taosCheckGlobalCfg() {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
......@@ -1375,7 +1386,9 @@ bool taosCheckGlobalCfg() {
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
tsHttpPort = tsServerPort + TSDB_PORT_HTTP;
return true;
taosPrintGlobalCfg();
return 0;
}
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
......
......@@ -108,7 +108,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
break;
}
case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
pVar->pz = calloc(len, sizeof(char));
pVar->pz = calloc(len + 1, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
break;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_CFG_H
#define TDENGINE_DNODE_CFG_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitCfg();
void dnodeCleanupCfg();
void dnodeUpdateCfg(SDnodeCfg *cfg);
int32_t dnodeGetDnodeId();
void dnodeGetCfg(int32_t *dnodeId, char *clusterId);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_EP_H
#define TDENGINE_DNODE_EP_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
int32_t dnodeInitEps();
void dnodeCleanupEps();
void dnodeUpdateEps(SDnodeEps *eps);
void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MINFOS_H
#define TDENGINE_DNODE_MINFOS_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
int32_t dnodeInitMInfos();
void dnodeCleanupMInfos();
void dnodeUpdateMInfos(SMnodeInfos *minfos);
void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetMInfos(SMnodeInfos *minfos);
bool dnodeIsMasterEp(char *ep);
#ifdef __cplusplus
}
#endif
#endif
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include "trpc.h"
int32_t dnodeInitMgmt();
void dnodeCleanupMgmt();
int32_t dnodeInitMgmtTimer();
......@@ -35,8 +37,8 @@ void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
#ifdef __cplusplus
}
......
......@@ -20,9 +20,12 @@
extern "C" {
#endif
int32_t dnodeInitVnodeWrite();
void dnodeCleanupVnodeWrite();
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg);
int32_t dnodeInitVWrite();
void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeCfg.h"
static SDnodeCfg tsCfg = {0};
static pthread_mutex_t tsCfgMutex;
static int32_t dnodeReadCfg();
static int32_t dnodeWriteCfg();
static void dnodeResetCfg(SDnodeCfg *cfg);
static void dnodePrintCfg(SDnodeCfg *cfg);
int32_t dnodeInitCfg() {
pthread_mutex_init(&tsCfgMutex, NULL);
dnodeResetCfg(NULL);
int32_t ret = dnodeReadCfg();
if (ret == 0) {
dInfo("dnode cfg is initialized");
}
return ret;
}
void dnodeCleanupCfg() { pthread_mutex_destroy(&tsCfgMutex); }
void dnodeUpdateCfg(SDnodeCfg *cfg) {
if (tsCfg.dnodeId != 0) return;
dnodeResetCfg(cfg);
}
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsCfgMutex);
dnodeId = tsCfg.dnodeId;
pthread_mutex_unlock(&tsCfgMutex);
return dnodeId;
}
void dnodeGetCfg(int32_t *dnodeId, char *clusterId) {
pthread_mutex_lock(&tsCfgMutex);
*dnodeId = tsCfg.dnodeId;
tstrncpy(clusterId, tsCfg.clusterId, TSDB_CLUSTER_ID_LEN);
pthread_mutex_unlock(&tsCfgMutex);
}
static void dnodeResetCfg(SDnodeCfg *cfg) {
if (cfg == NULL) return;
if (cfg->dnodeId == 0) return;
pthread_mutex_lock(&tsCfgMutex);
tsCfg.dnodeId = cfg->dnodeId;
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodePrintCfg(cfg);
dnodeWriteCfg();
pthread_mutex_unlock(&tsCfgMutex);
}
static void dnodePrintCfg(SDnodeCfg *cfg) {
dInfo("dnodeId is set to %d, clusterId is set to %s", cfg->dnodeId, cfg->clusterId);
}
static int32_t dnodeReadCfg() {
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SDnodeCfg cfg = {0};
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeCfg.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PARSE_CFG_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PARSE_CFG_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PARSE_CFG_OVER;
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", file);
goto PARSE_CFG_OVER;
}
cfg.dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s, clusterId not found", file);
goto PARSE_CFG_OVER;
}
tstrncpy(cfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN);
dInfo("read file %s successed", file);
PARSE_CFG_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
terrno = 0;
dnodeResetCfg(&cfg);
return 0;
}
static int32_t dnodeWriteCfg() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", file);
return 0;
}
......@@ -15,9 +15,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnodeInt.h"
#include "dnodeCheck.h"
......@@ -30,8 +28,8 @@ typedef struct {
void (*stopFp)();
} SCheckItem;
static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}};
int64_t tsMinFreeMemSizeForStart = 0;
static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}};
int64_t tsMinFreeMemSizeForStart = 0;
static int bindTcpPort(int port) {
int serverSocket;
......@@ -264,8 +262,6 @@ int32_t dnodeInitCheck() {
}
}
dInfo("dnode check is initialized");
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "hash.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeEps.h"
static SDnodeEps *tsEps = NULL;
static SHashObj * tsEpsHash = NULL;
static pthread_mutex_t tsEpsMutex;
static int32_t dnodeReadEps();
static int32_t dnodeWriteEps();
static void dnodeResetEps(SDnodeEps *eps);
static void dnodePrintEps(SDnodeEps *eps);
int32_t dnodeInitEps() {
pthread_mutex_init(&tsEpsMutex, NULL);
tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
dnodeResetEps(NULL);
int32_t ret = dnodeReadEps();
if (ret == 0) {
dInfo("dnode eps is initialized");
}
return ret;
}
void dnodeCleanupEps() {
pthread_mutex_lock(&tsEpsMutex);
if (tsEps) {
free(tsEps);
tsEps = NULL;
}
if (tsEpsHash) {
taosHashCleanup(tsEpsHash);
tsEpsHash = NULL;
}
pthread_mutex_unlock(&tsEpsMutex);
pthread_mutex_destroy(&tsEpsMutex);
}
void dnodeUpdateEps(SDnodeEps *eps) {
if (eps == NULL) return;
eps->dnodeNum = htonl(eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; ++i) {
eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId);
eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort);
}
pthread_mutex_lock(&tsEpsMutex);
if (eps->dnodeNum != tsEps->dnodeNum) {
dnodeResetEps(eps);
dnodeWriteEps();
} else {
int32_t size = sizeof(SDnodeEps) + eps->dnodeNum * sizeof(SDnodeEp);
if (memcmp(eps, tsEps, size) != 0) {
dnodeResetEps(eps);
dnodeWriteEps();
}
}
pthread_mutex_unlock(&tsEpsMutex);
}
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr) {
bool changed = false;
pthread_mutex_lock(&tsEpsMutex);
SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
changed = strcmp(epstr, epSaved) != 0;
tstrncpy(epstr, epSaved, TSDB_EP_LEN);
}
pthread_mutex_unlock(&tsEpsMutex);
return changed;
}
void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsEpsMutex);
SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
if (port) *port = ep->dnodePort;
if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
}
pthread_mutex_unlock(&tsEpsMutex);
}
static void dnodeResetEps(SDnodeEps *eps) {
if (eps == NULL) {
int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp);
if (tsEps == NULL) {
tsEps = calloc(1, size);
} else {
tsEps->dnodeNum = 0;
}
} else {
assert(tsEps);
int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp) * eps->dnodeNum;
if (eps->dnodeNum > tsEps->dnodeNum) {
tsEps = realloc(tsEps, size);
}
memcpy(tsEps, eps, size);
dnodePrintEps(eps);
}
for (int32_t i = 0; i < tsEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps->dnodeEps[i];
taosHashPut(tsEpsHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
}
}
static void dnodePrintEps(SDnodeEps *eps) {
dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; i++) {
SDnodeEp *ep = &eps->dnodeEps[i];
dDebug("dnodeId:%d, dnodeFqdn:%s dnodePort:%u", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort);
}
}
static int32_t dnodeReadEps() {
int32_t ret = -1;
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SDnodeEps *eps = NULL;
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeEps.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PRASE_EPS_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PRASE_EPS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeNum = cJSON_GetObjectItem(root, "dnodeNum");
if (!dnodeNum || dnodeNum->type != cJSON_Number) {
dError("failed to read %s, dnodeNum not found", file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s, dnodeInfos not found", file);
goto PRASE_EPS_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize != dnodeNum->valueint) {
dError("failed to read %s, dnodeInfos size:%d not matched dnodeNum:%d", file, dnodeInfosSize,
(int32_t)dnodeNum->valueint);
goto PRASE_EPS_OVER;
}
int32_t epsSize = sizeof(SDnodeEps) + dnodeInfosSize * sizeof(SDnodeEp);
eps = calloc(1, epsSize);
eps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
SDnodeEp *ep = &eps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", file);
goto PRASE_EPS_OVER;
}
ep->dnodeId = dnodeId->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", file);
goto PRASE_EPS_OVER;
}
strncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s, dnodePort not found", file);
goto PRASE_EPS_OVER;
}
ep->dnodePort = (uint16_t)dnodePort->valueint;
}
ret = 0;
dInfo("read file %s successed", file);
dnodePrintEps(eps);
PRASE_EPS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (ret != 0) {
if (eps) free(eps);
eps = NULL;
}
dnodeResetEps(eps);
if (eps) free(eps);
dnodeUpdateEp(dnodeGetDnodeId(), tsLocalEp, tsLocalFqdn, &tsServerPort);
terrno = 0;
return 0;
}
static int32_t dnodeWriteEps() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeEps.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", tsEps->dnodeNum);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": %u\n", ep->dnodePort);
if (i < tsEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", file);
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMInfos.h"
static SMnodeInfos tsMInfos;
static SRpcEpSet tsMEpSet;
static pthread_mutex_t tsMInfosMutex;
static void dnodeResetMInfos(SMnodeInfos *minfos);
static void dnodePrintMInfos(SMnodeInfos *minfos);
static int32_t dnodeReadMInfos();
static int32_t dnodeWriteMInfos();
int32_t dnodeInitMInfos() {
pthread_mutex_init(&tsMInfosMutex, NULL);
dnodeResetMInfos(NULL);
int32_t ret = dnodeReadMInfos();
if (ret == 0) {
dInfo("dnode minfos is initialized");
}
return ret;
}
void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); }
void dnodeUpdateMInfos(SMnodeInfos *minfos) {
if (minfos->mnodeNum <= 0 || minfos->mnodeNum > 3) {
dError("invalid mnode infos, mnodeNum:%d", minfos->mnodeNum);
return;
}
for (int32_t i = 0; i < minfos->mnodeNum; ++i) {
SMnodeInfo *minfo = &minfos->mnodeInfos[i];
minfo->mnodeId = htonl(minfo->mnodeId);
if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) {
dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp);
return;
}
}
pthread_mutex_lock(&tsMInfosMutex);
if (minfos->mnodeNum != tsMInfos.mnodeNum) {
dnodeResetMInfos(minfos);
dnodeWriteMInfos();
sdbUpdateAsync();
} else {
int32_t size = sizeof(SMnodeInfos);
if (memcmp(minfos, &tsMInfos, size) != 0) {
dnodeResetMInfos(minfos);
dnodeWriteMInfos();
sdbUpdateAsync();
}
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) {
if (ep->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
return;
}
pthread_mutex_lock(&tsMInfosMutex);
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
for (int i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
}
tsMEpSet = *ep;
pthread_mutex_unlock(&tsMInfosMutex);
}
bool dnodeIsMasterEp(char *ep) {
pthread_mutex_lock(&tsMInfosMutex);
bool isMaster = strcmp(ep, tsMInfos.mnodeInfos[tsMEpSet.inUse].mnodeEp) == 0;
pthread_mutex_unlock(&tsMInfosMutex);
return isMaster;
}
void dnodeGetMInfos(SMnodeInfos *minfos) {
pthread_mutex_lock(&tsMInfosMutex);
memcpy(minfos, &tsMInfos, sizeof(SMnodeInfos));
for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) {
minfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId);
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsMInfosMutex);
*epSet = tsMEpSet;
for (int i = 0; i < epSet->numOfEps; ++i) {
epSet->port[i] += TSDB_PORT_DNODEDNODE;
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsMInfosMutex);
*epSet = tsMEpSet;
pthread_mutex_unlock(&tsMInfosMutex);
}
static void dnodePrintMInfos(SMnodeInfos *minfos) {
dInfo("print mnode infos, mnodeNum:%d inUse:%d", minfos->mnodeNum, minfos->inUse);
for (int32_t i = 0; i < minfos->mnodeNum; i++) {
dInfo("mnode index:%d, %s", minfos->mnodeInfos[i].mnodeId, minfos->mnodeInfos[i].mnodeEp);
}
}
static void dnodeResetMInfos(SMnodeInfos *minfos) {
if (minfos == NULL) {
tsMEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]);
if (strcmp(tsSecond, tsFirst) != 0) {
tsMEpSet.numOfEps = 2;
taosGetFqdnPortFromEp(tsSecond, tsMEpSet.fqdn[1], &tsMEpSet.port[1]);
}
return;
}
if (minfos->mnodeNum == 0) return;
int32_t size = sizeof(SMnodeInfos);
memcpy(&tsMInfos, minfos, size);
tsMEpSet.inUse = tsMInfos.inUse;
tsMEpSet.numOfEps = tsMInfos.mnodeNum;
for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) {
taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]);
}
dnodePrintMInfos(minfos);
}
static int32_t dnodeReadMInfos() {
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SMnodeInfos minfos = {0};
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PARSE_MINFOS_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PARSE_MINFOS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PARSE_MINFOS_OVER;
}
cJSON *inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, inUse not found");
goto PARSE_MINFOS_OVER;
}
tsMInfos.inUse = inUse->valueint;
cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeNum not found");
goto PARSE_MINFOS_OVER;
}
minfos.mnodeNum = nodeNum->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnodeEpSet.json, nodeInfos not found");
goto PARSE_MINFOS_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != minfos.mnodeNum) {
dError("failed to read mnodeEpSet.json, nodeInfos size not matched");
goto PARSE_MINFOS_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeId not found");
goto PARSE_MINFOS_OVER;
}
minfos.mnodeInfos[i].mnodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnodeEpSet.json, nodeName not found");
goto PARSE_MINFOS_OVER;
}
strncpy(minfos.mnodeInfos[i].mnodeEp, nodeEp->valuestring, TSDB_EP_LEN);
}
dInfo("read file %s successed", file);
dnodePrintMInfos(&minfos);
PARSE_MINFOS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
terrno = 0;
for (int32_t i = 0; i < minfos.mnodeNum; ++i) {
SMnodeInfo *mInfo = &minfos.mnodeInfos[i];
dnodeUpdateEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL);
}
dnodeResetMInfos(&minfos);
return 0;
}
static int32_t dnodeWriteMInfos() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMInfos.mnodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMInfos.mnodeInfos[i].mnodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMInfos.mnodeInfos[i].mnodeEp);
if (i < tsMInfos.mnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", file);
return 0;
}
......@@ -58,7 +58,7 @@ int32_t dnodeInitMnodePeer() {
dDebug("dnode mpeer worker:%d is created", i);
}
dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
dDebug("dnode mpeer is initialized, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
return 0;
}
......
......@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeRead() {
dDebug("dnode mread worker:%d is created", i);
}
dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
dDebug("dnode mread is initialized, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
return 0;
}
......
......@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeWrite() {
dDebug("dnode mwrite worker:%d is created", i);
}
dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
dDebug("dnode mwrite is initialized, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
return 0;
}
......
......@@ -19,11 +19,15 @@
#include "tutil.h"
#include "tconfig.h"
#include "tglobal.h"
#include "twal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodePeer.h"
#include "dnodeModule.h"
#include "dnodeEps.h"
#include "dnodeMInfos.h"
#include "dnodeCfg.h"
#include "dnodeCheck.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
......@@ -33,26 +37,32 @@
#include "dnodeShell.h"
#include "dnodeTelemetry.h"
static int32_t dnodeInitStorage();
static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitStorage();
static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir);
static int32_t dnodeInitComponents();
static void dnodeCleanupComponents(int32_t stepId);
static int dnodeCreateDir(const char *dir);
static void dnodeCleanupComponents(int32_t stepId);
static int dnodeCreateDir(const char *dir);
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
int32_t (*init)();
void (*cleanup)();
} SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps},
{"globalcfg" ,taosCheckGlobalCfg, NULL},
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
......@@ -75,7 +85,9 @@ static int dnodeCreateDir(const char *dir) {
static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
tsDnodeComponents[i].cleanup();
if (tsDnodeComponents[i].cleanup) {
(*tsDnodeComponents[i].cleanup)();
}
}
}
......@@ -112,14 +124,13 @@ int32_t dnodeInitSystem() {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg() || !taosCheckGlobalCfg()) {
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
taosPrintGlobalCfg();
dInfo("start to initialize TDengine on %s", tsLocalEp);
dInfo("start to initialize TDengine");
if (dnodeInitComponents() != 0) {
return -1;
......@@ -198,7 +209,7 @@ static int32_t dnodeInitStorage() {
dnodeCheckDataDirOpenned(tsDnodeDir);
dInfo("storage directory is initialized");
dInfo("dnode storage is initialized at %s", tsDnodeDir);
return 0;
}
......
此差异已折叠。
......@@ -114,6 +114,7 @@ int32_t dnodeInitModules() {
}
}
dInfo("dnode modules is initialized");
return 0;
}
......@@ -146,8 +147,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
}
}
bool dnodeStartMnode(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
bool dnodeStartMnode(SMnodeInfos *minfos) {
SMnodeInfos *mnodes = minfos;
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
dDebug("mnode module is already started, module status:%d", tsModuleStatus);
......
......@@ -28,8 +28,8 @@
#include "dnodeMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h"
extern void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet);
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
......@@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL;
int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
......@@ -72,7 +72,7 @@ int32_t dnodeInitServer() {
return -1;
}
dInfo("inter-dnodes RPC server is opened");
dInfo("dnode inter-dnodes RPC server is initialized");
return 0;
}
......@@ -137,7 +137,7 @@ int32_t dnodeInitClient() {
return -1;
}
dInfo("inter-dnodes rpc client is opened");
dInfo("dnode inter-dnodes rpc client is initialized");
return 0;
}
......@@ -145,13 +145,13 @@ void dnodeCleanupClient() {
if (tsDnodeClientRpc) {
rpcClose(tsDnodeClientRpc);
tsDnodeClientRpc = NULL;
dInfo("inter-dnodes rpc client is closed");
dInfo("dnode inter-dnodes rpc client is closed");
}
}
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEpSetForPeer(pEpSet);
dnodeUpdateEpSetForPeer(pEpSet);
}
if (dnodeProcessRspMsgFp[pMsg->msgType]) {
......@@ -173,7 +173,7 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeGetEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
}
......
......@@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
......@@ -97,7 +97,7 @@ int32_t dnodeInitShell() {
return -1;
}
dInfo("shell rpc server is opened");
dInfo("dnode shell rpc server is initialized");
return 0;
}
......@@ -146,12 +146,12 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code;
SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg));
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SDMAuthMsg);
rpcMsg.contLen = sizeof(SAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
dDebug("user:%s, send auth msg to mnodes", user);
......@@ -161,7 +161,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
} else {
SDMAuthRsp *pRsp = rpcRsp.pCont;
SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user);
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
......@@ -176,8 +176,8 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid);
int32_t contLen = sizeof(SDMConfigTableMsg);
SDMConfigTableMsg *pMsg = rpcMallocCont(contLen);
int32_t contLen = sizeof(SConfigTableMsg);
SConfigTableMsg *pMsg = rpcMallocCont(contLen);
pMsg->dnodeId = htonl(dnodeGetDnodeId());
pMsg->vgId = htonl(vgId);
......
......@@ -268,7 +268,7 @@ static void dnodeGetEmail(char* filepath) {
return;
}
if (taosTRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
if (taosRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
}
......@@ -299,6 +299,7 @@ int32_t dnodeInitTelemetry() {
dTrace("failed to create telemetry thread, reason:%s", strerror(errno));
}
dInfo("dnode telemetry is initialized");
return 0;
}
......
......@@ -61,7 +61,7 @@ int32_t dnodeInitVnodeRead() {
pWorker->workerId = i;
}
dInfo("dnode read is opened, min worker:%d max worker:%d", readPool.min, readPool.max);
dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max);
return 0;
}
......@@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
}
}
void *dnodeAllocateVnodeRqueue(void *pVnode) {
void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex);
taos_queue queue = taosOpenQueue();
if (queue == NULL) {
......@@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
return queue;
}
void dnodeFreeVnodeRqueue(void *rqueue) {
void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue);
// dynamically adjust the number of threads
......
此差异已折叠。
此差异已折叠。
......@@ -64,7 +64,7 @@ typedef struct taosField {
#endif
DLL_EXPORT void taos_init();
DLL_EXPORT void taos_cleanup();
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
......
此差异已折叠。
......@@ -68,7 +68,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index);
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type);
......
此差异已折叠。
此差异已折叠。
......@@ -244,7 +244,7 @@ int32_t shellRunCommand(TAOS* con, char* command) {
}
*p++ = c;
if (c == ';') {
if (c == ';' && quote == 0) {
c = *p;
*p = 0;
if (shellRunSingleCommand(con, cmd) < 0) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册