diff --git a/ZoomPipeline_CtrlSvr/ZoomPipeline_CtrlSvr.pro b/ZoomPipeline_CtrlSvr/ZoomPipeline_CtrlSvr.pro new file mode 100644 index 0000000000000000000000000000000000000000..81b789573abf61f145fdae3a3614f9b1464cf2d6 --- /dev/null +++ b/ZoomPipeline_CtrlSvr/ZoomPipeline_CtrlSvr.pro @@ -0,0 +1,20 @@ +#------------------------------------------------- +# +# Project created by QtCreator 2014-02-22T21:50:54 +# +#------------------------------------------------- + +QT += core gui network sql + +greaterThan(QT_MAJOR_VERSION, 4): QT += widgets + +TARGET = ZoomPipeline_CtrlSvr +TEMPLATE = app + + +SOURCES += main.cpp\ + maindialog.cpp + +HEADERS += maindialog.h + +FORMS += maindialog.ui diff --git a/ZoomPipeline_CtrlSvr/main.cpp b/ZoomPipeline_CtrlSvr/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..aa1b31c5d2d680fd5eb01d391f69899675ab03bd --- /dev/null +++ b/ZoomPipeline_CtrlSvr/main.cpp @@ -0,0 +1,11 @@ +#include "maindialog.h" +#include + +int main(int argc, char *argv[]) +{ + QApplication a(argc, argv); + MainDialog w; + w.show(); + + return a.exec(); +} diff --git a/ZoomPipeline_CtrlSvr/maindialog.cpp b/ZoomPipeline_CtrlSvr/maindialog.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0eb97d81c913ba3a51bcf22a2246c0a9a963c98e --- /dev/null +++ b/ZoomPipeline_CtrlSvr/maindialog.cpp @@ -0,0 +1,14 @@ +#include "maindialog.h" +#include "ui_maindialog.h" + +MainDialog::MainDialog(QWidget *parent) : + QDialog(parent), + ui(new Ui::MainDialog) +{ + ui->setupUi(this); +} + +MainDialog::~MainDialog() +{ + delete ui; +} diff --git a/ZoomPipeline_CtrlSvr/maindialog.h b/ZoomPipeline_CtrlSvr/maindialog.h new file mode 100644 index 0000000000000000000000000000000000000000..05393a1ce1da7ceb15ab045243815a60e9af5cae --- /dev/null +++ b/ZoomPipeline_CtrlSvr/maindialog.h @@ -0,0 +1,22 @@ +#ifndef MAINDIALOG_H +#define MAINDIALOG_H + +#include + +namespace Ui { +class MainDialog; +} + +class MainDialog : public QDialog +{ + Q_OBJECT + +public: + explicit MainDialog(QWidget *parent = 0); + ~MainDialog(); + +private: + Ui::MainDialog *ui; +}; + +#endif // MAINDIALOG_H diff --git a/ZoomPipeline_CtrlSvr/maindialog.ui b/ZoomPipeline_CtrlSvr/maindialog.ui new file mode 100644 index 0000000000000000000000000000000000000000..423125158281326214ed7fe68710ff82d5798ffb --- /dev/null +++ b/ZoomPipeline_CtrlSvr/maindialog.ui @@ -0,0 +1,20 @@ + + MainDialog + + + + 0 + 0 + 400 + 300 + + + + MainDialog + + + + + + + diff --git a/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro b/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro index 55ce116ce82b3a4303daa238e2314a8569404a64..2dde4d4639e781ca68e8896f97562958dcbf71f1 100644 --- a/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro +++ b/ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr.pro @@ -25,7 +25,8 @@ SOURCES += main.cpp\ database/databaseresource.cpp \ smartlink/st_clientnode_basetrans.cpp \ smartlink/st_clientnode_app_imp.cpp \ - smartlink/st_clientnode_applayer.cpp + smartlink/st_clientnode_applayer.cpp \ + cluster/zp_clusterterm.cpp HEADERS += zpmainframe.h \ network/zp_tcpserver.h \ @@ -40,7 +41,8 @@ HEADERS += zpmainframe.h \ database/databaseresource.h \ smartlink/st_msg_applayer.h \ smartlink/st_clientnode_basetrans.h \ - smartlink/st_clientnode_applayer.h + smartlink/st_clientnode_applayer.h \ + cluster/zp_clusterterm.h FORMS += zpmainframe.ui diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp new file mode 100644 index 0000000000000000000000000000000000000000..17c6a420675d21e59a005589ed8c0b09519fecca --- /dev/null +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.cpp @@ -0,0 +1,21 @@ +#include "zp_clusterterm.h" +namespace ZP_Cluster{ +zp_ClusterTerm::zp_ClusterTerm(const QString & name,int nTransThreads ,QObject *parent ) : + QObject(parent) + ,m_strTermName(name) +{ + m_pClusterEng = new ZPTaskEngine::zp_pipeline(this); + m_pClusterNet = new ZPNetwork::zp_net_ThreadPool(8192,this); + m_pClusterEng->addThreads(nTransThreads); +} +void zp_ClusterTerm::StartListen(const QHostAddress &addr, int nPort) +{ + m_pClusterNet->AddListeningAddress(m_strTermName,addr,nPort,false); + +} +bool zp_ClusterTerm::JoinCluster(const QHostAddress &addr, int nPort) +{ + return m_pClusterNet->connectTo(addr,nPort); +} + +} diff --git a/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h new file mode 100644 index 0000000000000000000000000000000000000000..7e22bdd1885035f142a9dba61a2f3ca2a2703703 --- /dev/null +++ b/ZoomPipeline_FuncSvr/cluster/zp_clusterterm.h @@ -0,0 +1,42 @@ +#ifndef ZP_CLUSTERTERM_H +#define ZP_CLUSTERTERM_H + +#include +#include +#include "../network/zp_net_threadpool.h" +#include "../pipeline/zp_pipeline.h" +#include "../pipeline/zp_pltaskbase.h" + +namespace ZP_Cluster{ +//!this class enable server processes can +//! communicate with each other. +class zp_ClusterTerm : public QObject +{ + Q_OBJECT +public: + explicit zp_ClusterTerm(const QString & name,int nTransThreads = 4,QObject *parent = 0); + //cluster status + bool isListening(){ return m_pClusterNet->ListenerNames().size()==0?false:true;} + const QString & name() {return m_strTermName;} + int transThreads(){ return m_pClusterNet->TransThreadNum(); } + int transClients(int idx){ return m_pClusterNet->totalClients(idx);} + int payload(){ return m_pClusterEng->payload();} + int threadsCount(){ return m_pClusterEng->threadsCount();} + int threadsIdel(){ return m_pClusterEng->idleThreads();} +protected: + QString m_strTermName;//the Terminal's name + ZPNetwork::zp_net_ThreadPool * m_pClusterNet; + ZPTaskEngine::zp_pipeline * m_pClusterEng; +signals: + +public slots: + //!Start listen, this term can be connected by newly joined terms in future. + void StartListen(const QHostAddress &addr, int nPort); + //!Join cluster, using existing term (addr:nPort) + //!as soon as connection established, more existing terms will be sent to this term, + //!an p2p connection will start + bool JoinCluster(const QHostAddress &addr, int nPort); + +}; +} +#endif // ZP_CLUSTERTERM_H diff --git a/ZoomPipeline_FuncSvr/network/zp_net_threadpool.cpp b/ZoomPipeline_FuncSvr/network/zp_net_threadpool.cpp index c97de922f233e5327ac08840c40d36da7c53c7c6..c248c2985f49799dc991d91fc37eb98fdcdaeac5 100644 --- a/ZoomPipeline_FuncSvr/network/zp_net_threadpool.cpp +++ b/ZoomPipeline_FuncSvr/network/zp_net_threadpool.cpp @@ -176,6 +176,7 @@ void zp_net_ThreadPool::AddClientTransThreads(int nThreads,bool bSSL) connect (clientTH,&zp_netTransThread::evt_NewClientConnected,this,&zp_net_ThreadPool::evt_NewClientConnected,Qt::QueuedConnection); connect (clientTH,&zp_netTransThread::evt_SocketError,this,&zp_net_ThreadPool::evt_SocketError,Qt::QueuedConnection); connect (this,&zp_net_ThreadPool::evt_EstablishConnection,clientTH,&zp_netTransThread::incomingConnection,Qt::QueuedConnection); + connect (this,&zp_net_ThreadPool::evt_FireConnection,clientTH,&zp_netTransThread::startConnection,Qt::QueuedConnection); connect (this,&zp_net_ThreadPool::evt_BroadcastData,clientTH,&zp_netTransThread::BroadcastData,Qt::QueuedConnection); connect (this,&zp_net_ThreadPool::evt_SendDataToClient,clientTH,&zp_netTransThread::SendDataToClient,Qt::QueuedConnection); connect (this,&zp_net_ThreadPool::evt_KickAll,clientTH,&zp_netTransThread::KickAllClients,Qt::QueuedConnection); @@ -206,6 +207,7 @@ bool zp_net_ThreadPool::TransThreadDel(zp_netTransThread * pThreadObj) disconnect (clientTH,&zp_netTransThread::evt_NewClientConnected,this,&zp_net_ThreadPool::evt_NewClientConnected); disconnect (clientTH,&zp_netTransThread::evt_SocketError,this,&zp_net_ThreadPool::evt_SocketError); disconnect (this,&zp_net_ThreadPool::evt_EstablishConnection,clientTH,&zp_netTransThread::incomingConnection); + disconnect (this,&zp_net_ThreadPool::evt_FireConnection,clientTH,&zp_netTransThread::startConnection); disconnect (this,&zp_net_ThreadPool::evt_BroadcastData,clientTH,&zp_netTransThread::BroadcastData); disconnect (this,&zp_net_ThreadPool::evt_SendDataToClient,clientTH,&zp_netTransThread::SendDataToClient); disconnect (this,&zp_net_ThreadPool::evt_KickAll,clientTH,&zp_netTransThread::KickAllClients); @@ -288,4 +290,45 @@ bool zp_net_ThreadPool::CanExit() //m_mutex_listen.unlock(); return res; } + +bool zp_net_ThreadPool::connectTo (const QHostAddress & address , quint16 nPort,bool bSSLConn) +{ + bool res= false; + //m_mutex_trans.lock(); + int nsz = m_vec_NetTransThreads.size(); + int nMinPay = 0x7fffffff; + int nMinIdx = -1; + + for (int i=0;iisActive()==false || + m_vec_NetTransThreads[i]->SSLConnection()!=bSSLConn + ) + continue; + int nPat = m_vec_NetTransThreads[i]->CurrentClients(); + + if (nPatisActive()==false ) + TransThreadDel(m_vec_NetTransThreads[i]); + + if (nMinIdx>=0 && nMinIdx"+QString(tr("Need Trans Thread Object for clients."))); + } + //m_mutex_trans.unlock(); + return res; +} + } diff --git a/ZoomPipeline_FuncSvr/network/zp_net_threadpool.h b/ZoomPipeline_FuncSvr/network/zp_net_threadpool.h index dcf2bb520d3babb96b2e20a5f8a42560dfb7023b..3fdafdd84136fcc89287a211a07c264226cda2c7 100644 --- a/ZoomPipeline_FuncSvr/network/zp_net_threadpool.h +++ b/ZoomPipeline_FuncSvr/network/zp_net_threadpool.h @@ -83,6 +83,7 @@ signals: void startListen(const QString & id); void stopListen(const QString & id); void evt_EstablishConnection(QObject * threadid,qintptr socketDescriptor); + void evt_FireConnection(QObject * threadid,const QHostAddress & hostAddr, quint16 port); //Trans Control,for intenal thread usage void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray); void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray); @@ -102,6 +103,9 @@ public slots: //Close client Immediatele void KickClients(QObject * object); + //Possive Connection Methods + bool connectTo (const QHostAddress & address , quint16 nPort,bool bSSLConn = true); + }; } #endif // ZP_NET_THREADPOOL_H diff --git a/ZoomPipeline_FuncSvr/network/zp_nettransthread.cpp b/ZoomPipeline_FuncSvr/network/zp_nettransthread.cpp index 21291decc2ec42b05227a2e702b733ad0ab3727e..40ead1fd284a32658e8e7d1aab84fb4f9e9f4103 100644 --- a/ZoomPipeline_FuncSvr/network/zp_nettransthread.cpp +++ b/ZoomPipeline_FuncSvr/network/zp_nettransthread.cpp @@ -4,6 +4,7 @@ #include #include #include +#include namespace ZPNetwork{ zp_netTransThread::zp_netTransThread(zp_net_ThreadPool *pThreadPool,int nPayLoad,QObject *parent) : QObject(parent) @@ -58,10 +59,10 @@ void zp_netTransThread::incomingConnection(QObject * threadid,qintptr socketDesc //Initial content if (true ==sock_client->setSocketDescriptor(socketDescriptor)) { - connect(sock_client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()),Qt::QueuedConnection); - connect(sock_client, SIGNAL(disconnected()),this,SLOT(client_closed()),Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved,Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection); connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection); - connect(sock_client, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)),Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended,Qt::QueuedConnection); m_mutex_protect.lock(); m_clientList[sock_client] = 0; m_mutex_protect.unlock(); @@ -84,6 +85,52 @@ void zp_netTransThread::incomingConnection(QObject * threadid,qintptr socketDesc } } + +void zp_netTransThread::startConnection(QObject * threadid,const QHostAddress & addr, quint16 port) +{ + if (threadid!=this) + return; + QTcpSocket * sock_client = 0; + if (m_bSSLConnection) + sock_client = new QSslSocket(this); + else + sock_client = new QTcpSocket(this); + if (sock_client) + { + if (m_bSSLConnection==true) + { + QSslSocket * psslsock = qobject_cast(sock_client); + assert(psslsock!=NULL); + QString strCerPath = QCoreApplication::applicationDirPath() + "/ca_cert.pem"; + QList lstCas = QSslCertificate::fromPath(strCerPath); + psslsock->setCaCertificates(lstCas); + + connect(sock_client, &QTcpSocket::readyRead,this,&zp_netTransThread::new_data_recieved,Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection); + connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::bytesWritten, this,&zp_netTransThread::some_data_sended,Qt::QueuedConnection); + connect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection); + m_mutex_protect.lock(); + m_clientList[sock_client] = 0; + m_mutex_protect.unlock(); + + psslsock->connectToHostEncrypted(addr.toString(),port); + } + else + { + connect(sock_client, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved,Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed,Qt::QueuedConnection); + connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)),Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::bytesWritten, this,&zp_netTransThread::some_data_sended,Qt::QueuedConnection); + connect(sock_client, &QTcpSocket::connected,this, &zp_netTransThread::on_encrypted,Qt::QueuedConnection); + sock_client->connectToHost(addr,port); + + } + } + else + assert(false); +} + void zp_netTransThread::on_encrypted() { QTcpSocket * pSock = qobject_cast(sender()); @@ -101,10 +148,10 @@ void zp_netTransThread::client_closed() if (psslsock) disconnect(psslsock, &QSslSocket::encrypted,this, &zp_netTransThread::on_encrypted); } - disconnect(pSock, SIGNAL(readyRead()),this, SLOT(new_data_recieved())); - disconnect(pSock, SIGNAL(disconnected()),this,SLOT(client_closed())); + disconnect(pSock, &QTcpSocket::readyRead,this, &zp_netTransThread::new_data_recieved); + disconnect(pSock, &QTcpSocket::disconnected,this,&zp_netTransThread::client_closed); disconnect(pSock, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError))); - disconnect(pSock, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64))); + disconnect(pSock, &QTcpSocket::bytesWritten, this, &zp_netTransThread::some_data_sended); m_buffer_sending.remove(pSock); m_buffer_sending_offset.remove(pSock); m_mutex_protect.lock(); diff --git a/ZoomPipeline_FuncSvr/network/zp_nettransthread.h b/ZoomPipeline_FuncSvr/network/zp_nettransthread.h index 5dc54a91f4a22c7d389337a1c433e8d28bcfe68a..7fe093380f92a8c7ccd4688e4d25e8caa3716953 100644 --- a/ZoomPipeline_FuncSvr/network/zp_nettransthread.h +++ b/ZoomPipeline_FuncSvr/network/zp_nettransthread.h @@ -37,6 +37,8 @@ private: public slots: //This slot dealing with multi-thread client socket accept. void incomingConnection(QObject * threadid,qintptr socketDescriptor); + //This slot dealing with possive connect to method. + void startConnection(QObject * threadid,const QHostAddress & addr, quint16 port); //sending dtarray to objClient. dtarray will be pushed into m_buffer_sending void SendDataToClient(QObject * objClient,const QByteArray & dtarray); //Broadcast dtarray to every client except objFromClient itself diff --git a/ZoomPipeline_FuncSvr/zpmainframe.cpp b/ZoomPipeline_FuncSvr/zpmainframe.cpp index 1c15ef11381375ece70bbb558ed3dc7e0c813f86..88a7d43bae8d90d6a0f8833c41324d4c66453088 100644 --- a/ZoomPipeline_FuncSvr/zpmainframe.cpp +++ b/ZoomPipeline_FuncSvr/zpmainframe.cpp @@ -17,7 +17,7 @@ ZPMainFrame::ZPMainFrame(QWidget *parent) : ui->setupUi(this); //Create net engine - m_netEngine = new zp_net_ThreadPool (4096); + m_netEngine = new zp_net_ThreadPool (8192); connect (m_netEngine,&zp_net_ThreadPool::evt_Message,this,&ZPMainFrame::on_evt_Message); connect (m_netEngine,&zp_net_ThreadPool::evt_SocketError,this,&ZPMainFrame::on_evt_SocketError); //Create TaskEngine @@ -146,8 +146,12 @@ void ZPMainFrame::timerEvent(QTimerEvent * e) str_msg += tr("Current Trans Threads: %1\n").arg(nClientThreads); for (int i=0;itotalClients(i)); - + { + str_msg += tr("\t%1:%2").arg(i+1).arg(m_netEngine->totalClients(i)); + if ((i+1)%5==0) + str_msg += "\n"; + } + str_msg += "\n"; //recording task status str_msg += tr("Current Task Threads: %1\n").arg(m_taskEngine->threadsCount()); str_msg += tr("Current Task Payload: %1\n").arg(m_taskEngine->payload()); diff --git a/ZoomPipeline_FuncSvr/zpmainframe.ui b/ZoomPipeline_FuncSvr/zpmainframe.ui index 32a160d76d9276c6829d13d08a3db90402754b1a..beb4af59d1543416b0feac9dd31d4f9b4ef45258 100644 --- a/ZoomPipeline_FuncSvr/zpmainframe.ui +++ b/ZoomPipeline_FuncSvr/zpmainframe.ui @@ -492,6 +492,127 @@ + + + + :/icons/Resources/hanukkah_03.png:/icons/Resources/hanukkah_03.png + + + Cluster + + + + + + + + Terminal Address + + + + + + + + + + Terminal Port + + + + + + + + + + Qt::Horizontal + + + + 40 + 20 + + + + + + + + + + + + Publish Name + + + + + + + + + + Publish Address + + + + + + + + + + Publish Port + + + + + + + + + + &Save + + + + + + + Qt::Horizontal + + + + 40 + 20 + + + + + + + + + + Active Terminals + + + + + + QListView::Static + + + QListView::IconMode + + + + + + + + @@ -503,6 +624,13 @@ + + + + &Save + + + @@ -639,7 +767,7 @@ 0 0 556 - 21 + 23 diff --git a/zoomPipeline.pro b/zoomPipeline.pro index 2958657b173070a32c53f44ab276955611165a70..aae2367a72bdfeae3c9aefcdba8b022232ef49c8 100644 --- a/zoomPipeline.pro +++ b/zoomPipeline.pro @@ -1,5 +1,6 @@ TEMPLATE = subdirs -SUBDIRS += QTcpClientTest +SUBDIRS += QTcpClientTest \ + ZoomPipeline_CtrlSvr SUBDIRS += FunctionalClientTest SUBDIRS += ZoomPipeline_FuncSvr TRANSLATIONS += ./ZoomPipeline_FuncSvr/ZoomPipeline_FuncSvr_zh_CN.ts