提交 7688a6eb 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/crash_gen

......@@ -87,6 +87,7 @@ TDengine系统后台服务由taosd提供,可以在配置文件taos.cfg里修
- httpPort: RESTful服务使用的端口号,所有的HTTP请求(TCP)都需要向该接口发起查询/写入请求。
- dataDir: 数据文件目录,所有的数据文件都将写入该目录。默认值:/var/lib/taos。
- logDir:日志文件目录,客户端和服务器的运行日志文件将写入该目录。默认值:/var/log/taos。
- tempDir:临时文件目录,客户端和服务器的临时文件(主要是查询时用于保存中间结果的问题)将写入该目录。 默认值:Linux下为 /tmp/,Windows下为环境变量 tmp 或 temp 指向的目录。
- arbitrator:系统中裁决器的end point, 缺省值为空。
- role:dnode的可选角色。0-any; 既可作为mnode,也可分配vnode;1-mgmt;只能作为mnode,不能分配vnode;2-dnode;不能作为mnode,只能分配vnode
- debugFlag:运行日志开关。131(输出错误和警告日志),135( 输出错误、警告和调试日志),143( 输出错误、警告、调试和跟踪日志)。默认值:131或135(不同模块有不同的默认值)。
......
......@@ -844,7 +844,7 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
- **PERCENTILE**
```mysql
SELECT PERCENTILE(field_name, P) FROM { tb_name | stb_name } [WHERE clause];
SELECT PERCENTILE(field_name, P) FROM { tb_name } [WHERE clause];
```
功能说明:统计表中某列的值百分比分位数。
返回结果数据类型: 双精度浮点数Double。
......
......@@ -35,7 +35,7 @@ TDengine相对于通用数据库,有超高的压缩比,在绝大多数场景
Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
```
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44851T。TDengine大概需要消耗44851/5=8970T, 8.9P空间。
示例:1000万台智能电表,每台电表每15分钟采集一次数据,每次采集的数据128字节,那么一年的原始数据量是:10000000\*128\*24\*60/15*365 = 44.8512T。TDengine大概需要消耗44.851/5=8.97024T空间。
用户可以通过参数keep,设置数据在磁盘中的最大保存时长。为进一步减少存储成本,TDengine还提供多级存储,最冷的数据可以存放在最廉价的存储介质上,应用的访问不用做任何调整,只是读取速度降低了。
......
......@@ -616,6 +616,43 @@ HTTP请求URL采用`sqlutc`时,返回结果集的时间戳将采用UTC时间
- httpEnableCompress: 是否支持压缩,默认不支持,目前TDengine仅支持gzip压缩格式
- httpDebugFlag: 日志开关,131:仅错误和报警信息,135:调试信息,143:非常详细的调试信息,默认131
## CSharp Connector
在Windows系统上,C#应用程序可以使用TDengine的原生C接口来执行所有数据库操作,后续版本将提供ORM(dapper)框架驱动。
#### 安装TDengine客户端
C#连接器需要使用`libtaos.so``taos.h`。因此,在使用C#连接器之前,需在程序运行的Windows环境安装TDengine的Windows客户端,以便获得相关驱动文件。
安装完成后,在文件夹`C:/TDengine/examples/C#`中,将会看到两个文件
- TDengineDriver.cs 调用taos.dll文件的Native C方法
- TDengineTest.cs 参考程序示例
在文件夹`C:\Windows\System32`,将会看到`taos.dll`文件
#### 使用方法
- 将C#接口文件TDengineDriver.cs加入到应用程序所在.NET项目中
- 参考TDengineTest.cs来定义数据库连接参数,及执行数据插入、查询等操作的方法
- 因为C#接口需要用到`taos.dll`文件,用户可以将`taos.dll`文件加入.NET解决方案中
#### 注意事项
- `taos.dll`文件使用x64平台编译,所以.NET项目在生成.exe文件时,“解决方案”/“项目”的“平台”请均选择“x64”。
- 此.NET接口目前已经在Visual Studio 2013/2015/2017中验证过,其它VS版本尚待验证。
#### 第三方驱动
Maikebing.Data.Taos是一个TDengine的ADO.Net提供器,支持linux,windows。该开发包由热心贡献者`麦壳饼@@maikebing`提供,具体请参考
```
//接口下载
https://github.com/maikebing/Maikebing.EntityFrameworkCore.Taos
//用法说明
https://www.taosdata.com/blog/2020/11/02/1901.html
```
## Go Connector
......
......@@ -38,9 +38,9 @@
6. 检查防火墙设置,确认TCP/UDP 端口6030-6042 是打开的
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/lib/taos*里, 并且*/usr/local/lib/taos*在系统库函数搜索路径*LD_LIBRARY_PATH*
7. 对于Linux上的JDBC(ODBC, Python, Go等接口类似)连接, 确保*libtaos.so*在目录*/usr/local/taos/driver*里, 并且*/usr/local/taos/driver*在系统库函数搜索路径*LD_LIBRARY_PATH*
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*driver/c/taos.dll*在你的系统搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
8. 对于windows上的JDBC, ODBC, Python, Go等连接,确保*C:\TDengine\driver\taos.dll*在你的系统库函数搜索目录里 (建议*taos.dll*放在目录 *C:\Windows\System32*)
9. 如果仍不能排除连接故障,请使用命令行工具nc来分别判断指定端口的TCP和UDP连接是否通畅
检查UDP端口连接是否工作:`nc -vuz {hostIP} {port} `
......
......@@ -20,6 +20,9 @@
# data file's directory
# dataDir /var/lib/taos
# temporary file's directory
# tempDir /tmp/
# the arbitrator's fully qualified domain name (FQDN) for TDengine system, for cluster only
# arbitrator arbitrator_hostname:6042
......@@ -256,3 +259,5 @@
# maximum display width of binary and nchar fields in the shell. The parts exceeding this limit will be hidden
# maxBinaryDisplayWidth 30
# enable/disable telemetry reporting
# telemetryReporting 1
\ No newline at end of file
......@@ -121,7 +121,10 @@ function install_config() {
echo -e -n "${GREEN}Enter FQDN:port (like h1.taosdata.com:6030) of an existing TDengine cluster node to join${NC}"
echo
echo -e -n "${GREEN}OR leave it blank to build one${NC}:"
read firstEp
#read firstEp
if exec < /dev/tty; then
read firstEp;
fi
while true; do
if [ ! -z "$firstEp" ]; then
# check the format of the firstEp
......
......@@ -490,7 +490,7 @@ static bool balanceMontiorDropping() {
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
if (pDnode->lastAccess + tsOfflineThreshold > tsAccessSquence) continue;
if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) continue;
if (dnodeIsMasterEp(pDnode->dnodeEp)) continue;
if (mnodeGetDnodesNum() <= 1) continue;
mLInfo("dnode:%d, set to removing state for it offline:%d seconds", pDnode->dnodeId,
......
......@@ -64,9 +64,8 @@ typedef struct SLocalReducer {
SColumnModel * resColModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
char * pFinalRes; // result data after interpo
tFilePage * discardData;
SResultInfo * pResInfo;
char* pFinalRes; // result data after interpo
tFilePage* discardData;
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
......
......@@ -23,7 +23,7 @@ extern "C" {
#include "tscUtil.h"
#include "tsclient.h"
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscFetchDatablockForSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
......
......@@ -87,7 +87,7 @@ typedef struct SJoinSupporter {
} SJoinSupporter;
typedef struct SVgroupTableInfo {
SCMVgroupInfo vgInfo;
SVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
} SVgroupTableInfo;
......@@ -228,6 +228,7 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
......@@ -238,7 +239,7 @@ void tscDoQuery(SSqlObj* pSql);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src);
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
/**
* The create object function must be successful expect for the out of memory issue.
*
......
......@@ -107,9 +107,6 @@ SSchema tscGetTbnameColumnSchema();
*/
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size);
//todo tags value as well as the table id structure needs refactor
char *tsGetTagsValue(STableMeta *pMeta);
#ifdef __cplusplus
}
#endif
......
......@@ -90,12 +90,12 @@ typedef struct STableComInfo {
int32_t rowSize;
} STableComInfo;
typedef struct SCMCorVgroupInfo {
typedef struct SCorVgroupInfo {
int32_t version;
int8_t inUse;
int8_t numOfEps;
SEpAddr1 epAddr[TSDB_MAX_REPLICA];
} SCMCorVgroupInfo;
} SCorVgroupInfo;
typedef struct STableMeta {
STableComInfo tableInfo;
......@@ -103,8 +103,8 @@ typedef struct STableMeta {
int16_t sversion;
int16_t tversion;
char sTableId[TSDB_TABLE_FNAME_LEN];
SCMVgroupInfo vgroupInfo;
SCMCorVgroupInfo corVgroupInfo;
SVgroupInfo vgroupInfo;
SCorVgroupInfo corVgroupInfo;
STableId id;
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
......@@ -431,14 +431,6 @@ void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
*/
void tscFreeSqlResult(SSqlObj *pSql);
/**
* only free part of resources allocated during query.
* TODO remove it later
* Note: this function is multi-thread safe.
* @param pObj
*/
void tscPartiallyFreeSqlObj(SSqlObj *pSql);
/**
* free sql object, release allocated resource
* @param pObj
......@@ -523,7 +515,6 @@ extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus
......
......@@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
tscFetchDatablockForSubquery(pSql);
} else {
tscProcessSql(pSql);
}
......@@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
// handle the sub queries of join query
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
......
此差异已折叠。
......@@ -49,82 +49,6 @@ typedef struct SCreateBuilder {
} SCreateBuilder;
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength);
static int32_t getToStringLength(const char *pData, int32_t length, int32_t type) {
char buf[512] = {0};
int32_t len = 0;
int32_t MAX_BOOL_TYPE_LENGTH = 5; // max(strlen("true"), strlen("false"));
switch (type) {
case TSDB_DATA_TYPE_BINARY:
return length;
case TSDB_DATA_TYPE_NCHAR:
return length;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(pData);
len = sprintf(buf, "%lf", dv);
if (strncasecmp("nan", buf, 3) == 0) {
len = 4;
}
} break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(pData);
len = sprintf(buf, "%f", fv);
if (strncasecmp("nan", buf, 3) == 0) {
len = 4;
}
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
len = sprintf(buf, "%" PRId64, *(int64_t *)pData);
break;
case TSDB_DATA_TYPE_BOOL:
len = MAX_BOOL_TYPE_LENGTH;
break;
default:
len = sprintf(buf, "%d", *(int32_t *)pData);
break;
};
return len;
}
/*
* we need to convert all data into string, so we need to sprintf all kinds of
* non-string data into string, and record its length to get the right
* maximum length. The length may be less or greater than its original binary length:
* For example:
* length((short) 1) == 1, less than sizeof(short)
* length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t)
*/
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
return 0;
}
char * pTagValue = tsGetTagsValue(pMeta);
SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
int32_t len = getToStringLength(pTagValue, pTagsSchema[0].bytes, pTagsSchema[0].type);
pTagValue += pTagsSchema[0].bytes;
int32_t numOfTags = tscGetNumOfTags(pMeta);
for (int32_t i = 1; i < numOfTags; ++i) {
int32_t tLen = getToStringLength(pTagValue, pTagsSchema[i].bytes, pTagsSchema[i].type);
if (len < tLen) {
len = tLen;
}
pTagValue += pTagsSchema[i].bytes;
}
return len;
}
static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
......@@ -186,8 +110,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
return 0;
}
// the following is handle display tags value for meters created according to metric
char *pTagValue = tsGetTagsValue(pMeta);
// the following is handle display tags for table created according to super table
for (int32_t i = numOfRows; i < totalNumOfRows; ++i) {
// field name
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
......@@ -219,8 +142,6 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
const char *src = "TAG";
STR_WITH_MAXSIZE_TO_VARSTR(target, src, pField->bytes);
pTagValue += pSchema[i].bytes;
}
return 0;
......@@ -286,10 +207,10 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
const int32_t TYPE_COLUMN_LENGTH = 16;
const int32_t NOTE_COLUMN_MIN_LENGTH = 8;
int32_t noteFieldLen = tscMaxLengthOfTagsFields(pSql);
if (noteFieldLen == 0) {
noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
}
int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH;//tscMaxLengthOfTagsFields(pSql);
// if (noteFieldLen == 0) {
// noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
// }
int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen);
tscFieldInfoUpdateOffset(pQueryInfo);
......
......@@ -99,12 +99,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
pCtx->param[1].i64Key = pQueryInfo->order.orderColId;
}
SResultInfo *pResInfo = &pReducer->pResInfo[i];
pResInfo->bufLen = pExpr->interBytes;
pResInfo->interResultBuf = calloc(1, (size_t) pResInfo->bufLen);
pCtx->resultInfo = &pReducer->pResInfo[i];
pCtx->resultInfo->superTableQ = true;
pCtx->interBufBytes = pExpr->interBytes;
pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo));
pCtx->stableQuery = true;
}
int16_t n = 0;
......@@ -345,7 +342,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
size_t numOfCols = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pTempBuffer->num = 0;
pReducer->pResInfo = calloc(numOfCols, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pReducer, pDesc);
......@@ -489,13 +485,15 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
tscDebug("%p waiting for delete procedure, status: %d", pSql, status);
}
pLocalReducer->pFillInfo = taosDestoryFillInfo(pLocalReducer->pFillInfo);
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
tVariantDestroy(&pCtx->tag);
taosTFree(pCtx->resultInfo);
if (pCtx->tagInfo.pTagCtxList != NULL) {
taosTFree(pCtx->tagInfo.pTagCtxList);
}
......@@ -509,15 +507,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
taosTFree(pLocalReducer->pTempBuffer);
taosTFree(pLocalReducer->pResultBuf);
if (pLocalReducer->pResInfo != NULL) {
size_t num = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < num; ++i) {
taosTFree(pLocalReducer->pResInfo[i].interResultBuf);
}
taosTFree(pLocalReducer->pResInfo);
}
if (pLocalReducer->pLoserTree) {
taosTFree(pLocalReducer->pLoserTree->param);
taosTFree(pLocalReducer->pLoserTree);
......@@ -1072,7 +1061,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
continue;
}
SResultInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
SResultRowCellInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (maxOutput < pResInfo->numOfRes) {
maxOutput = pResInfo->numOfRes;
}
......
......@@ -222,7 +222,7 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
}
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SCMHeartBeatMsg *pHeartbeat = pMsg;
SHeartBeatMsg *pHeartbeat = pMsg;
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
......@@ -277,7 +277,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
}
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
sizeof(SCMHeartBeatMsg);
sizeof(SHeartBeatMsg);
pHeartbeat->connId = htonl(pObj->connId);
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
......
此差异已折叠。
......@@ -140,7 +140,7 @@ struct SSchema tscGetTbnameColumnSchema() {
strcpy(s.name, TSQL_TBNAME_L);
return s;
}
static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) {
static void tscInitCorVgroupInfo(SCorVgroupInfo *corVgroupInfo, SVgroupInfo *vgroupInfo) {
corVgroupInfo->version = 0;
corVgroupInfo->inUse = 0;
corVgroupInfo->numOfEps = vgroupInfo->numOfEps;
......@@ -166,7 +166,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->id.tid = pTableMetaMsg->tid;
pTableMeta->id.uid = pTableMetaMsg->uid;
SCMVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo;
SVgroupInfo* pVgroupInfo = &pTableMeta->vgroupInfo;
pVgroupInfo->numOfEps = pTableMetaMsg->vgroup.numOfEps;
pVgroupInfo->vgId = pTableMetaMsg->vgroup.vgId;
......@@ -197,28 +197,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
return pTableMeta;
}
/**
* the TableMeta data format in memory is as follows:
*
* +--------------------+
* |STableMeta Body data| sizeof(STableMeta)
* +--------------------+
* |Schema data | numOfTotalColumns * sizeof(SSchema)
* +--------------------+
* |Tags data | tag_col_1.bytes + tag_col_2.bytes + ....
* +--------------------+
*
* @param pTableMeta
* @return
*/
char* tsGetTagsValue(STableMeta* pTableMeta) {
int32_t offset = 0;
// int32_t numOfTotalCols = pTableMeta->numOfColumns + pTableMeta->numOfTags;
// uint32_t offset = sizeof(STableMeta) + numOfTotalCols * sizeof(SSchema);
return ((char*)pTableMeta + offset);
}
// todo refactor
UNUSED_FUNC static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
......
此差异已折叠。
......@@ -32,11 +32,26 @@ typedef struct SInsertSupporter {
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static bool tsCompare(int32_t order, int64_t left, int64_t right) {
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
if (left == right) {
return 0;
}
if (order == TSDB_ORDER_ASC) {
return left < right;
return left < right? -1:1;
} else {
return left > right;
return left > right? -1:1;
}
}
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
while (tsBufNextPos(pTSBuf)) {
STSElem el1 = tsBufGetElem(pTSBuf);
int32_t res = tVariantCompare(el1.tag, tag1);
if (res != 0) { // it is a record with new tag
return;
}
}
}
......@@ -88,32 +103,50 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
int64_t numOfInput1 = 1;
int64_t numOfInput2 = 1;
while(1) {
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
// no data in pSupporter1 anymore, jump out of loop
if (!tsBufIsValidElem(&elem)) {
break;
}
// find the data in supporter2 with the same tag value
STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
/**
* there are elements in pSupporter2 with the same tag, continue
*/
tVariant tag1 = {0};
tVariantAssign(&tag1, elem.tag);
if (tsBufIsValidElem(&e2)) {
while (1) {
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
#ifdef _DEBUG_VIEW
tscInfo("%" PRId64 ", tags:%"PRId64" \t %" PRId64 ", tags:%"PRId64, elem1.ts, elem1.tag.i64Key, elem2.ts, elem2.tag.i64Key);
#endif
int32_t res = tVariantCompare(elem1.tag, elem2.tag);
if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
// data with current are exhausted
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
break;
}
numOfInput1++;
} else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
skipRemainValue(pSupporter1->pTSBuf, &tag1);
break;
}
numOfInput2++;
} else {
/*
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client.
* final results which is acquired after the secondary merge of in the client.
*/
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
if (re < 0) {
tsBufNextPos(pSupporter1->pTSBuf);
numOfInput1++;
} else if (re > 0) {
tsBufNextPos(pSupporter2->pTSBuf);
numOfInput2++;
} else {
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (win->skey > elem1.ts) {
win->skey = elem1.ts;
......@@ -125,24 +158,21 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;
}
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break;
pLimit->offset -= 1;//offset apply to projection?
}
tsBufNextPos(pSupporter1->pTSBuf);
numOfInput1++;
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
break;
}
tsBufNextPos(pSupporter2->pTSBuf);
numOfInput2++;
}
}
} else { // no data in pSupporter2, ignore current data in pSupporter2
skipRemainValue(pSupporter1->pTSBuf, &tag1);
}
}
/*
* failed to set the correct ts order yet in two cases:
......@@ -162,8 +192,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal,
output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st);
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey,
tsBufGetNumOfVnodes(output1), et - st);
return output1->numOfTotal;
}
......@@ -248,6 +279,68 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
return false;
}
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0;
int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
// The virtual node, of which all tables are disqualified after the timestamp intersection,
// is removed to avoid next stage query.
// TODO: If tables from some vnodes are not qualified for next stage query, discard them.
for (int32_t k = 0; k < taosArrayGetSize(pVgroupTables);) {
SVgroupTableInfo* p = taosArrayGet(pVgroupTables, k);
bool found = false;
for (int32_t f = 0; f < num; ++f) {
if (p->vgInfo.vgId == list[f]) {
found = true;
break;
}
}
if (!found) {
tscRemoveVgroupTableGroup(pVgroupTables, k);
} else {
k++;
}
}
assert(taosArrayGetSize(pVgroupTables) > 0);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
taosTFree(list);
}
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
int32_t num = 0;
int32_t* list = NULL;
tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);
size_t numOfGroups = taosArrayGetSize(pVgroupTables);
SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));
SVgroupTableInfo info;
for (int32_t i = 0; i < num; ++i) {
int32_t vnodeId = list[i];
for (int32_t j = 0; j < numOfGroups; ++j) {
SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
if (p1->vgInfo.vgId == vnodeId) {
tscVgroupTableCopy(&info, p1);
break;
}
}
taosArrayPush(pNew, &info);
}
taosTFree(list);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return pNew;
}
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
......@@ -322,12 +415,11 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
pQueryInfo->groupbyExpr = pSupporter->groupInfo;
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
tscFieldInfoUpdateOffset(pNewQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
pSupporter->exprList = NULL;
......@@ -341,7 +433,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
* during the timestamp intersection.
*/
pSupporter->limit = pQueryInfo->limit;
pNewQueryInfo->limit = pSupporter->limit;
pQueryInfo->limit = pSupporter->limit;
SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);
......@@ -356,7 +448,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
tscPrintSelectClause(pNew, 0);
tscFieldInfoUpdateOffset(pNewQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
pExpr = tscSqlExprGet(pQueryInfo, 0);
}
......@@ -371,39 +463,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pExpr->numOfParams = 1;
}
int32_t num = 0;
int32_t *list = NULL;
tsBufGetVnodeIdList(pNewQueryInfo->tsBuf, &num, &list);
if (pTableMetaInfo->pVgroupTables != NULL) {
for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) {
SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k);
bool found = false;
for(int32_t f = 0; f < num; ++f) {
if (p->vgInfo.vgId == list[f]) {
found = true;
break;
}
}
if (!found) {
tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->pVgroupTables != NULL);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
pTableMetaInfo->pVgroupTables = p;
} else {
k++;
filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
}
}
assert(taosArrayGetSize(pTableMetaInfo->pVgroupTables) > 0);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
}
taosTFree(list);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
}
//prepare the subqueries object failed, abort
......@@ -509,7 +583,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
SVgroupTableInfo info = {{0}};
for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (tt->vgId == pvg->vgroups[m].vgId) {
tscSCMVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
tscSVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
break;
}
}
......@@ -517,17 +591,29 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
vgTables = taosArrayInit(4, sizeof(STableIdInfo));
info.itemList = vgTables;
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* prevGroup = taosArrayGet(result, taosArrayGetSize(result) - 1);
tscDebug("%p vgId:%d, tables:%"PRId64, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList));
}
taosArrayPush(result, &info);
}
tscDebug("%p tid:%d, uid:%"PRIu64",vgId:%d added for vnode query", pSql, tt->tid, tt->uid, tt->vgId)
STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
taosArrayPush(vgTables, &item);
tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added", pSql, tt->tid, tt->uid, tt->vgId);
prev = tt;
}
pTableMetaInfo->pVgroupTables = result;
pTableMetaInfo->vgroupIndex = 0;
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* g = taosArrayGet(result, taosArrayGetSize(result) - 1);
tscDebug("%p vgId:%d, tables:%"PRId64, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList));
}
}
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
......@@ -602,11 +688,11 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
}
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
tscDebug("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);
// sort according to the tag value
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
......@@ -655,6 +741,19 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
qsort((*s1)->pData, t1, size, tidTagsCompar);
qsort((*s2)->pData, t2, size, tidTagsCompar);
#if 0
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s1)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
}
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s2)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
}
#endif
tscDebug("%p tags match complete, result: %"PRId64", %"PRId64, pParentSql, t1, t2);
return TSDB_CODE_SUCCESS;
}
......@@ -958,6 +1057,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
}
}
assert(pState->numOfRemain > 0);
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub);
return;
......@@ -971,6 +1071,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
}
// update the records for each subquery in parent sql object.
bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);
for (int32_t i = 0; i < pState->numOfSub; ++i) {
if (pParentSql->pSubs[i] == NULL) {
tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
......@@ -984,7 +1085,10 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pRes1->numOfRows, pRes1->numOfTotal);
assert(pRes1->row < pRes1->numOfRows);
} else {
if (!stableQuery) {
pRes1->numOfClauseTotal += pRes1->numOfRows;
}
tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
}
......@@ -994,7 +1098,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
tscBuildResFromSubqueries(pParentSql);
}
void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
void tscFetchDatablockForSubquery(SSqlObj* pSql) {
assert(pSql->subState.numOfSub >= 1);
int32_t numOfFetch = 0;
......@@ -1056,11 +1160,22 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if (numOfFetch <= 0) {
bool tryNextVnode = false;
SSqlObj* pp = pSql->pSubs[0];
SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0);
bool orderedPrjQuery = false;
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0);
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0);
if (orderedPrjQuery) {
break;
}
}
// get the number of subquery that need to retrieve the next vnode.
if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) {
if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
......@@ -1164,7 +1279,6 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex);
// the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) {
......@@ -1260,21 +1374,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
return;
}
// wait for the other subqueries response from vnode
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
return;
}
}
tscSetupOutputColumnIndex(pParentSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/**
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
......@@ -1671,8 +1787,8 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
SSqlObj *pParentSql = trsupport->pParentSql;
int32_t subqueryIndex = trsupport->subqueryIndex;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
......@@ -1874,7 +1990,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SVgroupInfo *pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
......@@ -1985,7 +2101,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
// stable query killed or other subquery failed, all query stopped
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
......
......@@ -104,7 +104,6 @@ void taos_init_imp(void) {
taosReadGlobalCfg();
taosCheckGlobalCfg();
taosPrintGlobalCfg();
tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp);
......
......@@ -338,34 +338,6 @@ void tscFreeSqlResult(SSqlObj* pSql) {
memset(&pSql->res, 0, sizeof(SSqlRes));
}
void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
}
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscRemoveFromSqlList(pSql);
}
// pSql->sqlstr will be used by tscBuildQueryStreamDesc
// if (pObj->signature == pObj) {
//pthread_mutex_lock(&pObj->mutex);
taosTFree(pSql->sqlstr);
//pthread_mutex_unlock(&pObj->mutex);
// }
tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscResetSqlCmdObj(pCmd, false);
}
static void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) {
return;
......@@ -434,22 +406,32 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("%p start to free sqlObj", pSql);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql);
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscRemoveFromSqlList(pSql);
}
pSql->signature = NULL;
pSql->fp = NULL;
taosTFree(pSql->sqlstr);
SSqlCmd* pCmd = &pSql->cmd;
taosTFree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscFreeSqlResult(pSql);
tscResetSqlCmdObj(pCmd, false);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
taosTFree(pCmd->payload);
pCmd->allocSize = 0;
taosTFree(pSql->sqlstr);
tsem_destroy(&pSql->rspSem);
free(pSql);
}
......@@ -1714,6 +1696,17 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
taosArrayRemove(pVgroupTable, index);
}
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
memset(info, 0, sizeof(SVgroupTableInfo));
info->vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
info->itemList = taosArrayClone(pInfo->itemList);
}
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
if (pVgroupTables == NULL) {
return NULL;
......@@ -1725,14 +1718,8 @@ SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
SVgroupTableInfo info;
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
memset(&info, 0, sizeof(SVgroupTableInfo));
info.vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info.vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
tscVgroupTableCopy(&info, pInfo);
info.itemList = taosArrayClone(pInfo->itemList);
taosArrayPush(pa, &info);
}
......@@ -2441,7 +2428,7 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
return NULL;
}
size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
size_t size = sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupList->numOfVgroups;
SVgroupsInfo* pNew = calloc(1, size);
if (pNew == NULL) {
return NULL;
......@@ -2450,9 +2437,9 @@ SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
pNew->numOfVgroups = vgroupList->numOfVgroups;
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pNewVInfo = &pNew->vgroups[i];
SVgroupInfo* pNewVInfo = &pNew->vgroups[i];
SCMVgroupInfo* pvInfo = &vgroupList->vgroups[i];
SVgroupInfo* pvInfo = &vgroupList->vgroups[i];
pNewVInfo->vgId = pvInfo->vgId;
pNewVInfo->numOfEps = pvInfo->numOfEps;
......@@ -2471,7 +2458,7 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
}
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
SVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
taosTFree(pVgroupInfo->epAddr[j].fqdn);
......@@ -2482,7 +2469,7 @@ void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
return NULL;
}
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) {
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
for(int32_t i = 0; i < dst->numOfEps; ++i) {
......
......@@ -51,6 +51,7 @@ extern char tsLocale[];
extern char tsCharset[]; // default encode string
extern int32_t tsEnableCoreFile;
extern int32_t tsCompressMsgSize;
extern char tsTempDir[];
//query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer for each data node during query processing
......@@ -184,7 +185,7 @@ extern int32_t debugFlag;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
void taosInitGlobalCfg();
bool taosCheckGlobalCfg();
int32_t taosCheckGlobalCfg();
void taosSetAllDebugFlag();
bool taosCfgDynamicOptions(char *msg);
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
......
......@@ -58,6 +58,7 @@ char tsLocale[TSDB_LOCALE_LEN] = {0};
char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string
int32_t tsEnableCoreFile = 0;
int32_t tsMaxBinaryDisplayWidth = 30;
char tsTempDir[TSDB_FILENAME_LEN] = "/tmp/";
/*
* denote if the server needs to compress response message at the application layer to client, including query rsp,
......@@ -1310,13 +1311,23 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "tempDir";
cfg.ptr = tsTempDir;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = tListLen(tsTempDir);
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
}
void taosInitGlobalCfg() {
pthread_once(&tsInitGlobalCfgOnce, doInitGlobalConfig);
}
bool taosCheckGlobalCfg() {
int32_t taosCheckGlobalCfg() {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
......@@ -1375,7 +1386,9 @@ bool taosCheckGlobalCfg() {
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
tsHttpPort = tsServerPort + TSDB_PORT_HTTP;
return true;
taosPrintGlobalCfg();
return 0;
}
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
......
......@@ -108,7 +108,7 @@ void tVariantCreateFromBinary(tVariant *pVar, const char *pz, size_t len, uint32
break;
}
case TSDB_DATA_TYPE_BINARY: { // todo refactor, extract a method
pVar->pz = calloc(len, sizeof(char));
pVar->pz = calloc(len + 1, sizeof(char));
memcpy(pVar->pz, pz, len);
pVar->nLen = (int32_t)len;
break;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_CFG_H
#define TDENGINE_DNODE_CFG_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitCfg();
void dnodeCleanupCfg();
void dnodeUpdateCfg(SDnodeCfg *cfg);
int32_t dnodeGetDnodeId();
void dnodeGetCfg(int32_t *dnodeId, char *clusterId);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_EP_H
#define TDENGINE_DNODE_EP_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
int32_t dnodeInitEps();
void dnodeCleanupEps();
void dnodeUpdateEps(SDnodeEps *eps);
void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MINFOS_H
#define TDENGINE_DNODE_MINFOS_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
int32_t dnodeInitMInfos();
void dnodeCleanupMInfos();
void dnodeUpdateMInfos(SMnodeInfos *minfos);
void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetMInfos(SMnodeInfos *minfos);
bool dnodeIsMasterEp(char *ep);
#ifdef __cplusplus
}
#endif
#endif
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include "trpc.h"
int32_t dnodeInitMgmt();
void dnodeCleanupMgmt();
int32_t dnodeInitMgmtTimer();
......@@ -35,8 +37,8 @@ void* dnodeGetVnodeTsdb(void *pVnode);
void dnodeReleaseVnode(void *pVnode);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
#ifdef __cplusplus
}
......
......@@ -20,9 +20,12 @@
extern "C" {
#endif
int32_t dnodeInitVnodeWrite();
void dnodeCleanupVnodeWrite();
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg);
int32_t dnodeInitVWrite();
void dnodeCleanupVWrite();
void dnodeDispatchToVWriteQueue(SRpcMsg *pMsg);
void * dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeCfg.h"
static SDnodeCfg tsCfg = {0};
static pthread_mutex_t tsCfgMutex;
static int32_t dnodeReadCfg();
static int32_t dnodeWriteCfg();
static void dnodeResetCfg(SDnodeCfg *cfg);
static void dnodePrintCfg(SDnodeCfg *cfg);
int32_t dnodeInitCfg() {
pthread_mutex_init(&tsCfgMutex, NULL);
dnodeResetCfg(NULL);
int32_t ret = dnodeReadCfg();
if (ret == 0) {
dInfo("dnode cfg is initialized");
}
return ret;
}
void dnodeCleanupCfg() { pthread_mutex_destroy(&tsCfgMutex); }
void dnodeUpdateCfg(SDnodeCfg *cfg) {
if (tsCfg.dnodeId != 0) return;
dnodeResetCfg(cfg);
}
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsCfgMutex);
dnodeId = tsCfg.dnodeId;
pthread_mutex_unlock(&tsCfgMutex);
return dnodeId;
}
void dnodeGetCfg(int32_t *dnodeId, char *clusterId) {
pthread_mutex_lock(&tsCfgMutex);
*dnodeId = tsCfg.dnodeId;
tstrncpy(clusterId, tsCfg.clusterId, TSDB_CLUSTER_ID_LEN);
pthread_mutex_unlock(&tsCfgMutex);
}
static void dnodeResetCfg(SDnodeCfg *cfg) {
if (cfg == NULL) return;
if (cfg->dnodeId == 0) return;
pthread_mutex_lock(&tsCfgMutex);
tsCfg.dnodeId = cfg->dnodeId;
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodePrintCfg(cfg);
dnodeWriteCfg();
pthread_mutex_unlock(&tsCfgMutex);
}
static void dnodePrintCfg(SDnodeCfg *cfg) {
dInfo("dnodeId is set to %d, clusterId is set to %s", cfg->dnodeId, cfg->clusterId);
}
static int32_t dnodeReadCfg() {
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SDnodeCfg cfg = {0};
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeCfg.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PARSE_CFG_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PARSE_CFG_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PARSE_CFG_OVER;
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", file);
goto PARSE_CFG_OVER;
}
cfg.dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s, clusterId not found", file);
goto PARSE_CFG_OVER;
}
tstrncpy(cfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN);
dInfo("read file %s successed", file);
PARSE_CFG_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
terrno = 0;
dnodeResetCfg(&cfg);
return 0;
}
static int32_t dnodeWriteCfg() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsCfg.dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsCfg.clusterId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", file);
return 0;
}
......@@ -15,9 +15,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnodeInt.h"
#include "dnodeCheck.h"
......@@ -264,8 +262,6 @@ int32_t dnodeInitCheck() {
}
}
dInfo("dnode check is initialized");
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "hash.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeEps.h"
static SDnodeEps *tsEps = NULL;
static SHashObj * tsEpsHash = NULL;
static pthread_mutex_t tsEpsMutex;
static int32_t dnodeReadEps();
static int32_t dnodeWriteEps();
static void dnodeResetEps(SDnodeEps *eps);
static void dnodePrintEps(SDnodeEps *eps);
int32_t dnodeInitEps() {
pthread_mutex_init(&tsEpsMutex, NULL);
tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
dnodeResetEps(NULL);
int32_t ret = dnodeReadEps();
if (ret == 0) {
dInfo("dnode eps is initialized");
}
return ret;
}
void dnodeCleanupEps() {
pthread_mutex_lock(&tsEpsMutex);
if (tsEps) {
free(tsEps);
tsEps = NULL;
}
if (tsEpsHash) {
taosHashCleanup(tsEpsHash);
tsEpsHash = NULL;
}
pthread_mutex_unlock(&tsEpsMutex);
pthread_mutex_destroy(&tsEpsMutex);
}
void dnodeUpdateEps(SDnodeEps *eps) {
if (eps == NULL) return;
eps->dnodeNum = htonl(eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; ++i) {
eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId);
eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort);
}
pthread_mutex_lock(&tsEpsMutex);
if (eps->dnodeNum != tsEps->dnodeNum) {
dnodeResetEps(eps);
dnodeWriteEps();
} else {
int32_t size = sizeof(SDnodeEps) + eps->dnodeNum * sizeof(SDnodeEp);
if (memcmp(eps, tsEps, size) != 0) {
dnodeResetEps(eps);
dnodeWriteEps();
}
}
pthread_mutex_unlock(&tsEpsMutex);
}
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr) {
bool changed = false;
pthread_mutex_lock(&tsEpsMutex);
SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
changed = strcmp(epstr, epSaved) != 0;
tstrncpy(epstr, epSaved, TSDB_EP_LEN);
}
pthread_mutex_unlock(&tsEpsMutex);
return changed;
}
void dnodeUpdateEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsEpsMutex);
SDnodeEp *ep = taosHashGet(tsEpsHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) {
if (port) *port = ep->dnodePort;
if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
}
pthread_mutex_unlock(&tsEpsMutex);
}
static void dnodeResetEps(SDnodeEps *eps) {
if (eps == NULL) {
int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp);
if (tsEps == NULL) {
tsEps = calloc(1, size);
} else {
tsEps->dnodeNum = 0;
}
} else {
assert(tsEps);
int32_t size = sizeof(SDnodeEps) + sizeof(SDnodeEp) * eps->dnodeNum;
if (eps->dnodeNum > tsEps->dnodeNum) {
tsEps = realloc(tsEps, size);
}
memcpy(tsEps, eps, size);
dnodePrintEps(eps);
}
for (int32_t i = 0; i < tsEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps->dnodeEps[i];
taosHashPut(tsEpsHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
}
}
static void dnodePrintEps(SDnodeEps *eps) {
dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; i++) {
SDnodeEp *ep = &eps->dnodeEps[i];
dDebug("dnodeId:%d, dnodeFqdn:%s dnodePort:%u", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort);
}
}
static int32_t dnodeReadEps() {
int32_t ret = -1;
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SDnodeEps *eps = NULL;
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeEps.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PRASE_EPS_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PRASE_EPS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeNum = cJSON_GetObjectItem(root, "dnodeNum");
if (!dnodeNum || dnodeNum->type != cJSON_Number) {
dError("failed to read %s, dnodeNum not found", file);
goto PRASE_EPS_OVER;
}
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s, dnodeInfos not found", file);
goto PRASE_EPS_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize != dnodeNum->valueint) {
dError("failed to read %s, dnodeInfos size:%d not matched dnodeNum:%d", file, dnodeInfosSize,
(int32_t)dnodeNum->valueint);
goto PRASE_EPS_OVER;
}
int32_t epsSize = sizeof(SDnodeEps) + dnodeInfosSize * sizeof(SDnodeEp);
eps = calloc(1, epsSize);
eps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
SDnodeEp *ep = &eps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", file);
goto PRASE_EPS_OVER;
}
ep->dnodeId = dnodeId->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", file);
goto PRASE_EPS_OVER;
}
strncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s, dnodePort not found", file);
goto PRASE_EPS_OVER;
}
ep->dnodePort = (uint16_t)dnodePort->valueint;
}
ret = 0;
dInfo("read file %s successed", file);
dnodePrintEps(eps);
PRASE_EPS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (ret != 0) {
if (eps) free(eps);
eps = NULL;
}
dnodeResetEps(eps);
if (eps) free(eps);
dnodeUpdateEp(dnodeGetDnodeId(), tsLocalEp, tsLocalFqdn, &tsServerPort);
terrno = 0;
return 0;
}
static int32_t dnodeWriteEps() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/dnodeEps.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", tsEps->dnodeNum);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": %u\n", ep->dnodePort);
if (i < tsEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", file);
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "tglobal.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMInfos.h"
static SMnodeInfos tsMInfos;
static SRpcEpSet tsMEpSet;
static pthread_mutex_t tsMInfosMutex;
static void dnodeResetMInfos(SMnodeInfos *minfos);
static void dnodePrintMInfos(SMnodeInfos *minfos);
static int32_t dnodeReadMInfos();
static int32_t dnodeWriteMInfos();
int32_t dnodeInitMInfos() {
pthread_mutex_init(&tsMInfosMutex, NULL);
dnodeResetMInfos(NULL);
int32_t ret = dnodeReadMInfos();
if (ret == 0) {
dInfo("dnode minfos is initialized");
}
return ret;
}
void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); }
void dnodeUpdateMInfos(SMnodeInfos *minfos) {
if (minfos->mnodeNum <= 0 || minfos->mnodeNum > 3) {
dError("invalid mnode infos, mnodeNum:%d", minfos->mnodeNum);
return;
}
for (int32_t i = 0; i < minfos->mnodeNum; ++i) {
SMnodeInfo *minfo = &minfos->mnodeInfos[i];
minfo->mnodeId = htonl(minfo->mnodeId);
if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) {
dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp);
return;
}
}
pthread_mutex_lock(&tsMInfosMutex);
if (minfos->mnodeNum != tsMInfos.mnodeNum) {
dnodeResetMInfos(minfos);
dnodeWriteMInfos();
sdbUpdateAsync();
} else {
int32_t size = sizeof(SMnodeInfos);
if (memcmp(minfos, &tsMInfos, size) != 0) {
dnodeResetMInfos(minfos);
dnodeWriteMInfos();
sdbUpdateAsync();
}
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) {
if (ep->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
return;
}
pthread_mutex_lock(&tsMInfosMutex);
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
for (int i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
}
tsMEpSet = *ep;
pthread_mutex_unlock(&tsMInfosMutex);
}
bool dnodeIsMasterEp(char *ep) {
pthread_mutex_lock(&tsMInfosMutex);
bool isMaster = strcmp(ep, tsMInfos.mnodeInfos[tsMEpSet.inUse].mnodeEp) == 0;
pthread_mutex_unlock(&tsMInfosMutex);
return isMaster;
}
void dnodeGetMInfos(SMnodeInfos *minfos) {
pthread_mutex_lock(&tsMInfosMutex);
memcpy(minfos, &tsMInfos, sizeof(SMnodeInfos));
for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) {
minfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId);
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsMInfosMutex);
*epSet = tsMEpSet;
for (int i = 0; i < epSet->numOfEps; ++i) {
epSet->port[i] += TSDB_PORT_DNODEDNODE;
}
pthread_mutex_unlock(&tsMInfosMutex);
}
void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsMInfosMutex);
*epSet = tsMEpSet;
pthread_mutex_unlock(&tsMInfosMutex);
}
static void dnodePrintMInfos(SMnodeInfos *minfos) {
dInfo("print mnode infos, mnodeNum:%d inUse:%d", minfos->mnodeNum, minfos->inUse);
for (int32_t i = 0; i < minfos->mnodeNum; i++) {
dInfo("mnode index:%d, %s", minfos->mnodeInfos[i].mnodeId, minfos->mnodeInfos[i].mnodeEp);
}
}
static void dnodeResetMInfos(SMnodeInfos *minfos) {
if (minfos == NULL) {
tsMEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]);
if (strcmp(tsSecond, tsFirst) != 0) {
tsMEpSet.numOfEps = 2;
taosGetFqdnPortFromEp(tsSecond, tsMEpSet.fqdn[1], &tsMEpSet.port[1]);
}
return;
}
if (minfos->mnodeNum == 0) return;
int32_t size = sizeof(SMnodeInfos);
memcpy(&tsMInfos, minfos, size);
tsMEpSet.inUse = tsMInfos.inUse;
tsMEpSet.numOfEps = tsMInfos.mnodeNum;
for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) {
taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]);
}
dnodePrintMInfos(minfos);
}
static int32_t dnodeReadMInfos() {
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
SMnodeInfos minfos = {0};
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("failed to read %s, file not exist", file);
goto PARSE_MINFOS_OVER;
}
len = fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s, content is null", file);
goto PARSE_MINFOS_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s, invalid json format", file);
goto PARSE_MINFOS_OVER;
}
cJSON *inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, inUse not found");
goto PARSE_MINFOS_OVER;
}
tsMInfos.inUse = inUse->valueint;
cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeNum not found");
goto PARSE_MINFOS_OVER;
}
minfos.mnodeNum = nodeNum->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnodeEpSet.json, nodeInfos not found");
goto PARSE_MINFOS_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != minfos.mnodeNum) {
dError("failed to read mnodeEpSet.json, nodeInfos size not matched");
goto PARSE_MINFOS_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeId not found");
goto PARSE_MINFOS_OVER;
}
minfos.mnodeInfos[i].mnodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnodeEpSet.json, nodeName not found");
goto PARSE_MINFOS_OVER;
}
strncpy(minfos.mnodeInfos[i].mnodeEp, nodeEp->valuestring, TSDB_EP_LEN);
}
dInfo("read file %s successed", file);
dnodePrintMInfos(&minfos);
PARSE_MINFOS_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
terrno = 0;
for (int32_t i = 0; i < minfos.mnodeNum; ++i) {
SMnodeInfo *mInfo = &minfos.mnodeInfos[i];
dnodeUpdateEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL);
}
dnodeResetMInfos(&minfos);
return 0;
}
static int32_t dnodeWriteMInfos() {
char file[TSDB_FILENAME_LEN + 20] = {0};
sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s, reason:%s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMInfos.mnodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsMInfos.mnodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMInfos.mnodeInfos[i].mnodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMInfos.mnodeInfos[i].mnodeEp);
if (i < tsMInfos.mnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", file);
return 0;
}
......@@ -58,7 +58,7 @@ int32_t dnodeInitMnodePeer() {
dDebug("dnode mpeer worker:%d is created", i);
}
dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
dDebug("dnode mpeer is initialized, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
return 0;
}
......
......@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeRead() {
dDebug("dnode mread worker:%d is created", i);
}
dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
dDebug("dnode mread is initialized, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
return 0;
}
......
......@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeWrite() {
dDebug("dnode mwrite worker:%d is created", i);
}
dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
dDebug("dnode mwrite is initialized, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
return 0;
}
......
......@@ -19,11 +19,15 @@
#include "tutil.h"
#include "tconfig.h"
#include "tglobal.h"
#include "twal.h"
#include "dnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodePeer.h"
#include "dnodeModule.h"
#include "dnodeEps.h"
#include "dnodeMInfos.h"
#include "dnodeCfg.h"
#include "dnodeCheck.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
......@@ -33,26 +37,32 @@
#include "dnodeShell.h"
#include "dnodeTelemetry.h"
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitStorage();
static void dnodeCleanupStorage();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
static int32_t dnodeInitComponents();
static void dnodeCleanupComponents(int32_t stepId);
static int dnodeCreateDir(const char *dir);
typedef struct {
const char *const name;
int (*init)();
int32_t (*init)();
void (*cleanup)();
} SDnodeComponent;
static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnodecfg", dnodeInitCfg, dnodeCleanupCfg},
{"dnodeeps", dnodeInitEps, dnodeCleanupEps},
{"globalcfg" ,taosCheckGlobalCfg, NULL},
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},
{"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer},
......@@ -75,7 +85,9 @@ static int dnodeCreateDir(const char *dir) {
static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
tsDnodeComponents[i].cleanup();
if (tsDnodeComponents[i].cleanup) {
(*tsDnodeComponents[i].cleanup)();
}
}
}
......@@ -112,14 +124,13 @@ int32_t dnodeInitSystem() {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg() || !taosCheckGlobalCfg()) {
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
taosPrintGlobalCfg();
dInfo("start to initialize TDengine on %s", tsLocalEp);
dInfo("start to initialize TDengine");
if (dnodeInitComponents() != 0) {
return -1;
......@@ -198,7 +209,7 @@ static int32_t dnodeInitStorage() {
dnodeCheckDataDirOpenned(tsDnodeDir);
dInfo("storage directory is initialized");
dInfo("dnode storage is initialized at %s", tsDnodeDir);
return 0;
}
......
......@@ -31,12 +31,13 @@
#include "mnode.h"
#include "dnodeInt.h"
#include "dnodeMgmt.h"
#include "dnodeEps.h"
#include "dnodeCfg.h"
#include "dnodeMInfos.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
#include "dnodeModule.h"
#define MPEER_CONTENT_LEN 2000
typedef struct {
pthread_t thread;
int32_t threadIndex;
......@@ -49,20 +50,10 @@ typedef struct {
void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static SRpcEpSet tsDMnodeEpSet = {0};
static SDMMnodeInfos tsDMnodeInfos = {0};
static SDMDnodeCfg tsDnodeCfg = {0};
static taos_qset tsMgmtQset = NULL;
static taos_queue tsMgmtQueue = NULL;
static pthread_t tsQthread;
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
static bool dnodeReadMnodeInfos();
static void dnodeSaveMnodeInfos();
static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg);
static bool dnodeReadDnodeCfg();
static void dnodeSaveDnodeCfg();
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void *dnodeProcessMgmtQueue(void *param);
......@@ -86,28 +77,8 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
dnodeReadDnodeCfg();
tsRebootTime = taosGetTimestampSec();
if (!dnodeReadMnodeInfos()) {
memset(&tsDMnodeEpSet, 0, sizeof(SRpcEpSet));
memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos));
tsDMnodeEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, tsDMnodeEpSet.fqdn[0], &tsDMnodeEpSet.port[0]);
if (strcmp(tsSecond, tsFirst) != 0) {
tsDMnodeEpSet.numOfEps = 2;
taosGetFqdnPortFromEp(tsSecond, tsDMnodeEpSet.fqdn[1], &tsDMnodeEpSet.port[1]);
}
} else {
tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse;
tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]);
}
}
int32_t code = vnodeInitResources();
if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt();
......@@ -381,7 +352,7 @@ static void dnodeCloseVnodes() {
}
static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
......@@ -404,7 +375,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
}
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
SCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) {
......@@ -418,7 +389,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
}
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
SAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquire(pAlter->cfg.vgId);
if (pVnode != NULL) {
......@@ -433,14 +404,14 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return vnodeDrop(pDrop->vgId);
}
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// SMDAlterStreamMsg *pStream = pCont;
// SAlterStreamMsg *pStream = pCont;
// pStream->uid = htobe64(pStream->uid);
// pStream->stime = htobe64(pStream->stime);
// pStream->vnode = htonl(pStream->vnode);
......@@ -453,12 +424,12 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
}
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = pMsg->pCont;
SCfgDnodeMsg *pCfg = pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config);
}
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SMDCreateMnodeMsg *pCfg = pMsg->pCont;
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
......@@ -470,10 +441,10 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum);
for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) {
pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp);
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum);
for (int i = 0; i < pCfg->mnodes.mnodeNum; ++i) {
pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
}
dnodeStartMnode(&pCfg->mnodes);
......@@ -481,34 +452,6 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
if (pEpSet->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
return;
}
dInfo("mnode EP list for peer is changed, numOfEps:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
for (int i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
}
tsDMnodeEpSet = *pEpSet;
}
void dnodeGetMnodeEpSetForPeer(void *epSetRaw) {
SRpcEpSet *epSet = epSetRaw;
*epSet = tsDMnodeEpSet;
for (int i=0; i<epSet->numOfEps; ++i)
epSet->port[i] += TSDB_PORT_DNODEDNODE;
}
void dnodeGetMnodeEpSetForShell(void *epSetRaw) {
SRpcEpSet *epSet = epSetRaw;
*epSet = tsDMnodeEpSet;
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
......@@ -516,202 +459,24 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
return;
}
SDMStatusRsp *pStatusRsp = pMsg->pCont;
SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes;
if (pMnodes->nodeNum <= 0) {
dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SStatusRsp *pStatusRsp = pMsg->pCont;
SMnodeInfos *minfos = &pStatusRsp->mnodes;
dnodeUpdateMInfos(minfos);
SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->moduleStatus = htonl(pCfg->moduleStatus);
pCfg->dnodeId = htonl(pCfg->dnodeId);
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
}
dnodeUpdateCfg(pCfg);
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
// will not set mnode in status msg
// dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg);
SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
dnodeUpdateEps(pEps);
dnodeUpdateMnodeInfos(pMnodes);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}
static bool dnodeCheckMnodeInfos(SDMMnodeInfos *pMnodes) {
if (pMnodes->nodeNum <= 0 || pMnodes->nodeNum > 3) {
dError("invalid mnode infos, num:%d", pMnodes->nodeNum);
return false;
}
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
if (pMnodeInfo->nodeId <= 0 || strlen(pMnodeInfo->nodeEp) <= 5) {
dError("invalid mnode info:%d, nodeId:%d nodeEp:%s", i, pMnodeInfo->nodeId, pMnodeInfo->nodeEp);
return false;
}
}
return true;
}
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
bool mnodesChanged = (memcmp(&tsDMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0);
bool mnodesNotInit = (tsDMnodeInfos.nodeNum == 0);
if (!(mnodesChanged || mnodesNotInit)) return;
if (!dnodeCheckMnodeInfos(pMnodes)) return;
memcpy(&tsDMnodeInfos, pMnodes, sizeof(SDMMnodeInfos));
dInfo("mnode infos is changed, nodeNum:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse);
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
dInfo("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
}
tsDMnodeEpSet.inUse = tsDMnodeInfos.inUse;
tsDMnodeEpSet.numOfEps = tsDMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeEpSet.fqdn[i], &tsDMnodeEpSet.port[i]);
}
dnodeSaveMnodeInfos();
sdbUpdateAsync();
}
static bool dnodeReadMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN*2] = {0};
sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r");
if (!fp) {
dDebug("failed to read mnodeEpSet.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 2000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read mnodeEpSet.json, content is null");
return false;
}
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read mnodeEpSet.json, invalid json format");
goto PARSE_OVER;
}
cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, inUse not found");
goto PARSE_OVER;
}
tsDMnodeInfos.inUse = inUse->valueint;
cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeNum not found");
goto PARSE_OVER;
}
tsDMnodeInfos.nodeNum = nodeNum->valueint;
cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnodeEpSet.json, nodeInfos not found");
goto PARSE_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != tsDMnodeInfos.nodeNum) {
dError("failed to read mnodeEpSet.json, nodeInfos size not matched");
goto PARSE_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeId not found");
goto PARSE_OVER;
}
tsDMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnodeEpSet.json, nodeName not found");
goto PARSE_OVER;
}
strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN);
}
ret = true;
dInfo("read mnode epSet successed, numOfEps:%d inUse:%d", tsDMnodeInfos.nodeNum, tsDMnodeInfos.inUse);
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
dInfo("mnode:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
}
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mnodeEpSet.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDMnodeInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDMnodeInfos.nodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDMnodeInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDMnodeInfos.nodeInfos[i].nodeEp);
if (i < tsDMnodeInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
dInfo("save mnode epSet successed");
}
char *dnodeGetMnodeMasterEp() {
return tsDMnodeInfos.nodeInfos[tsDMnodeEpSet.inUse].nodeEp;
}
void* dnodeGetMnodeInfos() {
return &tsDMnodeInfos;
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
......@@ -724,22 +489,21 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return;
}
int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SDMStatusMsg *pStatus = rpcMallocCont(contLen);
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message");
return;
}
//strcpy(pStatus->dnodeName, tsDnodeName);
dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
pStatus->dnodeId = htonl(dnodeGetDnodeId());
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
tstrncpy(pStatus->clusterId, tsDnodeCfg.clusterId, TSDB_CLUSTER_ID_LEN);
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
// fill cluster cfg parameters
......@@ -759,7 +523,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
......@@ -769,110 +533,19 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
};
SRpcEpSet epSet;
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeGetEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, &rpcMsg);
}
static bool dnodeReadDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN*2] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "r");
if (!fp) {
dDebug("failed to read dnodeCfg.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 100;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read dnodeCfg.json, content is null");
return false;
}
content[len] = 0;
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read dnodeCfg.json, invalid json format");
goto PARSE_CFG_OVER;
}
cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read dnodeCfg.json, dnodeId not found");
goto PARSE_CFG_OVER;
}
tsDnodeCfg.dnodeId = dnodeId->valueint;
cJSON* clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read dnodeCfg.json, clusterId not found");
goto PARSE_CFG_OVER;
}
tstrncpy(tsDnodeCfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN);
ret = true;
dInfo("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId);
PARSE_CFG_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsDnodeCfg.dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsDnodeCfg.clusterId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fflush(fp);
fclose(fp);
free(content);
dInfo("save dnodeId successed");
}
void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
if (tsDnodeCfg.dnodeId == 0) {
dInfo("dnodeId is set to %d, clusterId is set to %s", pCfg->dnodeId, pCfg->clusterId);
tsDnodeCfg.dnodeId = pCfg->dnodeId;
tstrncpy(tsDnodeCfg.clusterId, pCfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodeSaveDnodeCfg();
}
}
int32_t dnodeGetDnodeId() {
return tsDnodeCfg.dnodeId;
}
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo);
SRpcEpSet epSet = {0};
if (forShell) {
dnodeGetMnodeEpSetForShell(&epSet);
dnodeGetEpSetForShell(&epSet);
} else {
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeGetEpSetForPeer(&epSet);
}
dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
......
......@@ -114,6 +114,7 @@ int32_t dnodeInitModules() {
}
}
dInfo("dnode modules is initialized");
return 0;
}
......@@ -146,8 +147,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
}
}
bool dnodeStartMnode(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
bool dnodeStartMnode(SMnodeInfos *minfos) {
SMnodeInfos *mnodes = minfos;
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
dDebug("mnode module is already started, module status:%d", tsModuleStatus);
......
......@@ -28,8 +28,8 @@
#include "dnodeMgmt.h"
#include "dnodeVWrite.h"
#include "dnodeMPeer.h"
#include "dnodeMInfos.h"
extern void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet);
static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *);
static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
......@@ -38,10 +38,10 @@ static void *tsDnodeServerRpc = NULL;
static void *tsDnodeClientRpc = NULL;
int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeDispatchToVWriteQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeDispatchToMgmtQueue;
......@@ -72,7 +72,7 @@ int32_t dnodeInitServer() {
return -1;
}
dInfo("inter-dnodes RPC server is opened");
dInfo("dnode inter-dnodes RPC server is initialized");
return 0;
}
......@@ -137,7 +137,7 @@ int32_t dnodeInitClient() {
return -1;
}
dInfo("inter-dnodes rpc client is opened");
dInfo("dnode inter-dnodes rpc client is initialized");
return 0;
}
......@@ -145,13 +145,13 @@ void dnodeCleanupClient() {
if (tsDnodeClientRpc) {
rpcClose(tsDnodeClientRpc);
tsDnodeClientRpc = NULL;
dInfo("inter-dnodes rpc client is closed");
dInfo("dnode inter-dnodes rpc client is closed");
}
}
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEpSetForPeer(pEpSet);
dnodeUpdateEpSetForPeer(pEpSet);
}
if (dnodeProcessRspMsgFp[pMsg->msgType]) {
......@@ -173,7 +173,7 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeGetEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
}
......
......@@ -38,10 +38,10 @@ static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
......@@ -97,7 +97,7 @@ int32_t dnodeInitShell() {
return -1;
}
dInfo("shell rpc server is opened");
dInfo("dnode shell rpc server is initialized");
return 0;
}
......@@ -146,12 +146,12 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code;
SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg));
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SDMAuthMsg);
rpcMsg.contLen = sizeof(SAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
dDebug("user:%s, send auth msg to mnodes", user);
......@@ -161,7 +161,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
} else {
SDMAuthRsp *pRsp = rpcRsp.pCont;
SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user);
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
......@@ -176,8 +176,8 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
dDebug("vgId:%d, tid:%d send config table msg to mnode", vgId, tid);
int32_t contLen = sizeof(SDMConfigTableMsg);
SDMConfigTableMsg *pMsg = rpcMallocCont(contLen);
int32_t contLen = sizeof(SConfigTableMsg);
SConfigTableMsg *pMsg = rpcMallocCont(contLen);
pMsg->dnodeId = htonl(dnodeGetDnodeId());
pMsg->vgId = htonl(vgId);
......
......@@ -268,7 +268,7 @@ static void dnodeGetEmail(char* filepath) {
return;
}
if (taosTRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
if (taosRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
}
......@@ -299,6 +299,7 @@ int32_t dnodeInitTelemetry() {
dTrace("failed to create telemetry thread, reason:%s", strerror(errno));
}
dInfo("dnode telemetry is initialized");
return 0;
}
......
......@@ -61,7 +61,7 @@ int32_t dnodeInitVnodeRead() {
pWorker->workerId = i;
}
dInfo("dnode read is opened, min worker:%d max worker:%d", readPool.min, readPool.max);
dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max);
return 0;
}
......@@ -132,7 +132,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
}
}
void *dnodeAllocateVnodeRqueue(void *pVnode) {
void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex);
taos_queue queue = taosOpenQueue();
if (queue == NULL) {
......@@ -167,7 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
return queue;
}
void dnodeFreeVnodeRqueue(void *rqueue) {
void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue);
// dynamically adjust the number of threads
......
此差异已折叠。
......@@ -21,6 +21,7 @@ extern "C" {
#endif
#include "trpc.h"
#include "taosmsg.h"
typedef struct {
int32_t queryReqNum;
......@@ -38,12 +39,13 @@ SDnodeRunStatus dnodeGetRunStatus();
SDnodeStatisInfo dnodeGetStatisInfo();
bool dnodeIsFirstDeploy();
char * dnodeGetMnodeMasterEp();
void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void * dnodeGetMnodeInfos();
bool dnodeIsMasterEp(char *ep);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
int32_t dnodeGetDnodeId();
bool dnodeStartMnode(void *pModes);
void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr);
bool dnodeStartMnode(SMnodeInfos *minfos);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
......@@ -51,11 +53,11 @@ void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue);
void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue();
......
此差异已折叠。
......@@ -68,7 +68,7 @@ typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, uin
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index);
typedef int32_t (*FGetWalInfo)(void *ahandle, char *fileName, int64_t *fileId);
// when a forward pkt is received, call this to handle data
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type);
......
......@@ -19,9 +19,11 @@
extern "C" {
#endif
#define TAOS_WAL_NOLOG 0
#define TAOS_WAL_WRITE 1
#define TAOS_WAL_FSYNC 2
typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2
} EWalType;
typedef struct {
int8_t msgType;
......@@ -34,8 +36,9 @@ typedef struct {
} SWalHead;
typedef struct {
int8_t walLevel; // wal level
int32_t vgId;
int32_t fsyncPeriod; // millisecond
int8_t walLevel; // wal level
int8_t wals; // number of WAL files;
int8_t keep; // keep the wal file when closed
} SWalCfg;
......@@ -43,14 +46,18 @@ typedef struct {
typedef void* twalh; // WAL HANDLE
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type);
twalh walOpen(const char *path, const SWalCfg *pCfg);
int walAlter(twalh pWal, const SWalCfg *pCfg);
int32_t walInit();
void walCleanUp();
twalh walOpen(char *path, SWalCfg *pCfg);
int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh);
int walRenew(twalh);
int walWrite(twalh, SWalHead *);
int32_t walRenew(twalh);
int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh);
int walRestore(twalh, void *pVnode, FWalWrite writeFp);
int walGetWalFile(twalh, char *name, uint32_t *index);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId);
int64_t walGetVersion(twalh);
#ifdef __cplusplus
......
......@@ -43,10 +43,10 @@ typedef struct {
extern char *vnodeStatus[];
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount
......@@ -60,7 +60,7 @@ int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources();
void vnodeCleanupResources();
......
......@@ -244,7 +244,7 @@ int32_t shellRunCommand(TAOS* con, char* command) {
}
*p++ = c;
if (c == ';') {
if (c == ';' && quote == 0) {
c = *p;
*p = 0;
if (shellRunSingleCommand(con, cmd) < 0) {
......
......@@ -45,7 +45,7 @@ void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port, int32_t pid, const char* app);
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg);
#ifdef __cplusplus
}
......
......@@ -41,7 +41,7 @@
static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, SMnodeMsg *pMsg);
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg);
static int32_t mnodeDropDb(SMnodeMsg *newMsg);
static int32_t mnodeSetDbDropping(SDbObj *pDb);
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
......@@ -352,7 +352,7 @@ static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
return code;
}
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, SMnodeMsg *pMsg) {
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg) {
int32_t code = acctCheck(pAcct, ACCT_GRANT_DB);
if (code != 0) return code;
......@@ -805,7 +805,7 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
}
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
SCMCreateDbMsg *pCreate = pMsg->rpcMsg.pCont;
SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->maxTables = htonl(pCreate->maxTables);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks);
......@@ -830,7 +830,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
return code;
}
static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
SDbCfg newCfg = pDb->cfg;
int32_t maxTables = htonl(pAlter->maxTables);
int32_t cacheBlockSize = htonl(pAlter->cacheBlockSize);
......@@ -977,7 +977,7 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
SDbCfg newCfg = mnodeGetAlterDbOption(pDb, pAlter);
if (terrno != TSDB_CODE_SUCCESS) {
return terrno;
......@@ -1009,7 +1009,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
}
static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
SCMAlterDbMsg *pAlter = pMsg->rpcMsg.pCont;
SAlterDbMsg *pAlter = pMsg->rpcMsg.pCont;
mDebug("db:%s, alter db msg is received from thandle:%p", pAlter->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pAlter->db);
......@@ -1060,7 +1060,7 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
}
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
SCMDropDbMsg *pDrop = pMsg->rpcMsg.pCont;
SDropDbMsg *pDrop = pMsg->rpcMsg.pCont;
mDebug("db:%s, drop db msg is received from thandle:%p", pDrop->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pDrop->db);
......
此差异已折叠。
此差异已折叠。
......@@ -280,7 +280,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
}
// not thread safe, need optimized
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg) {
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
pConn->numOfQueries = htonl(pHBMsg->numOfQueries);
if (pConn->numOfQueries > 0) {
if (pConn->pQueries == NULL) {
......@@ -561,7 +561,7 @@ static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = pMsg->pUser;
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
mInfo("kill query msg is received, queryId:%s", pKill->queryId);
const char delim = ':';
......@@ -592,7 +592,7 @@ static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = pMsg->pUser;
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
SCMKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
mInfo("kill stream msg is received, streamId:%s", pKill->queryId);
const char delim = ':';
......@@ -623,7 +623,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = pMsg->pUser;
if (strcmp(pUser->user, TSDB_DEFAULT_USER) != 0) return TSDB_CODE_MND_NO_RIGHTS;
SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
SKillConnMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = atoi(pKill->queryId);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -52,9 +52,10 @@ extern "C" {
#include "osWindows.h"
#endif
#include "osDef.h"
#include "osAlloc.h"
#include "osAtomic.h"
#include "osCommon.h"
#include "osDef.h"
#include "osDir.h"
#include "osFile.h"
#include "osLz4.h"
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册