Commit 61477aa0, authored by yihaoDeng

Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

......@@ -4,7 +4,7 @@ PROJECT(TDengine)
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "2.0.4.0")
SET(TD_VER_NUMBER "2.0.5.1")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -82,8 +82,7 @@ The backend service of the TDengine system is provided by taosd; its configuration can be modified in the configuration file taos.cfg
Only some of the important configuration parameters are listed below; for more parameters, see the notes in the configuration file itself. For a detailed description of each parameter and its effect, see the preceding chapters. The default values of these parameters all work, so they normally do not need to be changed. **Note: after a configuration change, the *taosd* service must be restarted for it to take effect.** A minimal taos.cfg sketch with these entries follows the list.
- firstEp: the end point of the first dnode in the cluster that taosd actively connects to at startup; default is localhost:6030.
- secondEp: the end point of the second dnode in the cluster that taosd tries if firstEp cannot be reached; default is empty.
- fqdn: the FQDN of the data node; defaults to the first hostname configured in the operating system. If you prefer to access the node by IP address, set it to the node's IP address.
- serverPort: the port on which taosd serves external requests after startup; default is 6030.
- httpPort: the port used by the RESTful service; all HTTP (TCP) query/write requests go to this port. Default is 6041.
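The parameters above map directly to entries in taos.cfg. The snippet below is only an illustrative sketch: the hostname h1.taosdata.com is a placeholder, not something taken from this commit, and the other values are the documented defaults.
```
firstEp    h1.taosdata.com:6030
fqdn       h1.taosdata.com
serverPort 6030
httpPort   6041
```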
......@@ -156,76 +155,80 @@ The interactive front-end client of the TDengine system is taos, which shares the same
Client configuration parameters
- firstEp: the end point of the first taosd instance in the cluster that taos actively connects to at startup; default is localhost:6030.
- secondEp: the end point of the second taosd instance in the cluster that taos tries if firstEp cannot be reached; default is empty.
- locale
Default: obtained dynamically from the system; if automatic detection fails, the user must set it in the configuration file or through the API.
To store wide characters in non-ASCII encodings such as Chinese, Japanese, and Korean, TDengine provides a dedicated field type, nchar. Data written to an nchar field is uniformly encoded as UCS4-LE before being sent to the server. Note that the correctness of the encoding is the client's responsibility, so to use nchar fields to store non-ASCII characters such as Chinese, Japanese, or Korean, the client's encoding format must be set correctly.
Characters entered on the client are in the operating system's current default encoding, which on Linux is usually UTF-8; some Chinese systems may use GB18030 or GBK. The default encoding in a Docker environment is POSIX, and on Chinese-language Windows it is CP936. The client must correctly declare the character set it uses, i.e. the current encoding of the operating system it runs on, so that data in nchar fields can be converted correctly to UCS4-LE.
On Linux, a locale name follows the pattern <language>_<region>.<charset>, for example zh_CN.UTF-8, where zh means Chinese, CN means mainland China, and UTF-8 is the character set. The character-set part tells the client how to convert local strings correctly. Linux and Mac OS X determine the system character encoding through the locale; because the locale used by Windows is not in the POSIX locale format, Windows requires a separate configuration parameter, charset, to specify the character encoding. charset can also be used on Linux to specify the encoding.
- charset
Default: obtained dynamically from the system; if automatic detection fails, the user must set it in the configuration file or through the API.
If charset is not set in the configuration file, on Linux taos reads the system's current locale at startup and extracts the charset from it. If reading the locale fails, it falls back to the charset setting; if that also fails, startup is aborted.
On Linux, the locale already carries the character-encoding information, so once the Linux locale is set correctly there is no need to set charset separately. For example:
```
locale zh_CN.UTF-8
```
On Windows, the current system encoding cannot be obtained from the locale. If the encoding cannot be read from the configuration file either, taos defaults to CP936, which is equivalent to adding the following to the configuration file:
```
charset CP936
```
If the character encoding needs to be adjusted, check the encoding used by the current operating system and set it correctly in the configuration file.
On Linux, if both locale and charset are set and they are inconsistent, the value set later overrides the earlier one.
```
locale zh_CN.UTF-8
charset GBK
```
Here the effective charset is GBK.
```
charset GBK
locale zh_CN.UTF-8
```
Here the effective charset is UTF-8.
The log configuration parameters are exactly the same as those of the server.
- timezone
Default: the current time-zone setting obtained dynamically from the system
The time zone of the system on which the client runs. To handle writes and queries across multiple time zones, TDengine uses Unix timestamps to record and store time. A Unix timestamp has the property that, at any given instant, it is identical regardless of time zone. Note that the conversion to a Unix timestamp is done on the client, so the client must have the correct time zone set for other time formats to be converted to the correct Unix timestamp.
On Linux, the client automatically reads the time zone configured in the system. The time zone can also be set in the configuration file in several ways. For example:
```
timezone UTC-8
timezone GMT-8
timezone Asia/Shanghai
```
are all valid ways to set the time zone to UTC+8.
The time-zone setting affects how non-Unix-timestamp content in query and write SQL statements (timestamp strings and the keyword now) is interpreted. For example:
```
SELECT count(*) FROM table_name WHERE TS<'2019-04-11 12:01:08';
```
In UTC+8, this SQL statement is equivalent to
```
SELECT count(*) FROM table_name WHERE TS<1554955268000;
```
In UTC, it is equivalent to
```
SELECT count(*) FROM table_name WHERE TS<1554984068000;
```
To avoid the ambiguity of string time formats, Unix timestamps can be used directly. Timestamp strings that carry a time zone can also be used in SQL statements, for example the RFC 3339 string 2013-04-12T15:52:01.123+08:00 or the ISO 8601 string 2013-04-12T15:52:01.123+0800; both convert to the same Unix timestamp regardless of the system's time zone.
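As an illustration (a sketch only; table_name and TS are the same placeholder names used above), such a query is written like any other, with the zone-qualified string in place of a local-time string:
```
SELECT count(*) FROM table_name WHERE TS < '2013-04-12T15:52:01.123+08:00';
```
Because the offset is part of the literal, the resulting filter value does not depend on the client's timezone parameter.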
- maxBinaryDisplayWidth
The upper limit on the display width of binary and nchar fields in the shell; anything beyond this limit is hidden. Default: 30. It can be changed at runtime in the shell with the command set max_binary_display_width nn.
When starting taos, the end point of a taosd instance can also be given on the command line; otherwise it is read from taos.cfg.
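For example (a sketch; the width 60 is an arbitrary value chosen for illustration), inside the taos shell:
```
set max_binary_display_width 60
```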
## User Management
......
......@@ -2,9 +2,11 @@ FROM centos:7
WORKDIR /root
ARG version
RUN echo $version
COPY tdengine.tar.gz /root/
RUN tar -zxf tdengine.tar.gz
WORKDIR /root/TDengine-server/
WORKDIR /root/TDengine-server-$version/
RUN sh install.sh -e no
......
#!/bin/bash
set -x
docker build --rm -f "Dockerfile" -t tdengine/tdengine:$1 "."
docker build --rm -f "Dockerfile" -t tdengine/tdengine:$1 "." --build-arg version=$1
docker login -u tdengine -p $2 #replace the docker registry username and password
docker push tdengine/tdengine:$1
......@@ -272,6 +272,29 @@ function install_config() {
break
fi
done
# user email
#EMAIL_PATTERN='^[A-Za-z0-9\u4e00-\u9fa5]+@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+)+$'
#EMAIL_PATTERN='^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$'
#EMAIL_PATTERN="^[\w-]+(\.[\w-]+)*@[\w-]+(\.[\w-]+)+$"
echo
echo -e -n "${GREEN}Enter your email address for priority support or enter empty to skip${NC}: "
read emailAddr
while true; do
if [ ! -z "$emailAddr" ]; then
# check the format of the emailAddr
#if [[ "$emailAddr" =~ $EMAIL_PATTERN ]]; then
# Write the email address to temp file
email_file="${install_main_dir}/email"
${csudo} bash -c "echo $emailAddr > ${email_file}"
break
#else
# read -p "Please enter the correct email address: " emailAddr
#fi
else
break
fi
done
}
......
......@@ -957,11 +957,11 @@ static void balanceMonitorDnodeModule() {
continue;
}
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, add mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
mnodeAddMnode(pDnode->dnodeId);
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true);
numOfMnodes = mnodeGetMnodesNum();
if (numOfMnodes >= tsNumOfMnodes) return;
// Only create one mnode each time
return;
}
}
......
......@@ -20,6 +20,7 @@
#include "tcache.h"
#include "tnote.h"
#include "trpc.h"
#include "ttimer.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscUtil.h"
......@@ -260,6 +261,9 @@ void taos_close(TAOS *taos) {
return;
}
pObj->signature = NULL;
taosTmrStopA(&(pObj->pTimer));
SSqlObj* pHb = pObj->pHb;
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
if (pHb->pRpcCtx != NULL) { // wait for rsp from dnode
......@@ -698,8 +702,10 @@ void taos_stop_query(TAOS_RES *res) {
tscKillSTableQuery(pSql);
} else {
if (pSql->cmd.command < TSDB_SQL_LOCAL) {
assert(pSql->pRpcCtx != NULL);
rpcCancelRequest(pSql->pRpcCtx);
if (pSql->pRpcCtx != NULL) {
rpcCancelRequest(pSql->pRpcCtx);
pSql->pRpcCtx = NULL;
}
}
}
......
......@@ -34,6 +34,7 @@ extern int32_t tsStatusInterval;
extern int32_t tsNumOfMnodes;
extern int32_t tsEnableVnodeBak;
extern int32_t tsEnableTelemetryReporting;
extern char tsEmail[];
// common
extern int tsRpcTimer;
......
......@@ -42,6 +42,7 @@ int32_t tsStatusInterval = 1; // second
int32_t tsNumOfMnodes = 3;
int32_t tsEnableVnodeBak = 1;
int32_t tsEnableTelemetryReporting = 1;
char tsEmail[TSDB_FQDN_LEN] = {0};
// common
int32_t tsRpcTimer = 1000;
......
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
......@@ -33,7 +33,8 @@ typedef struct {
} SMPeerWorker;
typedef struct {
int32_t num;
int32_t curNum;
int32_t maxNum;
SMPeerWorker *peerWorker;
} SMPeerWorkerPool;
......@@ -46,37 +47,44 @@ static void *dnodeProcessMnodePeerQueue(void *param);
int32_t dnodeInitMnodePeer() {
tsMPeerQset = taosOpenQset();
tsMPeerPool.num = 1;
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.num);
tsMPeerPool.maxNum = 1;
tsMPeerPool.curNum = 0;
tsMPeerPool.peerWorker = (SMPeerWorker *)calloc(sizeof(SMPeerWorker), tsMPeerPool.maxNum);
if (tsMPeerPool.peerWorker == NULL) return -1;
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
pWorker->workerId = i;
dDebug("dnode mpeer worker:%d is created", i);
}
dInfo("dnode mpeer is opened");
dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
return 0;
}
void dnodeCleanupMnodePeer() {
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMPeerQset);
}
dDebug("dnode mpeer worker:%d is closed", i);
}
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = 0; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
dDebug("dnode mpeer worker:%d start to join", i);
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
dDebug("dnode mpeer worker:%d join success", i);
}
dDebug("dnode mpeer is closed, qset:%p", tsMPeerQset);
taosCloseQset(tsMPeerQset);
tsMPeerQset = NULL;
taosTFree(tsMPeerPool.peerWorker);
dInfo("dnode mpeer is closed");
}
int32_t dnodeAllocateMnodePqueue() {
......@@ -85,7 +93,7 @@ int32_t dnodeAllocateMnodePqueue() {
taosAddIntoQset(tsMPeerQset, tsMPeerQueue, NULL);
for (int32_t i = 0; i < tsMPeerPool.num; ++i) {
for (int32_t i = tsMPeerPool.curNum; i < tsMPeerPool.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerPool.peerWorker + i;
pWorker->workerId = i;
......@@ -98,7 +106,9 @@ int32_t dnodeAllocateMnodePqueue() {
}
pthread_attr_destroy(&thAttr);
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num);
tsMPeerPool.curNum = i + 1;
dDebug("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.maxNum);
}
dDebug("dnode mpeer queue:%p is allocated", tsMPeerQueue);
......@@ -106,6 +116,7 @@ int32_t dnodeAllocateMnodePqueue() {
}
void dnodeFreeMnodePqueue() {
dDebug("dnode mpeer queue:%p is freed", tsMPeerQueue);
taosCloseQueue(tsMPeerQueue);
tsMPeerQueue = NULL;
}
......@@ -148,7 +159,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
while (1) {
if (taosReadQitemFromQset(tsMPeerQset, &type, (void **)&pPeerMsg, &unUsed) == 0) {
dDebug("dnodeProcessMnodePeerQueue: got no message from qset, exiting...");
dDebug("qset:%p, mnode peer got no message from qset, exiting", tsMPeerQset);
break;
}
......
......@@ -33,7 +33,8 @@ typedef struct {
} SMReadWorker;
typedef struct {
int32_t num;
int32_t curNum;
int32_t maxNum;
SMReadWorker *readWorker;
} SMReadWorkerPool;
......@@ -46,40 +47,46 @@ static void *dnodeProcessMnodeReadQueue(void *param);
int32_t dnodeInitMnodeRead() {
tsMReadQset = taosOpenQset();
tsMReadPool.num = tsNumOfCores * tsNumOfThreadsPerCore / 2;
tsMReadPool.num = MAX(2, tsMReadPool.num);
tsMReadPool.num = MIN(4, tsMReadPool.num);
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.num);
tsMReadPool.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2;
tsMReadPool.maxNum = MAX(2, tsMReadPool.maxNum);
tsMReadPool.maxNum = MIN(4, tsMReadPool.maxNum);
tsMReadPool.curNum = 0;
tsMReadPool.readWorker = (SMReadWorker *)calloc(sizeof(SMReadWorker), tsMReadPool.maxNum);
if (tsMReadPool.readWorker == NULL) return -1;
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i;
dDebug("dnode mread worker:%d is created", i);
}
dInfo("dnode mread is opened");
dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
return 0;
}
void dnodeCleanupMnodeRead() {
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMReadQset);
}
dDebug("dnode mread worker:%d is closed", i);
}
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = 0; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
dDebug("dnode mread worker:%d start to join", i);
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
dDebug("dnode mread worker:%d start to join", i);
}
dDebug("dnode mread is closed, qset:%p", tsMReadQset);
taosCloseQset(tsMReadQset);
tsMReadQset = NULL;
free(tsMReadPool.readWorker);
dInfo("dnode mread is closed");
}
int32_t dnodeAllocateMnodeRqueue() {
......@@ -88,7 +95,7 @@ int32_t dnodeAllocateMnodeRqueue() {
taosAddIntoQset(tsMReadQset, tsMReadQueue, NULL);
for (int32_t i = 0; i < tsMReadPool.num; ++i) {
for (int32_t i = tsMReadPool.curNum; i < tsMReadPool.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadPool.readWorker + i;
pWorker->workerId = i;
......@@ -101,7 +108,8 @@ int32_t dnodeAllocateMnodeRqueue() {
}
pthread_attr_destroy(&thAttr);
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.num);
tsMReadPool.curNum = i + 1;
dDebug("dnode mread worker:%d is launched, total:%d", pWorker->workerId, tsMReadPool.maxNum);
}
dDebug("dnode mread queue:%p is allocated", tsMReadQueue);
......@@ -109,6 +117,7 @@ int32_t dnodeAllocateMnodeRqueue() {
}
void dnodeFreeMnodeRqueue() {
dDebug("dnode mread queue:%p is freed", tsMReadQueue);
taosCloseQueue(tsMReadQueue);
tsMReadQueue = NULL;
}
......@@ -156,7 +165,7 @@ static void *dnodeProcessMnodeReadQueue(void *param) {
while (1) {
if (taosReadQitemFromQset(tsMReadQset, &type, (void **)&pReadMsg, &unUsed) == 0) {
dDebug("dnodeProcessMnodeReadQueue: got no message from qset, exiting...");
dDebug("qset:%p, mnode read got no message from qset, exiting", tsMReadQset);
break;
}
......
......@@ -34,7 +34,8 @@ typedef struct {
} SMWriteWorker;
typedef struct {
int32_t num;
int32_t curNum;
int32_t maxNum;
SMWriteWorker *writeWorker;
} SMWriteWorkerPool;
......@@ -47,38 +48,45 @@ static void *dnodeProcessMnodeWriteQueue(void *param);
int32_t dnodeInitMnodeWrite() {
tsMWriteQset = taosOpenQset();
tsMWritePool.num = 1;
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.num);
tsMWritePool.maxNum = 1;
tsMWritePool.curNum = 0;
tsMWritePool.writeWorker = (SMWriteWorker *)calloc(sizeof(SMWriteWorker), tsMWritePool.maxNum);
if (tsMWritePool.writeWorker == NULL) return -1;
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i;
dDebug("dnode mwrite worker:%d is created", i);
}
dInfo("dnode mwrite is opened");
dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
return 0;
}
void dnodeCleanupMnodeWrite() {
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
if (pWorker->thread) {
taosQsetThreadResume(tsMWriteQset);
}
dDebug("dnode mwrite worker:%d is closed", i);
}
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = 0; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
dDebug("dnode mwrite worker:%d start to join", i);
if (pWorker->thread) {
pthread_join(pWorker->thread, NULL);
}
dDebug("dnode mwrite worker:%d join success", i);
}
dDebug("dnode mwrite is closed, qset:%p", tsMWriteQset);
taosCloseQset(tsMWriteQset);
tsMWriteQset = NULL;
taosTFree(tsMWritePool.writeWorker);
dInfo("dnode mwrite is closed");
}
int32_t dnodeAllocateMnodeWqueue() {
......@@ -87,7 +95,7 @@ int32_t dnodeAllocateMnodeWqueue() {
taosAddIntoQset(tsMWriteQset, tsMWriteQueue, NULL);
for (int32_t i = 0; i < tsMWritePool.num; ++i) {
for (int32_t i = tsMWritePool.curNum; i < tsMWritePool.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWritePool.writeWorker + i;
pWorker->workerId = i;
......@@ -100,7 +108,8 @@ int32_t dnodeAllocateMnodeWqueue() {
}
pthread_attr_destroy(&thAttr);
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.num);
tsMWritePool.curNum = i + 1;
dDebug("dnode mwrite worker:%d is launched, total:%d", pWorker->workerId, tsMWritePool.maxNum);
}
dDebug("dnode mwrite queue:%p is allocated", tsMWriteQueue);
......@@ -108,6 +117,7 @@ int32_t dnodeAllocateMnodeWqueue() {
}
void dnodeFreeMnodeWqueue() {
dDebug("dnode mwrite queue:%p is freed", tsMWriteQueue);
taosCloseQueue(tsMWriteQueue);
tsMWriteQueue = NULL;
}
......@@ -122,11 +132,15 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
SMnodeMsg *pWrite = (SMnodeMsg *)taosAllocateQitem(sizeof(SMnodeMsg));
mnodeCreateMsg(pWrite, pMsg);
dDebug("app:%p:%p, msg:%s is put into mwrite queue", pWrite->rpcMsg.ahandle, pWrite, taosMsg[pWrite->rpcMsg.msgType]);
dDebug("app:%p:%p, msg:%s is put into mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
dDebug("app:%p:%p, msg:%s is freed from mwrite queue:%p", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
mnodeCleanupMsg(pWrite);
taosFreeQitem(pWrite);
}
......@@ -158,7 +172,7 @@ static void *dnodeProcessMnodeWriteQueue(void *param) {
while (1) {
if (taosReadQitemFromQset(tsMWriteQset, &type, (void **)&pWrite, &unUsed) == 0) {
dDebug("dnodeProcessMnodeWriteQueue: got no message from qset, exiting...");
dDebug("qset:%p, mnode write got no message from qset, exiting", tsMWriteQset);
break;
}
......@@ -182,8 +196,8 @@ void dnodeReprocessMnodeWriteMsg(void *pMsg) {
dnodeSendRedirectMsg(pMsg, true);
dnodeFreeMnodeWriteMsg(pWrite);
} else {
dDebug("app:%p:%p, msg:%s is reput into mwrite queue, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
dDebug("app:%p:%p, msg:%s is reput into mwrite queue:%p, retry times:%d", pWrite->rpcMsg.ahandle, pWrite,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
......
......@@ -74,14 +74,16 @@ static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg);
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
dnodeReadDnodeCfg();
......@@ -226,7 +228,7 @@ static void *dnodeProcessMgmtQueue(void *param) {
while (1) {
if (taosReadQitemFromQset(tsMgmtQset, &type, (void **) &pMsg, &handle) == 0) {
dDebug("dnode mgmt got no message from qset, exit ...");
dDebug("qset:%p, dnode mgmt got no message from qset, exit", tsMgmtQset);
break;
}
......@@ -451,10 +453,34 @@ static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
}
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
SMDCfgDnodeMsg *pCfg = pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config);
}
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
SMDCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
}
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum);
for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) {
pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId);
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp);
}
dnodeStartMnode(&pCfg->mnodes);
return TSDB_CODE_SUCCESS;
}
void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
if (pEpSet->numOfEps <= 0) {
dError("mnode EP list for peer is changed, but content is invalid, discard it");
......@@ -465,29 +491,6 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
for (int i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
if (!mnodeIsRunning()) {
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
dInfo("mnode index:%d %s:%u should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
bool find = false;
for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
dInfo("localEp found in mnode infos");
find = true;
break;
}
}
if (!find) {
dInfo("localEp not found in mnode infos, will set into mnode infos");
tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN);
tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId();
tsDMnodeInfos.nodeNum++;
}
dnodeStartMnode();
}
}
}
tsDMnodeEpSet = *pEpSet;
......@@ -532,7 +535,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
}
vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
dnodeProcessModuleStatus(pCfg->moduleStatus);
// will not set mnode in status msg
// dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg);
dnodeUpdateMnodeInfos(pMnodes);
......@@ -576,7 +581,7 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
}
dnodeSaveMnodeInfos();
sdbUpdateSync();
sdbUpdateAsync();
}
static bool dnodeReadMnodeInfos() {
......
......@@ -146,7 +146,9 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
}
}
bool dnodeStartMnode() {
bool dnodeStartMnode(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
dDebug("mnode module is already started, module status:%d", tsModuleStatus);
return false;
......@@ -156,6 +158,7 @@ bool dnodeStartMnode() {
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
dnodeProcessModuleStatus(moduleStatus);
sdbUpdateSync();
sdbUpdateSync(mnodes);
return true;
}
......@@ -48,6 +48,7 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMnodePeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMnodePeerQueue;
......@@ -170,8 +171,12 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, epSet, rpcMsg);
}
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsDnodeClientRpc, &epSet, rpcMsg, rpcRsp);
}
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsDnodeClientRpc, epSet, rpcMsg, rpcRsp);
}
\ No newline at end of file
......@@ -156,7 +156,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
......@@ -189,7 +189,7 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE;
SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
terrno = rpcRsp.code;
if (rpcRsp.code != 0) {
......
......@@ -177,7 +177,8 @@ static void addMemoryInfo(SBufferWriter* bw) {
static void addVersionInfo(SBufferWriter* bw) {
addStringField(bw, "version", version);
addStringField(bw, "buildInfo", buildinfo);
addStringField(bw, "gitInfo", gitinfo);
addStringField(bw, "gitInfo", gitinfo);
addStringField(bw, "email", tsEmail);
}
static void addRuntimeInfo(SBufferWriter* bw) {
......@@ -261,11 +262,27 @@ static void* telemetryThread(void* param) {
return NULL;
}
static void dnodeGetEmail(char* filepath) {
int fd = open(filepath, O_RDONLY);
if (fd < 0) {
return;
}
if (taosTRead(fd, (void *)tsEmail, TSDB_FQDN_LEN) < 0) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
}
close(fd);
}
int32_t dnodeInitTelemetry() {
if (!tsEnableTelemetryReporting) {
return 0;
}
dnodeGetEmail("/usr/local/taos/email");
if (tsem_init(&tsExitSem, 0, 0) == -1) {
// just log the error, it is ok for telemetry to fail
dTrace("failed to create semaphore for telemetry, reason:%s", strerror(errno));
......
......@@ -199,7 +199,7 @@ static void *dnodeProcessReadQueue(void *param) {
while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) {
dDebug("dnodeProcessReadQueee: got no message from qset, exiting...");
dDebug("qset:%p dnode read got no message from qset, exiting", readQset);
break;
}
......
......@@ -222,7 +222,7 @@ static void *dnodeProcessWriteQueue(void *param) {
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) {
dDebug("dnodeProcessWriteQueee: got no message from qset, exiting...");
dDebug("qset:%p, dnode write got no message from qset, exiting", pWorker->qset);
break;
}
......
......@@ -43,11 +43,12 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
void dnodeGetMnodeEpSetForShell(void *epSet);
void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId();
bool dnodeStartMnode();
bool dnodeStartMnode(void *pModes);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid);
void *dnodeAllocateVnodeWqueue(void *pVnode);
......
......@@ -60,7 +60,8 @@ int32_t mnodeInitSystem();
int32_t mnodeStartSystem();
void mnodeCleanupSystem();
void mnodeStopSystem();
void sdbUpdateSync();
void sdbUpdateAsync();
void sdbUpdateSync(void *pMnodes);
bool mnodeIsRunning();
int32_t mnodeProcessRead(SMnodeMsg *pMsg);
int32_t mnodeProcessWrite(SMnodeMsg *pMsg);
......
......@@ -139,6 +139,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, 0, 0x0339, "Vgroup alr
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_NOT_FREE, 0, 0x033A, "Dnode not avaliable")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CLUSTER_ID, 0, 0x033B, "Cluster id not match")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, 0, 0x033C, "Cluster not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED, 0, 0x033D, "Dnode Id not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED, 0, 0x033E, "Dnode Ep not configured")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "Account already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "Invalid account")
......
......@@ -59,7 +59,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
......@@ -719,6 +719,12 @@ typedef struct {
char ep[TSDB_EP_LEN]; // end point, hostname:port
} SCMCreateDnodeMsg, SCMDropDnodeMsg;
typedef struct {
int32_t dnodeId;
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
SDMMnodeInfos mnodes;
} SMDCreateMnodeMsg;
typedef struct {
int32_t dnodeId;
int32_t vgId;
......
......@@ -31,7 +31,7 @@ typedef enum {
int32_t mnodeInitMnodes();
void mnodeCleanupMnodes();
int32_t mnodeAddMnode(int32_t dnodeId);
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm);
int32_t mnodeDropMnode(int32_t dnodeId);
void mnodeDropMnodeLocal(int32_t dnodeId);
......
......@@ -147,7 +147,7 @@ static int32_t mnodeDnodeActionRestored() {
mnodeCreateDnode(tsLocalEp, NULL);
SDnodeObj *pDnode = mnodeGetDnodeByEp(tsLocalEp);
if (pDnode != NULL) {
mnodeAddMnode(pDnode->dnodeId);
mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, false);
mnodeDecDnodeRef(pDnode);
}
}
......
......@@ -109,7 +109,7 @@ int32_t mnodeStartSystem() {
mInfo("mnode is initialized successfully");
sdbUpdateSync();
sdbUpdateSync(NULL);
return 0;
}
......
......@@ -23,6 +23,8 @@
#include "tutil.h"
#include "tsocket.h"
#include "tdataformat.h"
#include "dnode.h"
#include "mnode.h"
#include "mnodeDef.h"
#include "mnodeInt.h"
#include "mnodeMnode.h"
......@@ -30,6 +32,7 @@
#include "mnodeSdb.h"
#include "mnodeShow.h"
#include "mnodeUser.h"
#include "mnodeVgroup.h"
static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0;
......@@ -266,25 +269,87 @@ void mnodeGetMnodeInfos(void *mnodeInfos) {
mnodeMnodeUnLock();
}
int32_t mnodeAddMnode(int32_t dnodeId) {
static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp);
SMDCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SMDCreateMnodeMsg));
if (pCreate == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
} else {
pCreate->dnodeId = htonl(dnodeId);
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes = tsMnodeInfos;
bool found = false;
for (int i = 0; i < pCreate->mnodes.nodeNum; ++i) {
if (pCreate->mnodes.nodeInfos[i].nodeId == htonl(dnodeId)) {
found = true;
}
}
if (!found) {
pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeId = htonl(dnodeId);
tstrncpy(pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
pCreate->mnodes.nodeNum++;
}
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pCreate;
rpcMsg.contLen = sizeof(SMDCreateMnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_MD_CREATE_MNODE;
SRpcMsg rpcRsp = {0};
SRpcEpSet epSet = mnodeGetEpSetFromIp(pCreate->dnodeEp);
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp, &epSet);
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code));
} else {
mDebug("dnode:%d, create mnode msg is disposed, mnode is created in dnode", dnodeId);
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
mError("failed to create mnode, reason:%s", tstrerror(code));
} else {
mDebug("mnode is created successfully");
mnodeUpdateMnodeEpSet();
sdbUpdateAsync();
}
return code;
}
void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
pMnode->mnodeId = dnodeId;
pMnode->createdTime = taosGetTimestampMs();
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb,
.pObj = pMnode,
.pObj = pMnode,
.writeCb = mnodeCreateMnodeCb
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
taosTFree(pMnode);
int32_t code = TSDB_CODE_SUCCESS;
if (needConfirm) {
code = mnodeSendCreateMnodeMsg(dnodeId, dnodeEp);
}
mnodeUpdateMnodeEpSet();
if (code != TSDB_CODE_SUCCESS) {
taosTFree(pMnode);
return;
}
return code;
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to create mnode, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(code));
taosTFree(pMnode);
}
}
void mnodeDropMnodeLocal(int32_t dnodeId) {
......@@ -296,6 +361,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) {
}
mnodeUpdateMnodeEpSet();
sdbUpdateAsync();
}
int32_t mnodeDropMnode(int32_t dnodeId) {
......@@ -315,6 +381,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
sdbDecRef(tsMnodeSdb, pMnode);
mnodeUpdateMnodeEpSet();
sdbUpdateAsync();
return code;
}
......
......@@ -58,10 +58,15 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mpeer queue, will be redireced, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
mDebug("%p, msg:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i]));
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT;
......
......@@ -51,14 +51,21 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
mnodeGetMnodeEpSetForShell(epSet);
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("%p, msg:%s in mread queue, will be redireced, inUse:%d", pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->inUse);
mDebug("%p, msg:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("mnode index:%d ep:%s:%d", i, epSet->fqdn[i], htons(epSet->port[i]));
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i]));
}
}
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
return TSDB_CODE_RPC_REDIRECT;
}
......
......@@ -91,6 +91,7 @@ typedef struct {
} SSdbWriteWorkerPool;
extern void * tsMnodeTmr;
static void * tsUpdateSyncTmr;
static SSdbObject tsSdbObj = {0};
static taos_qset tsSdbWriteQset;
static taos_qall tsSdbWriteQall;
......@@ -297,27 +298,25 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
taosFreeQitem(pOper);
}
void sdbUpdateSync() {
static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(NULL); }
void sdbUpdateAsync() {
taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsUpdateSyncTmr);
}
void sdbUpdateSync(void *pMnodes) {
SDMMnodeInfos *mnodes = pMnodes;
if (!mnodeIsRunning()) {
mDebug("mnode not start yet, update sync info later");
mDebug("mnode not start yet, update sync config later");
return;
}
mDebug("update sync info in sdb");
mDebug("update sync config in sync module, mnodes:%p", pMnodes);
SSyncCfg syncCfg = {0};
int32_t index = 0;
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
syncCfg.nodeInfo[i].nodeId = node->nodeId;
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort);
syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
index++;
}
if (index == 0) {
if (mnodes == NULL) {
void *pIter = NULL;
while (1) {
SMnodeObj *pMnode = NULL;
......@@ -337,9 +336,19 @@ void sdbUpdateSync() {
mnodeDecMnodeRef(pMnode);
}
sdbFreeIter(pIter);
syncCfg.replica = index;
mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica);
} else {
for (index = 0; index < mnodes->nodeNum; ++index) {
SDMMnodeInfo *node = &mnodes->nodeInfos[index];
syncCfg.nodeInfo[index].nodeId = node->nodeId;
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort);
syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC;
}
syncCfg.replica = index;
mDebug("mnodes info input, numOfMnodes:%d", syncCfg.replica);
}
syncCfg.replica = index;
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
bool hasThisDnode = false;
......@@ -350,8 +359,15 @@ void sdbUpdateSync() {
}
}
if (!hasThisDnode) return;
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;
if (!hasThisDnode) {
sdbDebug("update sync config, this dnode not exist");
return;
}
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) {
sdbDebug("update sync config, info not changed");
return;
}
sdbInfo("work as mnode, replica:%d", syncCfg.replica);
for (int32_t i = 0; i < syncCfg.replica; ++i) {
......@@ -1038,7 +1054,7 @@ static void *sdbWorkerFp(void *param) {
while (1) {
numOfMsgs = taosReadAllQitemsFromQset(tsSdbWriteQset, tsSdbWriteQall, &unUsed);
if (numOfMsgs == 0) {
sdbDebug("sdbWorkerFp: got no message from qset, exiting...");
sdbDebug("qset:%p, sdb got no message from qset, exiting", tsSdbWriteQset);
break;
}
......
......@@ -310,7 +310,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
if (pVgid->pDnode == pDnode) {
mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d", pDnode->dnodeId, pVgroup->vgId, pVgid->role);
mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d:%s", pDnode->dnodeId, pVgroup->vgId, pVgid->role, syncRole[pVgid->role]);
pVgid->role = pVload->role;
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
pVgroup->inUse = i;
......
......@@ -54,11 +54,17 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet);
mDebug("app:%p:%p, msg:%s will be redireced inUse:%d", pMsg->rpcMsg.ahandle, pMsg, taosMsg[pMsg->rpcMsg.msgType],
epSet->inUse);
mDebug("app:%p:%p, msg:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg->rpcMsg.ahandle, pMsg,
taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse);
for (int32_t i = 0; i < epSet->numOfEps; ++i) {
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(epSet->port[i]));
if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) {
epSet->inUse = (i + 1) % epSet->numOfEps;
mDebug("app:%p:%p, mnode index:%d ep:%s:%d, set inUse to %d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(epSet->port[i]), epSet->inUse);
} else {
mDebug("app:%p:%p, mnode index:%d ep:%s:%d", pMsg->rpcMsg.ahandle, pMsg, i, epSet->fqdn[i],
htons(epSet->port[i]));
}
}
return TSDB_CODE_RPC_REDIRECT;
......
......@@ -67,7 +67,7 @@ static void *httpProcessResultQueue(void *param) {
while (1) {
if (taosReadQitemFromQset(tsHttpQset, &type, (void **)&pMsg, &unUsed) == 0) {
httpDebug("httpResultQueue: got no message from qset, exiting...");
httpDebug("qset:%p, http queue got no message from qset, exiting", tsHttpQset);
break;
}
......
......@@ -542,10 +542,7 @@ void rpcCancelRequest(void *handle) {
if (pContext->pConn) {
tDebug("%s, app tries to cancel request", pContext->pConn->info);
pContext->pConn->pReqMsg = NULL;
rpcCloseConn(pContext->pConn);
pContext->pConn = NULL;
rpcFreeCont(pContext->pCont);
}
}
......@@ -613,8 +610,10 @@ static void rpcReleaseConn(SRpcConn *pConn) {
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg
} else {
// if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg)
if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL;
rpcFreeMsg(pConn->pReqMsg);
}
}
// memset could not be used, since lockeBy can not be reset
......@@ -1121,9 +1120,13 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;
if (pEpSet->numOfEps > 0) {
memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
tDebug("%s, redirect is received, numOfEps:%d", pConn->info, pContext->epSet.numOfEps);
for (int i=0; i<pContext->epSet.numOfEps; ++i)
tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,
pContext->epSet.inUse);
for (int i = 0; i < pContext->epSet.numOfEps; ++i) {
pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
tDebug("%s, redirect is received, index:%d ep:%s:%u", pConn->info, i, pContext->epSet.fqdn[i],
pContext->epSet.port[i]);
}
}
rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont);
......
......@@ -525,7 +525,7 @@ static void *taosProcessTcpData(void *param) {
while (pThreadObj->pHead) {
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
taosReportBrokenLink(pFdObj);
}
pthread_mutex_destroy(&(pThreadObj->mutex));
......
......@@ -215,6 +215,9 @@ void syncStop(void *param) {
pthread_mutex_lock(&(pNode->mutex));
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer) syncRemovePeer(pPeer);
......@@ -223,9 +226,6 @@ void syncStop(void *param) {
pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pPeer) syncRemovePeer(pPeer);
if (vgIdHash) taosHashRemove(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t));
if (pNode->pFwdTimer) taosTmrStop(pNode->pFwdTimer);
pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode);
......@@ -313,6 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
// always update version
nodeVersion = pWalHead->version;
sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded
......@@ -1189,6 +1191,8 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
if (pSyncFwds == NULL) return;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) {
......
......@@ -2639,8 +2639,7 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
int32_t i = 0;
for(; i < size; ++i) {
for(int32_t i = 0; i < size; ++i) {
STableIdInfo *id = taosArrayGet(pTableIdList, i);
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), id->uid);
......@@ -2665,8 +2664,12 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
return terrno;
}
pGroupInfo->numOfTables = i;
taosArrayPush(pGroupInfo->pGroupList, &group);
pGroupInfo->numOfTables = taosArrayGetSize(group);
if (pGroupInfo->numOfTables > 0) {
taosArrayPush(pGroupInfo->pGroupList, &group);
} else {
taosArrayDestroy(group);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -263,6 +263,7 @@ void taosCloseQset(taos_qset param) {
// thread to exit.
void taosQsetThreadResume(taos_qset param) {
STaosQset *qset = (STaosQset *)param;
uDebug("qset:%p, it will exit", qset);
tsem_post(&qset->sem);
}
......
......@@ -427,7 +427,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false);
// ASSERT(false);
break;
}
......
......@@ -111,24 +111,24 @@ echo "serverPort ${NODE}" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "vDebugFlag 135" >> $TAOS_CFG
echo "tsdbDebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "jnidebugFlag 135" >> $TAOS_CFG
echo "odbcdebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "mqttDebugFlag 135" >> $TAOS_CFG
echo "qdebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 143" >> $TAOS_CFG
echo "sdbDebugFlag 143" >> $TAOS_CFG
echo "dDebugFlag 143" >> $TAOS_CFG
echo "vDebugFlag 143" >> $TAOS_CFG
echo "tsdbDebugFlag 143" >> $TAOS_CFG
echo "cDebugFlag 143" >> $TAOS_CFG
echo "jnidebugFlag 143" >> $TAOS_CFG
echo "odbcdebugFlag 143" >> $TAOS_CFG
echo "httpDebugFlag 143" >> $TAOS_CFG
echo "monitorDebugFlag 143" >> $TAOS_CFG
echo "mqttDebugFlag 143" >> $TAOS_CFG
echo "qdebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "sdebugFlag 135" >> $TAOS_CFG
echo "wdebugFlag 135" >> $TAOS_CFG
echo "cqdebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 143" >> $TAOS_CFG
echo "sdebugFlag 143" >> $TAOS_CFG
echo "wdebugFlag 143" >> $TAOS_CFG
echo "cqdebugFlag 143" >> $TAOS_CFG
echo "monitor 0" >> $TAOS_CFG
echo "monitorInterval 1" >> $TAOS_CFG
echo "http 0" >> $TAOS_CFG
......
......@@ -109,15 +109,10 @@ echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "scriptDir ${CODE_DIR}/../script" >> $TAOS_CFG
echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG
echo "monitorDebugFlag 135" >> $TAOS_CFG
echo "udebugFlag 135" >> $TAOS_CFG
echo "cDebugFlag 143" >> $TAOS_CFG
echo "udebugFlag 143" >> $TAOS_CFG
echo "tablemetakeeptimer 5" >> $TAOS_CFG
echo "wal 0" >> $TAOS_CFG
echo "asyncLog 0" >> $TAOS_CFG
......
......@@ -27,7 +27,16 @@ system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start
sleep 5000
sleep 3000
$x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
return -1
endi
sql show mnodes
$dnode1Role = $data2_1
$dnode2Role = $data2_2
......@@ -37,6 +46,16 @@ print $dnode1Role
print $dnode2Role
print $dnode3Role
if $dnode1Role != master then
goto show2
endi
if $dnode2Role != slave then
goto show2
endi
if $dnode3Role != slave then
goto show2
endi
print ============================== step3
$count = 2
while $count < 102
......
......@@ -26,11 +26,11 @@ $x = 0
show2:
$x = $x + 1
sleep 2000
if $x == 10 then
if $x == 5 then
return -1
endi
sql show mnodes
sql show mnodes -x show2
print dnode1 ==> $data2_1
print dnode2 ==> $data2_2
if $data2_1 != master then
......