未验证 提交 bd908f4a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge branch 'develop' into mergetodevelop

...@@ -82,7 +82,7 @@ TDengine 分布式架构的逻辑结构图如下: ...@@ -82,7 +82,7 @@ TDengine 分布式架构的逻辑结构图如下:
### 节点之间的通讯 ### 节点之间的通讯
**通讯方式:**TDengine系统的各个节点之间的通讯是通过TCP/UDP进行的。因为考虑到物联网场景,数据写入的包一般不大,因此TDengine 除采用TCP做传输之外,还采用UDP方式,因为UDP 更加高效,而且不受连接数的限制。TDengine实现了自己的超时、重传、确认等机制,以确保UDP的可靠传输。对于数据量不到15K的数据包,采取UDP的方式进行传输,超过15K的,或者是查询类的操作,自动采取TCP的方式进行传输。同时,TDengine根据配置和数据包,会自动对数据进行压缩/解压缩,数字签名/认证等处理。对于数据节点之间的数据复制,只采用TCP方式进行数据传输。 **通讯方式:**TDengine系统的各个节点之间的通讯是通过TCP/UDP进行的。因为考虑到物联网场景,数据写入的包一般不大,因此TDengine 除采用TCP做传输之外,还采用UDP方式,因为UDP 更加高效,而且不受连接数的限制。TDengine实现了自己的超时、重传、确认等机制,以确保UDP的可靠传输。对于数据量不到15K的数据包,采取UDP的方式进行传输,超过15K的,或者是查询类的操作,自动采取TCP的方式进行传输。同时,TDengine根据配置和数据包,会自动对数据进行压缩/解压缩,数字签名/认证等处理。对于数据节点之间的数据复制,只采用TCP方式进行数据传输。
**FQDN配置**:一个数据节点有一个或多个FQDN,可以在系统配置文件taos.cfg通过选项“fqdn"进行指定,如果没有指定,系统将自动获取FQDN。如果节点没有配置FQDN,可以直接使用IP地址作为FQDN,但不建议使用,因为IP地址可变,一旦变化,将让集群无法正常工作。一个数据节点的EP(End Point)由FQDN + Port组成 **FQDN配置**:一个数据节点有一个或多个FQDN,可以在系统配置文件taos.cfg通过参数“fqdn"进行指定,如果没有指定,系统将自动获取FQDN。如果节点没有配置FQDN,可以直接将该节点的配置参数fqdn设置为它的IP地址。但不建议使用IP,因为IP地址可变,一旦变化,将让集群无法正常工作。一个数据节点的EP(End Point)由FQDN + Port组成。采用FQDN,需要保证DNS服务正常工作,或者在节点以及应用所在的节点配置好hosts文件
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定,对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口,是serverPort+10. 为支持多线程高效的处理UDP数据,每个对内和对外的UDP链接,都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10,总共11个TCP/UDP端口。使用时,需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。 **端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定,对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口,是serverPort+10. 为支持多线程高效的处理UDP数据,每个对内和对外的UDP链接,都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10,总共11个TCP/UDP端口。使用时,需要确保防火墙将这些端口打开。每个数据节点可以配置不同的serverPort。
......
...@@ -705,6 +705,7 @@ function install_TDengine() { ...@@ -705,6 +705,7 @@ function install_TDengine() {
echo echo
echo -e "\033[44;32;1mTDengine client is installed successfully!${NC}" echo -e "\033[44;32;1mTDengine client is installed successfully!${NC}"
fi fi
touch ~/.taos_history
rm -rf $(tar -tf taos.tar.gz) rm -rf $(tar -tf taos.tar.gz)
} }
......
...@@ -285,10 +285,10 @@ void tscKillConnection(STscObj *pObj) { ...@@ -285,10 +285,10 @@ void tscKillConnection(STscObj *pObj) {
SSqlObj *pSql = pObj->sqlList; SSqlObj *pSql = pObj->sqlList;
while (pSql) { while (pSql) {
//taosStopRpcConn(pSql->thandle);
pSql = pSql->next; pSql = pSql->next;
} }
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
SSqlStream *tmp = pStream->next; SSqlStream *tmp = pStream->next;
......
...@@ -226,13 +226,17 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -226,13 +226,17 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = &pSql->pRpcCtx, .handle = &pSql->pRpcCtx,
.code = 0 .code = 0
}; };
// NOTE: the rpc context should be acquired before sending data to server. // NOTE: the rpc context should be acquired before sending data to server.
// Otherwise, the pSql object may have been released already during the response function, which is // Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely // processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash. // cause crash.
if (pObj != NULL && pObj->signature == pObj) {
rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg); rpcSendRequest(pObj->pDnodeConn, &pSql->epSet, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else {
//pObj->signature has been reset by other thread, ignore concurrency problem
return TSDB_CODE_TSC_CONN_KILLED;
}
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
......
...@@ -119,11 +119,8 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -119,11 +119,8 @@ int32_t main(int32_t argc, char *argv[]) {
syslog(LOG_INFO, "Started TDengine service successfully."); syslog(LOG_INFO, "Started TDengine service successfully.");
for (int res = tsem_wait(&exitSem); res != 0; res = tsem_wait(&exitSem)) { if (tsem_wait(&exitSem) != 0) {
if (res != EINTR) { syslog(LOG_ERR, "failed to wait exit semphore: %s", strerror(errno));
syslog(LOG_ERR, "failed to wait exit semphore: %d", res);
break;
}
} }
dnodeCleanUpSystem(); dnodeCleanUpSystem();
......
...@@ -97,6 +97,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_APP_ERROR, 0, 0x0211, "Applicatio ...@@ -97,6 +97,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_APP_ERROR, 0, 0x0211, "Applicatio
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_ACTION_IN_PROGRESS, 0, 0x0212, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DISCONNECTED, 0, 0x0213, "Disconnected from service")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, 0, 0x0214, "No write permission")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, 0, 0x0215, "Connection killed")
// mnode // mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, 0, 0x0300, "Message not processed")
......
...@@ -100,7 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { ...@@ -100,7 +100,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
}; };
tstrncpy(connObj.user, user, sizeof(connObj.user)); tstrncpy(connObj.user, user, sizeof(connObj.user));
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME*1000); SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000);
mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return pConn; return pConn;
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#ifndef TAOS_OS_FUNC_SEMPHONE #ifndef TAOS_OS_FUNC_SEMPHONE
#define tsem_t sem_t #define tsem_t sem_t
#define tsem_init sem_init #define tsem_init sem_init
#define tsem_wait sem_wait int tsem_wait(tsem_t* sem);
#define tsem_post sem_post #define tsem_post sem_post
#define tsem_destroy sem_destroy #define tsem_destroy sem_destroy
#endif #endif
......
...@@ -16,6 +16,18 @@ ...@@ -16,6 +16,18 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#ifndef TAOS_OS_FUNC_SEMPHONE
int tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
#endif
#ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD #ifndef TAOS_OS_FUNC_SEMPHONE_PTHREAD
bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
......
...@@ -99,9 +99,7 @@ int main(int argc, char *argv[]) { ...@@ -99,9 +99,7 @@ int main(int argc, char *argv[]) {
sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort); sInfo("TAOS arbitrator: %s:%d is running", tsNodeFqdn, tsArbitratorPort);
for (int res = tsem_wait(&tsArbSem); res != 0; res = tsem_wait(&tsArbSem)) { tsem_wait(&tsArbSem);
if (res != EINTR) break;
}
taosCloseTcpThreadPool(tsArbTcpPool); taosCloseTcpThreadPool(tsArbTcpPool);
sInfo("TAOS arbitrator is shut down\n"); sInfo("TAOS arbitrator is shut down\n");
......
...@@ -123,11 +123,6 @@ void *taosProcessSchedQueue(void *param) { ...@@ -123,11 +123,6 @@ void *taosProcessSchedQueue(void *param) {
while (1) { while (1) {
if (tsem_wait(&pSched->fullSem) != 0) { if (tsem_wait(&pSched->fullSem) != 0) {
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
uDebug("wait %s fullSem was interrupted", pSched->label);
continue;
}
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
} }
if (pSched->stop) { if (pSched->stop) {
...@@ -163,12 +158,8 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { ...@@ -163,12 +158,8 @@ int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
return 0; return 0;
} }
while (tsem_wait(&pSched->emptySem) != 0) { if (tsem_wait(&pSched->emptySem) != 0) {
if (errno != EINTR) {
uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
break;
}
uDebug("wait %s emptySem was interrupted", pSched->label);
} }
if (pthread_mutex_lock(&pSched->queueMutex) != 0) if (pthread_mutex_lock(&pSched->queueMutex) != 0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册