提交 a41d8880 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

Merge branch 'master' into smartlink_svr

#-------------------------------------------------
#
# Project created by QtCreator 2014-02-22T21:50:54
#
#-------------------------------------------------
QT += core gui network sql
greaterThan(QT_MAJOR_VERSION, 4): QT += widgets
TARGET = ZoomPipeline_CtrlSvr
TEMPLATE = app
SOURCES += main.cpp\
maindialog.cpp
HEADERS += maindialog.h
FORMS += maindialog.ui
#include "maindialog.h"
#include <QApplication>
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
MainDialog w;
w.show();
return a.exec();
}
#include "maindialog.h"
#include "ui_maindialog.h"
MainDialog::MainDialog(QWidget *parent) :
QDialog(parent),
ui(new Ui::MainDialog)
{
ui->setupUi(this);
}
MainDialog::~MainDialog()
{
delete ui;
}
#ifndef MAINDIALOG_H
#define MAINDIALOG_H
#include <QDialog>
namespace Ui {
class MainDialog;
}
class MainDialog : public QDialog
{
Q_OBJECT
public:
explicit MainDialog(QWidget *parent = 0);
~MainDialog();
private:
Ui::MainDialog *ui;
};
#endif // MAINDIALOG_H
<ui version="4.0">
<class>MainDialog</class>
<widget class="QDialog" name="MainDialog" >
<property name="geometry" >
<rect>
<x>0</x>
<y>0</y>
<width>400</width>
<height>300</height>
</rect>
</property>
<property name="windowTitle" >
<string>MainDialog</string>
</property>
</widget>
<layoutDefault spacing="6" margin="11" />
<pixmapfunction></pixmapfunction>
<resources/>
<connections/>
</ui>
......@@ -25,7 +25,8 @@ SOURCES += main.cpp\
database/databaseresource.cpp \
smartlink/st_clientnode_basetrans.cpp \
smartlink/st_clientnode_app_imp.cpp \
smartlink/st_clientnode_applayer.cpp
smartlink/st_clientnode_applayer.cpp \
cluster/zp_clusterterm.cpp
HEADERS += zpmainframe.h \
network/zp_tcpserver.h \
......@@ -40,7 +41,8 @@ HEADERS += zpmainframe.h \
database/databaseresource.h \
smartlink/st_msg_applayer.h \
smartlink/st_clientnode_basetrans.h \
smartlink/st_clientnode_applayer.h
smartlink/st_clientnode_applayer.h \
cluster/zp_clusterterm.h
FORMS += zpmainframe.ui
......
#include "zp_clusterterm.h"
namespace ZP_Cluster{
zp_ClusterTerm::zp_ClusterTerm(const QString & name,int nTransThreads ,QObject *parent ) :
QObject(parent)
,m_strTermName(name)
{
m_pClusterEng = new ZPTaskEngine::zp_pipeline(this);
m_pClusterNet = new ZPNetwork::zp_net_ThreadPool(8192,this);
m_pClusterEng->addThreads(nTransThreads);
}
void zp_ClusterTerm::StartListen(const QHostAddress &addr, int nPort)
{
m_pClusterNet->AddListeningAddress(m_strTermName,addr,nPort,false);
}
bool zp_ClusterTerm::JoinCluster(const QHostAddress &addr, int nPort)
{
return m_pClusterNet->connectTo(addr,nPort);
}
}
#ifndef ZP_CLUSTERTERM_H
#define ZP_CLUSTERTERM_H
#include <QObject>
#include <QHostAddress>
#include "../network/zp_net_threadpool.h"
#include "../pipeline/zp_pipeline.h"
#include "../pipeline/zp_pltaskbase.h"
namespace ZP_Cluster{
//!this class enable server processes can
//! communicate with each other.
class zp_ClusterTerm : public QObject
{
Q_OBJECT
public:
explicit zp_ClusterTerm(const QString & name,int nTransThreads = 4,QObject *parent = 0);
//cluster status
bool isListening(){ return m_pClusterNet->ListenerNames().size()==0?false:true;}
const QString & name() {return m_strTermName;}
int transThreads(){ return m_pClusterNet->TransThreadNum(); }
int transClients(int idx){ return m_pClusterNet->totalClients(idx);}
int payload(){ return m_pClusterEng->payload();}
int threadsCount(){ return m_pClusterEng->threadsCount();}
int threadsIdel(){ return m_pClusterEng->idleThreads();}
protected:
QString m_strTermName;//the Terminal's name
ZPNetwork::zp_net_ThreadPool * m_pClusterNet;
ZPTaskEngine::zp_pipeline * m_pClusterEng;
signals:
public slots:
//!Start listen, this term can be connected by newly joined terms in future.
void StartListen(const QHostAddress &addr, int nPort);
//!Join cluster, using existing term (addr:nPort)
//!as soon as connection established, more existing terms will be sent to this term,
//!an p2p connection will start
bool JoinCluster(const QHostAddress &addr, int nPort);
};
}
#endif // ZP_CLUSTERTERM_H
......@@ -176,6 +176,7 @@ void zp_net_ThreadPool::AddClientTransThreads(int nThreads,bool bSSL)
connect (clientTH,&zp_netTransThread::evt_NewClientConnected,this,&zp_net_ThreadPool::evt_NewClientConnected,Qt::QueuedConnection);
connect (clientTH,&zp_netTransThread::evt_SocketError,this,&zp_net_ThreadPool::evt_SocketError,Qt::QueuedConnection);
connect (this,&zp_net_ThreadPool::evt_EstablishConnection,clientTH,&zp_netTransThread::incomingConnection,Qt::QueuedConnection);
connect (this,&zp_net_ThreadPool::evt_FireConnection,clientTH,&zp_netTransThread::startConnection,Qt::QueuedConnection);
connect (this,&zp_net_ThreadPool::evt_BroadcastData,clientTH,&zp_netTransThread::BroadcastData,Qt::QueuedConnection);
connect (this,&zp_net_ThreadPool::evt_SendDataToClient,clientTH,&zp_netTransThread::SendDataToClient,Qt::QueuedConnection);
connect (this,&zp_net_ThreadPool::evt_KickAll,clientTH,&zp_netTransThread::KickAllClients,Qt::QueuedConnection);
......@@ -206,6 +207,7 @@ bool zp_net_ThreadPool::TransThreadDel(zp_netTransThread * pThreadObj)
disconnect (clientTH,&zp_netTransThread::evt_NewClientConnected,this,&zp_net_ThreadPool::evt_NewClientConnected);
disconnect (clientTH,&zp_netTransThread::evt_SocketError,this,&zp_net_ThreadPool::evt_SocketError);
disconnect (this,&zp_net_ThreadPool::evt_EstablishConnection,clientTH,&zp_netTransThread::incomingConnection);
disconnect (this,&zp_net_ThreadPool::evt_FireConnection,clientTH,&zp_netTransThread::startConnection);
disconnect (this,&zp_net_ThreadPool::evt_BroadcastData,clientTH,&zp_netTransThread::BroadcastData);
disconnect (this,&zp_net_ThreadPool::evt_SendDataToClient,clientTH,&zp_netTransThread::SendDataToClient);
disconnect (this,&zp_net_ThreadPool::evt_KickAll,clientTH,&zp_netTransThread::KickAllClients);
......@@ -288,4 +290,45 @@ bool zp_net_ThreadPool::CanExit()
//m_mutex_listen.unlock();
return res;
}
bool zp_net_ThreadPool::connectTo (const QHostAddress & address , quint16 nPort,bool bSSLConn)
{
bool res= false;
//m_mutex_trans.lock();
int nsz = m_vec_NetTransThreads.size();
int nMinPay = 0x7fffffff;
int nMinIdx = -1;
for (int i=0;i<nsz && nMinPay!=0;i++)
{
if (m_vec_NetTransThreads[i]->isActive()==false ||
m_vec_NetTransThreads[i]->SSLConnection()!=bSSLConn
)
continue;
int nPat = m_vec_NetTransThreads[i]->CurrentClients();
if (nPat<nMinPay)
{
nMinPay = nPat;
nMinIdx = i;
}
//qDebug()<<i<<" "<<nPat<<" "<<nMinIdx;
}
for (int i=0;i<nsz;i++)
if (m_vec_NetTransThreads[i]->isActive()==false )
TransThreadDel(m_vec_NetTransThreads[i]);
if (nMinIdx>=0 && nMinIdx<nsz)
{
res = true;
emit evt_FireConnection(m_vec_NetTransThreads[nMinIdx],address,nPort);
}
else
{
emit evt_Message("Waring>"+QString(tr("Need Trans Thread Object for clients.")));
}
//m_mutex_trans.unlock();
return res;
}
}
......@@ -83,6 +83,7 @@ signals:
void startListen(const QString & id);
void stopListen(const QString & id);
void evt_EstablishConnection(QObject * threadid,qintptr socketDescriptor);
void evt_FireConnection(QObject * threadid,const QHostAddress & hostAddr, quint16 port);
//Trans Control,for intenal thread usage
void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray);
void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
......@@ -102,6 +103,9 @@ public slots:
//Close client Immediatele
void KickClients(QObject * object);
//Possive Connection Methods
bool connectTo (const QHostAddress & address , quint16 nPort,bool bSSLConn = true);
};
}
#endif // ZP_NET_THREADPOOL_H
......@@ -4,6 +4,7 @@
#include <assert.h>
#include <QDebug>
#include <QCoreApplication>
#include <QHostAddress>
namespace ZPNetwork{
zp_netTransThread::zp_netTransThread(zp_net_ThreadPool *pThreadPool,int nPayLoad,QObject *parent) :
QObject(parent)
......@@ -58,10 +59,10 @@ void zp_netTransThread::incomingConnection(QObject * threadid,qintptr socketDesc
//Initial content
if (true ==sock_client->setSocketDescriptor(socketDescriptor))
{
connect(sock_client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()),Qt::QueuedConnection);
connect(sock_client, SIGNAL(disconnected()),this,SLOT(client_closed()),Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved,Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection);
connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection);
connect(sock_client, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)),Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended,Qt::QueuedConnection);
m_mutex_protect.lock();
m_clientList[sock_client] = 0;
m_mutex_protect.unlock();
......@@ -84,6 +85,52 @@ void zp_netTransThread::incomingConnection(QObject * threadid,qintptr socketDesc
}
}
void zp_netTransThread::startConnection(QObject * threadid,const QHostAddress & addr, quint16 port)
{
if (threadid!=this)
return;
QTcpSocket * sock_client = 0;
if (m_bSSLConnection)
sock_client = new QSslSocket(this);
else
sock_client = new QTcpSocket(this);
if (sock_client)
{
if (m_bSSLConnection==true)
{
QSslSocket * psslsock = qobject_cast<QSslSocket *>(sock_client);
assert(psslsock!=NULL);
QString strCerPath = QCoreApplication::applicationDirPath() + "/ca_cert.pem";
QList<QSslCertificate> lstCas = QSslCertificate::fromPath(strCerPath);
psslsock->setCaCertificates(lstCas);
connect(sock_client, &QTcpSocket::readyRead,this,&zp_netTransThread::new_data_recieved,Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection);
connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::bytesWritten, this,&zp_netTransThread::some_data_sended,Qt::QueuedConnection);
connect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection);
m_mutex_protect.lock();
m_clientList[sock_client] = 0;
m_mutex_protect.unlock();
psslsock->connectToHostEncrypted(addr.toString(),port);
}
else
{
connect(sock_client, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved,Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection);
connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::bytesWritten, this,&zp_netTransThread::some_data_sended,Qt::QueuedConnection);
connect(sock_client, &QTcpSocket::connected,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection);
sock_client->connectToHost(addr,port);
}
}
else
assert(false);
}
void zp_netTransThread::on_encrypted()
{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
......@@ -101,10 +148,10 @@ void zp_netTransThread::client_closed()
if (psslsock)
disconnect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted);
}
disconnect(pSock, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));
disconnect(pSock, SIGNAL(disconnected()),this,SLOT(client_closed()));
disconnect(pSock, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved);
disconnect(pSock, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed);
disconnect(pSock, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
disconnect(pSock, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));
disconnect(pSock, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended);
m_buffer_sending.remove(pSock);
m_buffer_sending_offset.remove(pSock);
m_mutex_protect.lock();
......
......@@ -37,6 +37,8 @@ private:
public slots:
//This slot dealing with multi-thread client socket accept.
void incomingConnection(QObject * threadid,qintptr socketDescriptor);
//This slot dealing with possive connect to method.
void startConnection(QObject * threadid,const QHostAddress & addr, quint16 port);
//sending dtarray to objClient. dtarray will be pushed into m_buffer_sending
void SendDataToClient(QObject * objClient,const QByteArray & dtarray);
//Broadcast dtarray to every client except objFromClient itself
......
......@@ -17,7 +17,7 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) :
ui->setupUi(this);
//Create net engine
m_netEngine = new zp_net_ThreadPool (4096);
m_netEngine = new zp_net_ThreadPool (8192);
connect (m_netEngine,&zp_net_ThreadPool::evt_Message,this,&ZPMainFrame::on_evt_Message);
connect (m_netEngine,&zp_net_ThreadPool::evt_SocketError,this,&ZPMainFrame::on_evt_SocketError);
//Create TaskEngine
......@@ -146,8 +146,12 @@ void ZPMainFrame::timerEvent(QTimerEvent * e)
str_msg += tr("Current Trans Threads: %1\n").arg(nClientThreads);
for (int i=0;i<nClientThreads;i++)
str_msg += tr("\tTrans Threads %1 hold %2 Client Sockets.\n").arg(i+1).arg(m_netEngine->totalClients(i));
{
str_msg += tr("\t%1:%2").arg(i+1).arg(m_netEngine->totalClients(i));
if ((i+1)%5==0)
str_msg += "\n";
}
str_msg += "\n";
//recording task status
str_msg += tr("Current Task Threads: %1\n").arg(m_taskEngine->threadsCount());
str_msg += tr("Current Task Payload: %1\n").arg(m_taskEngine->payload());
......
......@@ -492,6 +492,127 @@
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_cluster">
<attribute name="icon">
<iconset resource="resource.qrc">
<normaloff>:/icons/Resources/hanukkah_03.png</normaloff>:/icons/Resources/hanukkah_03.png</iconset>
</attribute>
<attribute name="title">
<string>Cluster</string>
</attribute>
<layout class="QVBoxLayout" name="verticalLayout_6">
<item>
<layout class="QHBoxLayout" name="horizontalLayout_12">
<item>
<widget class="QLabel" name="label_20">
<property name="text">
<string>Terminal Address</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_cluster_term_addr"/>
</item>
<item>
<widget class="QLabel" name="label_19">
<property name="text">
<string>Terminal Port</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_cluster_term_port"/>
</item>
<item>
<spacer name="horizontalSpacer_6">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
</layout>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout_13">
<item>
<widget class="QLabel" name="label_23">
<property name="text">
<string>Publish Name</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_cluster_pub_name"/>
</item>
<item>
<widget class="QLabel" name="label_21">
<property name="text">
<string>Publish Address</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_cluster_pub_Addr"/>
</item>
<item>
<widget class="QLabel" name="label_22">
<property name="text">
<string>Publish Port</string>
</property>
</widget>
</item>
<item>
<widget class="QLineEdit" name="lineEdit_cluster_pub_Port"/>
</item>
<item>
<widget class="QPushButton" name="pushButton_cluster_apply">
<property name="text">
<string>&amp;Save</string>
</property>
</widget>
</item>
<item>
<spacer name="horizontalSpacer_7">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
</layout>
</item>
<item>
<widget class="QGroupBox" name="groupBox">
<property name="title">
<string>Active Terminals</string>
</property>
<layout class="QHBoxLayout" name="horizontalLayout_14">
<item>
<widget class="QListView" name="listView_activeTerms">
<property name="movement">
<enum>QListView::Static</enum>
</property>
<property name="viewMode">
<enum>QListView::IconMode</enum>
</property>
</widget>
</item>
</layout>
</widget>
</item>
</layout>
</widget>
<widget class="QWidget" name="tab">
<attribute name="icon">
<iconset resource="resource.qrc">
......@@ -503,6 +624,13 @@
<layout class="QVBoxLayout" name="verticalLayout_5">
<item>
<layout class="QHBoxLayout" name="horizontalLayout_9">
<item>
<widget class="QPushButton" name="pushButton_smartlink_save">
<property name="text">
<string>&amp;Save</string>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="label">
<property name="text">
......@@ -639,7 +767,7 @@
<x>0</x>
<y>0</y>
<width>556</width>
<height>21</height>
<height>23</height>
</rect>
</property>
<widget class="QMenu" name="menu_Control">
......
TEMPLATE = subdirs
SUBDIRS += QTcpClientTest
SUBDIRS += QTcpClientTest \
ZoomPipeline_CtrlSvr
SUBDIRS += FunctionalClientTest
SUBDIRS += ZoomPipeline_FuncSvr
TRANSLATIONS += ./ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr_zh_CN.ts
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册