提交 2b2fec42 编写于 作者: 丁劲犇's avatar 丁劲犇 😸

All intends changed to standard tabs

上级 14350d1c
......@@ -3,9 +3,9 @@
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
MainDialog w;
w.show();
return a.exec();
QApplication a(argc, argv);
MainDialog w;
w.show();
return a.exec();
}
......@@ -614,7 +614,7 @@ int MainDialog::filter_message(const QByteArray & block, int offset)
else
{
displayMessage(tr("Client Send a unknown start Header %1 %2. Close client immediately.")
.arg((int)(ptrCurrData[0])).arg((int)(ptrCurrData[1])));
.arg((int)(ptrCurrData[0])).arg((int)(ptrCurrData[1])));
m_currentMessageSize = 0;
m_currentBlock = QByteArray();
offset = blocklen;
......@@ -723,7 +723,7 @@ int MainDialog::deal_current_message_block()
QMessageBox::information(this,tr("Failed!"),tr("download in Failed!"));
displayMessage(tr("Res = %1")
.arg(pApp->MsgUnion.msg_DownloadUserListRsp.DoneCode)
);
);
}
else if (pApp->header.MsgType==0x3801)
......
......@@ -10,67 +10,67 @@
#include "../ZoomPipeline_FuncSvr/smartlink/st_msg_applayer.h"
using namespace SmartLink;
namespace Ui {
class MainDialog;
class MainDialog;
}
class MainDialog : public QDialog
{
Q_OBJECT
Q_OBJECT
public:
explicit MainDialog(QWidget *parent = 0);
~MainDialog();
virtual void timerEvent(QTimerEvent * evt);
explicit MainDialog(QWidget *parent = 0);
~MainDialog();
virtual void timerEvent(QTimerEvent * evt);
private:
Ui::MainDialog *ui;
QGHTcpClient * client;
QStandardItemModel model;
int nTimer;
void saveIni();
Ui::MainDialog *ui;
QGHTcpClient * client;
QStandardItemModel model;
int nTimer;
void saveIni();
bool m_bLogedIn;
bool m_bUUIDGot ;
bool m_bLogedIn;
bool m_bUUIDGot ;
//!Message Dealers
//!Message Dealers
protected:
//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
//!return bytes Used.
int filter_message(const QByteArray &, int offset);
int deal_current_message_block();
//!deal one message, affect m_currentRedOffset,m_currentMessageSize,m_currentHeader
//!return bytes Used.
int filter_message(const QByteArray &, int offset);
int deal_current_message_block();
//data items
//data items
protected:
//The current Read Offset, from m_list_RawData's beginning
int m_currentReadOffset;
//Current Message Offset, according to m_currentHeader
int m_currentMessageSize;
//Current un-procssed message block.for large blocks,
//this array will be re-setted as soon as some part of data has been
//dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces
QByteArray m_currentBlock;
//current Header
SMARTLINK_MSG m_currentHeader;
SMARTLINK_MSG_APP m_current_app_header;
//The current Read Offset, from m_list_RawData's beginning
int m_currentReadOffset;
//Current Message Offset, according to m_currentHeader
int m_currentMessageSize;
//Current un-procssed message block.for large blocks,
//this array will be re-setted as soon as some part of data has been
//dealed, eg, send a 200MB block, the 200MB data will be splitted into pieces
QByteArray m_currentBlock;
//current Header
SMARTLINK_MSG m_currentHeader;
SMARTLINK_MSG_APP m_current_app_header;
public slots:
void new_data_recieved();
void on_client_trasferred(qint64);
void on_client_connected();
void on_client_disconnected();
void displayError(QAbstractSocket::SocketError);
void displayMessage(const QString &str);
void new_data_recieved();
void on_client_trasferred(qint64);
void on_client_connected();
void on_client_disconnected();
void displayError(QAbstractSocket::SocketError);
void displayMessage(const QString &str);
void on_pushButton_connect_clicked();
void on_pushButton_regisit_clicked();
void on_pushButton_Login_clicked();
void on_pushButton_clientLogin_clicked();
void on_pushButton_CrTime_clicked();
void on_pushButton_box_upload_uid_clicked();
void on_pushButton_box_download_uid_clicked();
void on_pushButton_client_downHost_clicked();
void on_pushButton_clientLogout_clicked();
void on_pushButton_connect_clicked();
void on_pushButton_regisit_clicked();
void on_pushButton_Login_clicked();
void on_pushButton_clientLogin_clicked();
void on_pushButton_CrTime_clicked();
void on_pushButton_box_upload_uid_clicked();
void on_pushButton_box_download_uid_clicked();
void on_pushButton_client_downHost_clicked();
void on_pushButton_clientLogout_clicked();
};
......
......@@ -2,12 +2,12 @@
#include <assert.h>
QGHTcpClient::QGHTcpClient(QObject *parent,int nPayLoad)
: QTcpSocket(parent),
m_nPayLoad(nPayLoad)
: QTcpSocket(parent),
m_nPayLoad(nPayLoad)
{
assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
connect(this, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));
assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
connect(this, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));
}
......@@ -17,41 +17,41 @@ QGHTcpClient::~QGHTcpClient()
}
void QGHTcpClient::some_data_sended(qint64 wsended)
{
while (m_buffer_sending.empty()==false)
{
QByteArray & arraySending = *m_buffer_sending.begin();
qint64 & currentOffset = *m_buffer_sending_offset.begin();
qint64 nTotalBytes = arraySending.size();
assert(nTotalBytes>=currentOffset);
qint64 nBytesWritten = write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));
currentOffset += nBytesWritten;
if (currentOffset>=nTotalBytes)
{
m_buffer_sending.pop_front();
m_buffer_sending_offset.pop_front();
}
else
break;
}
while (m_buffer_sending.empty()==false)
{
QByteArray & arraySending = *m_buffer_sending.begin();
qint64 & currentOffset = *m_buffer_sending_offset.begin();
qint64 nTotalBytes = arraySending.size();
assert(nTotalBytes>=currentOffset);
qint64 nBytesWritten = write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));
currentOffset += nBytesWritten;
if (currentOffset>=nTotalBytes)
{
m_buffer_sending.pop_front();
m_buffer_sending_offset.pop_front();
}
else
break;
}
}
void QGHTcpClient::SendData(QByteArray dtarray)
{
if (dtarray.size())
{
if (m_buffer_sending.empty()==true)
{
qint64 bytesWritten = write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));
if (bytesWritten < dtarray.size())
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(bytesWritten);
}
}
else
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(0);
}
}
if (dtarray.size())
{
if (m_buffer_sending.empty()==true)
{
qint64 bytesWritten = write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));
if (bytesWritten < dtarray.size())
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(bytesWritten);
}
}
else
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(0);
}
}
}
......@@ -5,19 +5,19 @@
#include <QList>
class QGHTcpClient : public QTcpSocket
{
Q_OBJECT
Q_OBJECT
public:
QGHTcpClient(QObject *parent,int nPayLoad = 4096);
~QGHTcpClient();
QGHTcpClient(QObject *parent,int nPayLoad = 4096);
~QGHTcpClient();
private:
int m_nPayLoad;
QList<QByteArray> m_buffer_sending;
QList<qint64> m_buffer_sending_offset;
int m_nPayLoad;
QList<QByteArray> m_buffer_sending;
QList<qint64> m_buffer_sending_offset;
public slots:
void some_data_sended(qint64);
void SendData(QByteArray dtarray);
void some_data_sended(qint64);
void SendData(QByteArray dtarray);
};
......
......@@ -4,9 +4,9 @@
#include <stdlib.h>
int main(int argc, char *argv[])
{
srand(time(0));
QApplication a(argc, argv);
QTcpClientTest w;
w.show();
return a.exec();
srand(time(0));
QApplication a(argc, argv);
QTcpClientTest w;
w.show();
return a.exec();
}
......@@ -2,12 +2,12 @@
#include <assert.h>
QGHTcpClient::QGHTcpClient(QObject *parent,int nPayLoad)
: QSslSocket(parent),
m_nPayLoad(nPayLoad)
: QSslSocket(parent),
m_nPayLoad(nPayLoad)
{
assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
connect(this, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));
assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);
connect(this, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));
}
......@@ -17,41 +17,41 @@ QGHTcpClient::~QGHTcpClient()
}
void QGHTcpClient::some_data_sended(qint64 wsended)
{
while (m_buffer_sending.empty()==false)
{
QByteArray & arraySending = *m_buffer_sending.begin();
qint64 & currentOffset = *m_buffer_sending_offset.begin();
qint64 nTotalBytes = arraySending.size();
assert(nTotalBytes>=currentOffset);
qint64 nBytesWritten = write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));
currentOffset += nBytesWritten;
if (currentOffset>=nTotalBytes)
{
m_buffer_sending.pop_front();
m_buffer_sending_offset.pop_front();
}
else
break;
}
while (m_buffer_sending.empty()==false)
{
QByteArray & arraySending = *m_buffer_sending.begin();
qint64 & currentOffset = *m_buffer_sending_offset.begin();
qint64 nTotalBytes = arraySending.size();
assert(nTotalBytes>=currentOffset);
qint64 nBytesWritten = write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));
currentOffset += nBytesWritten;
if (currentOffset>=nTotalBytes)
{
m_buffer_sending.pop_front();
m_buffer_sending_offset.pop_front();
}
else
break;
}
}
void QGHTcpClient::SendData(QByteArray dtarray)
{
if (dtarray.size())
{
if (m_buffer_sending.empty()==true)
{
qint64 bytesWritten = write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));
if (bytesWritten < dtarray.size())
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(bytesWritten);
}
}
else
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(0);
}
}
if (dtarray.size())
{
if (m_buffer_sending.empty()==true)
{
qint64 bytesWritten = write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));
if (bytesWritten < dtarray.size())
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(bytesWritten);
}
}
else
{
m_buffer_sending.push_back(dtarray);
m_buffer_sending_offset.push_back(0);
}
}
}
......@@ -5,19 +5,19 @@
#include <QList>
class QGHTcpClient : public QSslSocket
{
Q_OBJECT
Q_OBJECT
public:
QGHTcpClient(QObject *parent,int nPayLoad = 4096);
~QGHTcpClient();
QGHTcpClient(QObject *parent,int nPayLoad = 4096);
~QGHTcpClient();
private:
int m_nPayLoad;
QList<QByteArray> m_buffer_sending;
QList<qint64> m_buffer_sending_offset;
int m_nPayLoad;
QList<QByteArray> m_buffer_sending;
QList<qint64> m_buffer_sending_offset;
public slots:
void some_data_sended(qint64);
void SendData(QByteArray dtarray);
void some_data_sended(qint64);
void SendData(QByteArray dtarray);
};
......
......@@ -4,18 +4,18 @@
#include "../ZoomPipeline_FuncSvr/smartlink/st_message.h"
using namespace SmartLink;
QTcpClientTest::QTcpClientTest(QWidget *parent, Qt::WindowFlags flags)
: QMainWindow(parent, flags)
: QMainWindow(parent, flags)
{
ui.setupUi(this);
//Paramenters
QSettings settings("goldenhawking club","QTcpClientTest",this);
ui.lineEdit_ip->setText(settings.value("ip","localhost").toString());
ui.lineEdit_Port->setText(settings.value("port","23457").toString());
ui.dial->setValue(settings.value("clientNum","32").toInt());
ui.lcdNumber->display(settings.value("clientNum","32").toInt());
ui.horizontalSlider->setValue(settings.value("Payload","2048").toInt());
ui.label_load->setText(QString("Payload = %1").arg(settings.value("Payload","2048").toInt()));
ui.listView_msg->setModel(&model);
ui.setupUi(this);
//Paramenters
QSettings settings("goldenhawking club","QTcpClientTest",this);
ui.lineEdit_ip->setText(settings.value("ip","localhost").toString());
ui.lineEdit_Port->setText(settings.value("port","23457").toString());
ui.dial->setValue(settings.value("clientNum","32").toInt());
ui.lcdNumber->display(settings.value("clientNum","32").toInt());
ui.horizontalSlider->setValue(settings.value("Payload","2048").toInt());
ui.label_load->setText(QString("Payload = %1").arg(settings.value("Payload","2048").toInt()));
ui.listView_msg->setModel(&model);
}
QTcpClientTest::~QTcpClientTest()
......@@ -24,162 +24,162 @@ QTcpClientTest::~QTcpClientTest()
}
void QTcpClientTest::on_horizontalSlider_valueChanged(int value)
{
ui.label_load->setText(QString("Payload = %1").arg(value));
ui.label_load->setText(QString("Payload = %1").arg(value));
}
void QTcpClientTest::on_action_Connect_triggered(bool bConn)
{
//connect to the server
QSettings settings("goldenhawking club","QTcpClientTest",this);
settings.setValue("ip",ui.lineEdit_ip->text());
settings.setValue("port",ui.lineEdit_Port->text());
settings.setValue("clientNum",ui.dial->value());
settings.setValue("Payload",ui.horizontalSlider->value());
if (bConn==true)
{
nTimer = startTimer(100);
}
else
killTimer(nTimer);
//connect to the server
QSettings settings("goldenhawking club","QTcpClientTest",this);
settings.setValue("ip",ui.lineEdit_ip->text());
settings.setValue("port",ui.lineEdit_Port->text());
settings.setValue("clientNum",ui.dial->value());
settings.setValue("Payload",ui.horizontalSlider->value());
if (bConn==true)
{
nTimer = startTimer(100);
}
else
killTimer(nTimer);
}
void QTcpClientTest::on_client_trasferred(qint64 dtw)
{
QGHTcpClient * pSock = qobject_cast<QGHTcpClient*>(sender());
if (pSock)
{
displayMessage(QString("client %1 Transferrd %2 bytes.").arg((quintptr)pSock).arg(dtw));
}
QGHTcpClient * pSock = qobject_cast<QGHTcpClient*>(sender());
if (pSock)
{
displayMessage(QString("client %1 Transferrd %2 bytes.").arg((quintptr)pSock).arg(dtw));
}
}
void QTcpClientTest::on_client_connected()
{
QGHTcpClient * pSock = qobject_cast<QGHTcpClient*>(sender());
if (pSock)
{
displayMessage(QString("client %1 connected.").arg((quintptr)pSock));
QGHTcpClient * pSock = qobject_cast<QGHTcpClient*>(sender());
if (pSock)
{
displayMessage(QString("client %1 connected.").arg((quintptr)pSock));
}
}
}
void QTcpClientTest::on_client_disconnected()
{
QGHTcpClient * pSock = qobject_cast<QGHTcpClient*>(sender());
if (pSock)
{
displayMessage(QString("client %1 disconnected.").arg((quintptr)pSock));
//disconnect the signal immediately so that the system resource could be freed.
disconnect(pSock, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));
disconnect(pSock, SIGNAL(connected()),this, SLOT(on_client_connected()));
disconnect(pSock, SIGNAL(disconnected()),this,SLOT(on_client_disconnected()));
disconnect(pSock, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
disconnect(pSock, SIGNAL(bytesWritten(qint64)), this, SLOT(on_client_trasferred(qint64)));
m_clients.remove(pSock);
pSock->deleteLater();
}
QGHTcpClient * pSock = qobject_cast<QGHTcpClient*>(sender());
if (pSock)
{
displayMessage(QString("client %1 disconnected.").arg((quintptr)pSock));
//disconnect the signal immediately so that the system resource could be freed.
disconnect(pSock, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));
disconnect(pSock, SIGNAL(connected()),this, SLOT(on_client_connected()));
disconnect(pSock, SIGNAL(disconnected()),this,SLOT(on_client_disconnected()));
disconnect(pSock, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
disconnect(pSock, SIGNAL(bytesWritten(qint64)), this, SLOT(on_client_trasferred(qint64)));
m_clients.remove(pSock);
pSock->deleteLater();
}
}
void QTcpClientTest::displayError(QAbstractSocket::SocketError /*err*/)
{
QGHTcpClient * sock = qobject_cast<QGHTcpClient *> (sender());
if (sock)
displayMessage(QString("client %1 error msg:").arg((quintptr)sock)+sock->errorString());
QGHTcpClient * sock = qobject_cast<QGHTcpClient *> (sender());
if (sock)
displayMessage(QString("client %1 error msg:").arg((quintptr)sock)+sock->errorString());
}
void QTcpClientTest::new_data_recieved()
{
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
if (pSock)
{
QByteArray array =pSock->readAll();
//in this example, we just do nothing but to display the byte size.
displayMessage(QString("client %1 Recieved %2 bytes.").arg((quintptr)pSock).arg(array.size()));
}
QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());
if (pSock)
{
QByteArray array =pSock->readAll();
//in this example, we just do nothing but to display the byte size.
displayMessage(QString("client %1 Recieved %2 bytes.").arg((quintptr)pSock).arg(array.size()));
}
}
void QTcpClientTest::timerEvent(QTimerEvent * evt)
{
static int nCount = 0;
if (evt->timerId()==nTimer)
{
int nTotalClients = ui.dial->value();
int nPayload = ui.horizontalSlider->value();
QList<QGHTcpClient*> listObj = m_clients.keys();
nCount++;
if (nCount % 100 == 0)
{
//send heart-beating
foreach(QGHTcpClient * pSock,listObj)
{
QByteArray array(sizeof(SMARTLINK_HEARTBEATING),0);
char * ptr = array.data();
SMARTLINK_HEARTBEATING * pMsg = (SMARTLINK_HEARTBEATING *)ptr;
pMsg->Mark = 0xBEBE;
pMsg->tmStamp = 0;
//3/10 possibility to send a data block to server
(pSock)->SendData(array);
}
}
foreach(QGHTcpClient * sock,listObj)
{
QGHTcpClient * sockDestin = listObj.at(rand() % listObj.size());
static int nCount = 0;
if (evt->timerId()==nTimer)
{
int nTotalClients = ui.dial->value();
int nPayload = ui.horizontalSlider->value();
QList<QGHTcpClient*> listObj = m_clients.keys();
nCount++;
if (nCount % 100 == 0)
{
//send heart-beating
foreach(QGHTcpClient * pSock,listObj)
{
QByteArray array(sizeof(SMARTLINK_HEARTBEATING),0);
char * ptr = array.data();
SMARTLINK_HEARTBEATING * pMsg = (SMARTLINK_HEARTBEATING *)ptr;
pMsg->Mark = 0xBEBE;
pMsg->tmStamp = 0;
//3/10 possibility to send a data block to server
(pSock)->SendData(array);
}
}
foreach(QGHTcpClient * sock,listObj)
{
QGHTcpClient * sockDestin = listObj.at(rand() % listObj.size());
if (rand()%1000<5)
{
quint16 nMsgLen = qrand()%(32)+nPayload-32-sizeof(SMARTLINK_MSG);
QByteArray array(sizeof(SMARTLINK_MSG) + nMsgLen - 1,0);
char * ptr = array.data();
SMARTLINK_MSG * pMsg = (SMARTLINK_MSG *)ptr;
pMsg->Mark = 0x55AA;
pMsg->version = 1;
pMsg->SerialNum = 0;
pMsg->Priority = 1;
pMsg->Reserved1 = 0;
pMsg->source_id = (quint32)((quint64)(sock) & 0xffffffff );
if (rand()%1000<5)
{
quint16 nMsgLen = qrand()%(32)+nPayload-32-sizeof(SMARTLINK_MSG);
QByteArray array(sizeof(SMARTLINK_MSG) + nMsgLen - 1,0);
char * ptr = array.data();
SMARTLINK_MSG * pMsg = (SMARTLINK_MSG *)ptr;
pMsg->Mark = 0x55AA;
pMsg->version = 1;
pMsg->SerialNum = 0;
pMsg->Priority = 1;
pMsg->Reserved1 = 0;
pMsg->source_id = (quint32)((quint64)(sock) & 0xffffffff );
pMsg->destin_id = (quint32)((quint64)(sockDestin) & 0xffffffff );;
pMsg->destin_id = (quint32)((quint64)(sockDestin) & 0xffffffff );;
pMsg->data_length = nMsgLen;
pMsg->Reserved2 = 0;
for (int i=0;i<nMsgLen;i++)
pMsg->data[i] = '0' + i%10;
//3/10 possibility to send a data block to server
sock->SendData(array);
}
}
//
if (rand()%100 <1)
//if (m_clients.size()==0)
{
//1/10 chance to make new connections.
if (m_clients.size()>nTotalClients)
{
int nDel = m_clients.size()-nTotalClients;
QList<QGHTcpClient*> listObj = m_clients.keys();
for (int i=0;i<nDel;i++)
{
listObj.at(i)->abort();
}
}
QGHTcpClient * client = new QGHTcpClient(this,ui.horizontalSlider->value());
//client->connectToHost(ui.lineEdit_ip->text(),ui.lineEdit_Port->text().toUShort());
m_clients[client] = QDateTime::currentDateTime();
connect(client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));
//connect(client, SIGNAL(connected()),this, SLOT(on_client_connected()));
connect(client, SIGNAL(disconnected()),this,SLOT(on_client_disconnected()));
connect(client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
connect(client, SIGNAL(bytesWritten(qint64)), this, SLOT(on_client_trasferred(qint64)));
connect(client, SIGNAL(encrypted()), this, SLOT(on_client_connected()));
QString strCerPath = QCoreApplication::applicationDirPath() + "/ca_cert.pem";
QList<QSslCertificate> lstCas = QSslCertificate::fromPath(strCerPath);
client->setCaCertificates(lstCas);
//debug
//client->setPeerVerifyMode(QSslSocket::VerifyNone);
client->connectToHostEncrypted(ui.lineEdit_ip->text(),ui.lineEdit_Port->text().toUShort());
}
}
pMsg->data_length = nMsgLen;
pMsg->Reserved2 = 0;
for (int i=0;i<nMsgLen;i++)
pMsg->data[i] = '0' + i%10;
//3/10 possibility to send a data block to server
sock->SendData(array);
}
}
//
if (rand()%100 <1)
//if (m_clients.size()==0)
{
//1/10 chance to make new connections.
if (m_clients.size()>nTotalClients)
{
int nDel = m_clients.size()-nTotalClients;
QList<QGHTcpClient*> listObj = m_clients.keys();
for (int i=0;i<nDel;i++)
{
listObj.at(i)->abort();
}
}
QGHTcpClient * client = new QGHTcpClient(this,ui.horizontalSlider->value());
//client->connectToHost(ui.lineEdit_ip->text(),ui.lineEdit_Port->text().toUShort());
m_clients[client] = QDateTime::currentDateTime();
connect(client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));
//connect(client, SIGNAL(connected()),this, SLOT(on_client_connected()));
connect(client, SIGNAL(disconnected()),this,SLOT(on_client_disconnected()));
connect(client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));
connect(client, SIGNAL(bytesWritten(qint64)), this, SLOT(on_client_trasferred(qint64)));
connect(client, SIGNAL(encrypted()), this, SLOT(on_client_connected()));
QString strCerPath = QCoreApplication::applicationDirPath() + "/ca_cert.pem";
QList<QSslCertificate> lstCas = QSslCertificate::fromPath(strCerPath);
client->setCaCertificates(lstCas);
//debug
//client->setPeerVerifyMode(QSslSocket::VerifyNone);
client->connectToHostEncrypted(ui.lineEdit_ip->text(),ui.lineEdit_Port->text().toUShort());
}
}
}
void QTcpClientTest::displayMessage(const QString &str)
{
model.insertRow(0,new QStandardItem(str));
while (model.rowCount()>=256)
model.removeRow(model.rowCount()-1);
model.insertRow(0,new QStandardItem(str));
while (model.rowCount()>=256)
model.removeRow(model.rowCount()-1);
}
......@@ -10,26 +10,26 @@
#include <QStandardItem>
class QTcpClientTest : public QMainWindow
{
Q_OBJECT
Q_OBJECT
public:
QTcpClientTest(QWidget *parent = 0, Qt::WindowFlags flags = 0);
~QTcpClientTest();
virtual void timerEvent(QTimerEvent * evt);
QTcpClientTest(QWidget *parent = 0, Qt::WindowFlags flags = 0);
~QTcpClientTest();
virtual void timerEvent(QTimerEvent * evt);
private:
Ui::QTcpClientTestClass ui;
QMap<QGHTcpClient *, QDateTime> m_clients;
int nTimer;
QStandardItemModel model;
Ui::QTcpClientTestClass ui;
QMap<QGHTcpClient *, QDateTime> m_clients;
int nTimer;
QStandardItemModel model;
public slots:
void on_horizontalSlider_valueChanged(int);
void on_action_Connect_triggered(bool);
void new_data_recieved();
void on_client_trasferred(qint64);
void on_client_connected();
void on_client_disconnected();
void displayError(QAbstractSocket::SocketError);
void displayMessage(const QString &str);
void on_horizontalSlider_valueChanged(int);
void on_action_Connect_triggered(bool);
void new_data_recieved();
void on_client_trasferred(qint64);
void on_client_connected();
void on_client_disconnected();
void displayError(QAbstractSocket::SocketError);
void displayMessage(const QString &str);
};
#endif // QTCPCLIENTTEST_H
......@@ -3,9 +3,9 @@
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
MainDialog w;
w.show();
QApplication a(argc, argv);
MainDialog w;
w.show();
return a.exec();
return a.exec();
}
......@@ -2,13 +2,13 @@
#include "ui_maindialog.h"
MainDialog::MainDialog(QWidget *parent) :
QDialog(parent),
ui(new Ui::MainDialog)
QDialog(parent),
ui(new Ui::MainDialog)
{
ui->setupUi(this);
ui->setupUi(this);
}
MainDialog::~MainDialog()
{
delete ui;
delete ui;
}
......@@ -4,19 +4,19 @@
#include <QDialog>
namespace Ui {
class MainDialog;
class MainDialog;
}
class MainDialog : public QDialog
{
Q_OBJECT
Q_OBJECT
public:
explicit MainDialog(QWidget *parent = 0);
~MainDialog();
explicit MainDialog(QWidget *parent = 0);
~MainDialog();
private:
Ui::MainDialog *ui;
Ui::MainDialog *ui;
};
#endif // MAINDIALOG_H
......@@ -6,221 +6,221 @@
namespace ZPDatabase{
DatabaseResource::DatabaseResource(QObject *parent) :
QThread(parent)
{
bTerm = false;
}
//!Get an database connection belong to current thread.
//!if database does not exist, it will be added using dbtype
QSqlDatabase DatabaseResource::databse(const QString & strDBName)
{
QMutexLocker locker(&m_mutex_reg);
if (false==QSqlDatabase::contains(strDBName))
{
QString msg = tr(" Connection name ")+strDBName+ tr(" does not exist.");
emit evt_Message(msg);
return QSqlDatabase();
}
return QSqlDatabase::database(strDBName);
}
void DatabaseResource::remove_connections()
{
QMap<QString,tagConnectionPara> sets;
{
QMutexLocker locker(&m_mutex_reg);
sets = currentDatabaseConnections();
}
DatabaseResource::DatabaseResource(QObject *parent) :
QThread(parent)
{
bTerm = false;
}
//!Get an database connection belong to current thread.
//!if database does not exist, it will be added using dbtype
QSqlDatabase DatabaseResource::databse(const QString & strDBName)
{
QMutexLocker locker(&m_mutex_reg);
if (false==QSqlDatabase::contains(strDBName))
{
QString msg = tr(" Connection name ")+strDBName+ tr(" does not exist.");
emit evt_Message(msg);
return QSqlDatabase();
}
return QSqlDatabase::database(strDBName);
}
void DatabaseResource::remove_connections()
{
QMap<QString,tagConnectionPara> sets;
{
QMutexLocker locker(&m_mutex_reg);
sets = currentDatabaseConnections();
}
foreach (QString name, sets.keys())
this->remove_connection(name);
}
foreach (QString name, sets.keys())
this->remove_connection(name);
}
//!Remove Database
void DatabaseResource::remove_connection(const QString & strDBName)
{
QMutexLocker locker(&m_mutex_reg);
if (true==QSqlDatabase::contains(strDBName))
{
QSqlDatabase db = QSqlDatabase::database(strDBName);
if (db.isOpen()==true)
db.close();
QSqlDatabase::removeDatabase(strDBName);
QString msg = tr(" Connection removed ")+strDBName+ tr(" .");
emit evt_Message(msg);
//!Remove Database
void DatabaseResource::remove_connection(const QString & strDBName)
{
QMutexLocker locker(&m_mutex_reg);
if (true==QSqlDatabase::contains(strDBName))
{
QSqlDatabase db = QSqlDatabase::database(strDBName);
if (db.isOpen()==true)
db.close();
QSqlDatabase::removeDatabase(strDBName);
QString msg = tr(" Connection removed ")+strDBName+ tr(" .");
emit evt_Message(msg);
}
else
{
QString msg = tr(" Connection name ")+strDBName+ tr(" does not exist.");
emit evt_Message(msg);
}
m_dbNames.remove(strDBName) ;
}
else
{
QString msg = tr(" Connection name ")+strDBName+ tr(" does not exist.");
emit evt_Message(msg);
}
m_dbNames.remove(strDBName) ;
}
bool DatabaseResource::addConnection(
const QString & connName,
const QString & type,
const QString & HostAddr,
int port,
const QString & dbName,
const QString & User,
const QString & Pass,
const QString & ExtraOptions,
const QString & testSQL
)
{
QMutexLocker locker(&m_mutex_reg);
tagConnectionPara para;
para.connName = connName;
para.type = type;
para.HostAddr = HostAddr;
para.port = port;
para.dbName = dbName;
para.User = User;
para.Pass = Pass;
para.status = true;
para.testSQL = testSQL;
para.ExtraOptions = ExtraOptions;
}
bool DatabaseResource::addConnection(
const QString & connName,
const QString & type,
const QString & HostAddr,
int port,
const QString & dbName,
const QString & User,
const QString & Pass,
const QString & ExtraOptions,
const QString & testSQL
)
{
QMutexLocker locker(&m_mutex_reg);
tagConnectionPara para;
para.connName = connName;
para.type = type;
para.HostAddr = HostAddr;
para.port = port;
para.dbName = dbName;
para.User = User;
para.Pass = Pass;
para.status = true;
para.testSQL = testSQL;
para.ExtraOptions = ExtraOptions;
if (true==QSqlDatabase::contains(connName))
{
QSqlDatabase db = QSqlDatabase::database(connName);
if (db.isOpen()==true)
db.close();
QSqlDatabase::removeDatabase(connName);
QString msg = tr(" Connection removed ")+connName+ tr(" .");
emit evt_Message(msg);
}
if (true==QSqlDatabase::contains(connName))
{
QSqlDatabase db = QSqlDatabase::database(connName);
if (db.isOpen()==true)
db.close();
QSqlDatabase::removeDatabase(connName);
QString msg = tr(" Connection removed ")+connName+ tr(" .");
emit evt_Message(msg);
}
m_dbNames[connName] = para;
QSqlDatabase db = QSqlDatabase::addDatabase(type,connName);
db.setHostName(HostAddr);
db.setPort(port);
db.setDatabaseName(dbName);
db.setUserName(User);
db.setPassword(Pass);
db.setConnectOptions(ExtraOptions);
if (db.open()==true)
{
QString msg = tr(" Connection ")+connName+ tr(" Established.");
emit evt_Message(msg);
return true;
}
QString msg = tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
emit evt_Message(msg);
QSqlDatabase::removeDatabase(connName);
m_dbNames.remove(connName) ;
return false;
}
bool DatabaseResource::confirmConnection (const QString & connName)
{
QMutexLocker locker(&m_mutex_reg);
if (false==m_dbNames.contains(connName))
{
QString msg = tr(" Connection ")+connName+ tr(" has not been added.");
emit evt_Message(msg);
return false;
}
tagConnectionPara & para = m_dbNames[connName];
if (true==QSqlDatabase::contains(connName) )
{
QSqlDatabase db = QSqlDatabase::database(connName);
if (db.isOpen()==true)
{
bool bNeedDisconnect = false;
if (para.testSQL.length())
{
QSqlQuery query(db);
query.exec(para.testSQL);
if (query.lastError().type()!=QSqlError::NoError)
{
QString msg = tr(" Connection ")+connName+ tr(" confirm failed. MSG=");
msg += query.lastError().text();
emit evt_Message(msg);
bNeedDisconnect = true;
}
}
if (bNeedDisconnect==true)
{
db.close();
QSqlDatabase::removeDatabase(connName);
return false;
}
else
return true;
}
QString msg = tr(" Connection ")+connName+ tr(" has not been opened.");
emit evt_Message(msg);
db = QSqlDatabase::addDatabase(para.type,para.connName);
db.setHostName(para.HostAddr);
db.setPort(para.port);
db.setDatabaseName(para.dbName);
db.setUserName(para.User);
db.setPassword(para.Pass);
db.setConnectOptions(para.ExtraOptions);
if (db.open()==true)
{
para.status = true;
para.lastError = "";
msg = tr(" Connection ")+connName+ tr(" Re-Established.");
emit evt_Message(msg);
return true;
}
QSqlDatabase::removeDatabase(connName);
msg = tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
emit evt_Message(msg);
para.status = false;
para.lastError = db.lastError().text();
return false;
}
m_dbNames[connName] = para;
QSqlDatabase db = QSqlDatabase::addDatabase(type,connName);
db.setHostName(HostAddr);
db.setPort(port);
db.setDatabaseName(dbName);
db.setUserName(User);
db.setPassword(Pass);
db.setConnectOptions(ExtraOptions);
if (db.open()==true)
{
QString msg = tr(" Connection ")+connName+ tr(" Established.");
emit evt_Message(msg);
return true;
}
QString msg = tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
emit evt_Message(msg);
QSqlDatabase::removeDatabase(connName);
m_dbNames.remove(connName) ;
return false;
}
bool DatabaseResource::confirmConnection (const QString & connName)
{
QMutexLocker locker(&m_mutex_reg);
if (false==m_dbNames.contains(connName))
{
QString msg = tr(" Connection ")+connName+ tr(" has not been added.");
emit evt_Message(msg);
return false;
}
tagConnectionPara & para = m_dbNames[connName];
if (true==QSqlDatabase::contains(connName) )
{
QSqlDatabase db = QSqlDatabase::database(connName);
if (db.isOpen()==true)
{
bool bNeedDisconnect = false;
if (para.testSQL.length())
{
QSqlQuery query(db);
query.exec(para.testSQL);
if (query.lastError().type()!=QSqlError::NoError)
{
QString msg = tr(" Connection ")+connName+ tr(" confirm failed. MSG=");
msg += query.lastError().text();
emit evt_Message(msg);
bNeedDisconnect = true;
}
}
if (bNeedDisconnect==true)
{
db.close();
QSqlDatabase::removeDatabase(connName);
return false;
}
else
return true;
}
QString msg = tr(" Connection ")+connName+ tr(" has not been opened.");
emit evt_Message(msg);
db = QSqlDatabase::addDatabase(para.type,para.connName);
db.setHostName(para.HostAddr);
db.setPort(para.port);
db.setDatabaseName(para.dbName);
db.setUserName(para.User);
db.setPassword(para.Pass);
db.setConnectOptions(para.ExtraOptions);
if (db.open()==true)
{
para.status = true;
para.lastError = "";
msg = tr(" Connection ")+connName+ tr(" Re-Established.");
emit evt_Message(msg);
return true;
}
QSqlDatabase::removeDatabase(connName);
msg = tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
emit evt_Message(msg);
para.status = false;
para.lastError = db.lastError().text();
return false;
}
QSqlDatabase db = QSqlDatabase::addDatabase(para.type,para.connName);
db.setHostName(para.HostAddr);
db.setPort(para.port);
db.setDatabaseName(para.dbName);
db.setUserName(para.User);
db.setPassword(para.Pass);
db.setConnectOptions(para.ExtraOptions);
if (db.open()==true)
{
para.status = true;
para.lastError = "";
QString msg = tr(" Connection ")+connName+ tr(" Re-Established.");
emit evt_Message(msg);
return true;
}
QString msg = tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
emit evt_Message(msg);
QSqlDatabase::removeDatabase(connName);
para.status = false;
para.lastError = db.lastError().text();
return false;
}
QSqlDatabase db = QSqlDatabase::addDatabase(para.type,para.connName);
db.setHostName(para.HostAddr);
db.setPort(para.port);
db.setDatabaseName(para.dbName);
db.setUserName(para.User);
db.setPassword(para.Pass);
db.setConnectOptions(para.ExtraOptions);
if (db.open()==true)
{
para.status = true;
para.lastError = "";
QString msg = tr(" Connection ")+connName+ tr(" Re-Established.");
emit evt_Message(msg);
return true;
}
QString msg = tr(" Connection ")+connName+ tr(" Can't be opened. MSG=");
msg += db.lastError().text();
emit evt_Message(msg);
QSqlDatabase::removeDatabase(connName);
para.status = false;
para.lastError = db.lastError().text();
return false;
}
void DatabaseResource::run()
{
while(bTerm==false)
{
QMap<QString,tagConnectionPara> sets;
{
QMutexLocker locker(&m_mutex_reg);
sets = currentDatabaseConnections();
}
void DatabaseResource::run()
{
while(bTerm==false)
{
QMap<QString,tagConnectionPara> sets;
{
QMutexLocker locker(&m_mutex_reg);
sets = currentDatabaseConnections();
}
foreach (QString name, sets.keys())
{
confirmConnection(name) ;
if (bTerm==true)
break;
}
if (bTerm==false)
QThread::currentThread()->msleep(30000);
}
foreach (QString name, sets.keys())
{
confirmConnection(name) ;
if (bTerm==true)
break;
}
if (bTerm==false)
QThread::currentThread()->msleep(30000);
}
}
}
};
......@@ -11,62 +11,62 @@
//!In different thread, workers can get existing db connections
//! immediately without re-creation operations.
namespace ZPDatabase{
class DatabaseResource : public QThread
{
Q_OBJECT
public:
struct tagConnectionPara{
QString connName;
QString type;
QString HostAddr;
int port;
QString dbName;
QString User;
QString Pass;
QString ExtraOptions;
QString testSQL;
bool status;
QString lastError;
class DatabaseResource : public QThread
{
Q_OBJECT
public:
struct tagConnectionPara{
QString connName;
QString type;
QString HostAddr;
int port;
QString dbName;
QString User;
QString Pass;
QString ExtraOptions;
QString testSQL;
bool status;
QString lastError;
} ;
} ;
public:
explicit DatabaseResource(QObject *parent = 0);
public:
explicit DatabaseResource(QObject *parent = 0);
//!Get an database connection belong to current thread.
//!if database does not exist, it will be added using dbtype
QSqlDatabase databse(const QString & strDBName);
//!Get an database connection belong to current thread.
//!if database does not exist, it will be added using dbtype
QSqlDatabase databse(const QString & strDBName);
//!add connection connName, return true if ok.
bool addConnection(
const QString & connName,
const QString & type,
const QString & HostAddr,
int port,
const QString & dbName,
const QString & User,
const QString & Pass,
const QString & ExtraOptions,
const QString & testSQL
);
void remove_connection(const QString & strDBName);
void remove_connections();
bool confirmConnection (const QString & connName);
//!add connection connName, return true if ok.
bool addConnection(
const QString & connName,
const QString & type,
const QString & HostAddr,
int port,
const QString & dbName,
const QString & User,
const QString & Pass,
const QString & ExtraOptions,
const QString & testSQL
);
void remove_connection(const QString & strDBName);
void remove_connections();
bool confirmConnection (const QString & connName);
QMap <QString,tagConnectionPara> currentDatabaseConnections(){return m_dbNames;}
QMap <QString,tagConnectionPara> currentDatabaseConnections(){return m_dbNames;}
void run();
void run();
void TerminateMe(){bTerm = true;}
void TerminateMe(){bTerm = true;}
protected:
bool bTerm;
QMutex m_mutex_reg;
QMap <QString,tagConnectionPara> m_dbNames;
signals:
void evt_Message(const QString &);
public slots:
protected:
bool bTerm;
QMutex m_mutex_reg;
QMap <QString,tagConnectionPara> m_dbNames;
signals:
void evt_Message(const QString &);
public slots:
};
};
};
#endif // DATABASERESOURCE_H
......@@ -6,24 +6,24 @@ int main(int argc, char *argv[])
{
QApplication app(argc, argv);
QApplication app(argc, argv);
QTranslator qtTranslator;
qtTranslator.load("qt_" + QLocale::system().name(),
QLibraryInfo::location(QLibraryInfo::TranslationsPath));
app.installTranslator(&qtTranslator);
QTranslator qtTranslator;
qtTranslator.load("qt_" + QLocale::system().name(),
QLibraryInfo::location(QLibraryInfo::TranslationsPath));
app.installTranslator(&qtTranslator);
QTranslator appTranslator;
QString strTransLocalFile =
QCoreApplication::applicationDirPath()+"/" +
QCoreApplication::applicationName()+"_"+
QLocale::system().name()+".qm";
appTranslator.load(strTransLocalFile );
app.installTranslator(&appTranslator);
QTranslator appTranslator;
QString strTransLocalFile =
QCoreApplication::applicationDirPath()+"/" +
QCoreApplication::applicationName()+"_"+
QLocale::system().name()+".qm";
appTranslator.load(strTransLocalFile );
app.installTranslator(&appTranslator);
ZPMainFrame w;
w.show();
int pp = app.exec();
return pp;
ZPMainFrame w;
w.show();
int pp = app.exec();
return pp;
}
......@@ -15,97 +15,97 @@
namespace ZPNetwork{
class zp_net_ThreadPool : public QObject
{
Q_OBJECT
public:
explicit zp_net_ThreadPool(int nPayLoad = 4096,QObject *parent = 0);
//Listening Ctrl
//Begin a listening socket at special address and port. The socket will be activated as soon as possible
void AddListeningAddress(const QString & id,const QHostAddress & address , quint16 nPort,bool bSSLConn = true);
//Remove a listening socket at special address and port.The socket will be deactivated as soon as possible
void RemoveListeningAddress(const QString & id);
//Remove all listening sockets
void RemoveAllAddresses();
//Trans Control
//Add n client-Trans Thread(s).
void AddClientTransThreads(int nThreads,bool bSSL=true);
//Remove n client-Trans Thread(s).a thread marked reomved will be terminated after its last client socket exited.
void RemoveClientTransThreads(int nThreads,bool bSSL=true);
//Kick All Clients
void KickAllClients();
//Deactive Immediately
void DeactiveImmediately();
//when Program exit, wait for close;
bool CanExit();
//The status
QStringList ListenerNames();
int TransThreadNum();
int TransThreadNum(bool bSSL);
int totalClients(int idxThread);
protected:
int m_nPayLoad;
//QMutex m_mutex_listen;
//QMutex m_mutex_trans;
//This map stores listenThreadObjects
QMap<QString,zp_netListenThread *> m_map_netListenThreads;
//Internal Threads to hold each listenThreadObjects' message Queue
QMap<QString,QThread *> m_map_netInternalListenThreads;
//This map stores ClientTransThreadObjects
QVector<zp_netTransThread *> m_vec_NetTransThreads;
//Internal Threads to hold each m_map_NetTransThreads' message Queue
QVector<QThread *> m_vec_netInternalTransThreads;
bool TransThreadDel(zp_netTransThread * pThreadObj);
signals:
//These Message is nessery.-------------------------------------
void evt_Message(const QString &);
//The socket error message
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
//this event indicates new client connected.
void evt_NewClientConnected(QObject * /*clientHandle*/);
//this event indicates a client disconnected.
void evt_ClientDisconnected(QObject * /*clientHandle*/);
//some data arrival
void evt_Data_recieved(QObject * /*clientHandle*/,const QByteArray & /*datablock*/ );
//a block of data has been successfuly sent
void evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/);
//Internal Message for ctrl.------------------------------------
//Listen Control
void startListen(const QString & id);
void stopListen(const QString & id);
void evt_EstablishConnection(QObject * threadid,qintptr socketDescriptor);
void evt_FireConnection(QObject * threadid,const QHostAddress & hostAddr, quint16 port);
//Trans Control,for intenal thread usage
void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray);
void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
void evt_KickClient(QObject *);
void evt_DeactivteImmediately(zp_netTransThread *);
void evt_KickAll(zp_netTransThread *);
protected slots:
void on_New_Arrived_Client(qintptr socketDescriptor);
void on_ListenClosed(const QString & id);
public slots:
//Call this function to send data to client
void SendDataToClient(QObject * objClient,const QByteArray & dtarray);
//Call this function to send data to client
void BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
//Close client Immediatele
void KickClients(QObject * object);
//Possive Connection Methods
bool connectTo (const QHostAddress & address , quint16 nPort,bool bSSLConn = true);
};
class zp_net_ThreadPool : public QObject
{
Q_OBJECT
public:
explicit zp_net_ThreadPool(int nPayLoad = 4096,QObject *parent = 0);
//Listening Ctrl
//Begin a listening socket at special address and port. The socket will be activated as soon as possible
void AddListeningAddress(const QString & id,const QHostAddress & address , quint16 nPort,bool bSSLConn = true);
//Remove a listening socket at special address and port.The socket will be deactivated as soon as possible
void RemoveListeningAddress(const QString & id);
//Remove all listening sockets
void RemoveAllAddresses();
//Trans Control
//Add n client-Trans Thread(s).
void AddClientTransThreads(int nThreads,bool bSSL=true);
//Remove n client-Trans Thread(s).a thread marked reomved will be terminated after its last client socket exited.
void RemoveClientTransThreads(int nThreads,bool bSSL=true);
//Kick All Clients
void KickAllClients();
//Deactive Immediately
void DeactiveImmediately();
//when Program exit, wait for close;
bool CanExit();
//The status
QStringList ListenerNames();
int TransThreadNum();
int TransThreadNum(bool bSSL);
int totalClients(int idxThread);
protected:
int m_nPayLoad;
//QMutex m_mutex_listen;
//QMutex m_mutex_trans;
//This map stores listenThreadObjects
QMap<QString,zp_netListenThread *> m_map_netListenThreads;
//Internal Threads to hold each listenThreadObjects' message Queue
QMap<QString,QThread *> m_map_netInternalListenThreads;
//This map stores ClientTransThreadObjects
QVector<zp_netTransThread *> m_vec_NetTransThreads;
//Internal Threads to hold each m_map_NetTransThreads' message Queue
QVector<QThread *> m_vec_netInternalTransThreads;
bool TransThreadDel(zp_netTransThread * pThreadObj);
signals:
//These Message is nessery.-------------------------------------
void evt_Message(const QString &);
//The socket error message
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
//this event indicates new client connected.
void evt_NewClientConnected(QObject * /*clientHandle*/);
//this event indicates a client disconnected.
void evt_ClientDisconnected(QObject * /*clientHandle*/);
//some data arrival
void evt_Data_recieved(QObject * /*clientHandle*/,const QByteArray & /*datablock*/ );
//a block of data has been successfuly sent
void evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/);
//Internal Message for ctrl.------------------------------------
//Listen Control
void startListen(const QString & id);
void stopListen(const QString & id);
void evt_EstablishConnection(QObject * threadid,qintptr socketDescriptor);
void evt_FireConnection(QObject * threadid,const QHostAddress & hostAddr, quint16 port);
//Trans Control,for intenal thread usage
void evt_SendDataToClient(QObject * objClient,const QByteArray & dtarray);
void evt_BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
void evt_KickClient(QObject *);
void evt_DeactivteImmediately(zp_netTransThread *);
void evt_KickAll(zp_netTransThread *);
protected slots:
void on_New_Arrived_Client(qintptr socketDescriptor);
void on_ListenClosed(const QString & id);
public slots:
//Call this function to send data to client
void SendDataToClient(QObject * objClient,const QByteArray & dtarray);
//Call this function to send data to client
void BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
//Close client Immediatele
void KickClients(QObject * object);
//Possive Connection Methods
bool connectTo (const QHostAddress & address , quint16 nPort,bool bSSLConn = true);
};
}
#endif // ZP_NET_THREADPOOL_H
#include "zp_netlistenthread.h"
namespace ZPNetwork{
zp_netListenThread::zp_netListenThread(const QString & id, QHostAddress address ,quint16 port,bool bSSL,QObject *parent)
:QObject(parent)
,m_tcpServer(0)
,m_id(id)
,m_address(address)
,m_port(port)
,m_bSSLConn (bSSL)
{
zp_netListenThread::zp_netListenThread(const QString & id, QHostAddress address ,quint16 port,bool bSSL,QObject *parent)
:QObject(parent)
,m_tcpServer(0)
,m_id(id)
,m_address(address)
,m_port(port)
,m_bSSLConn (bSSL)
{
}
void zp_netListenThread::startListen(const QString & id)
{
if (id==m_id)
{
if (!m_tcpServer)
{
m_tcpServer = new ZP_TcpServer(this);
connect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived,Qt::QueuedConnection);
if (false==m_tcpServer->listen(m_address,m_port))
{
disconnect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived);
emit evt_Message("Error>"+QString(tr("Can not start listen!")));
m_tcpServer->deleteLater();
m_tcpServer = 0;
//Close this thread.
emit evt_ListenClosed(m_id);
}
}
}
}
}
void zp_netListenThread::startListen(const QString & id)
{
if (id==m_id)
{
if (!m_tcpServer)
{
m_tcpServer = new ZP_TcpServer(this);
connect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived,Qt::QueuedConnection);
if (false==m_tcpServer->listen(m_address,m_port))
{
disconnect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived);
emit evt_Message("Error>"+QString(tr("Can not start listen!")));
m_tcpServer->deleteLater();
m_tcpServer = 0;
//Close this thread.
emit evt_ListenClosed(m_id);
}
}
}
}
void zp_netListenThread::stopListen(const QString & id)
{
if (id==m_id)
{
if (m_tcpServer)
{
disconnect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived);
m_tcpServer->close();
m_tcpServer->deleteLater();
m_tcpServer = 0;
//Close this thread.
emit evt_Message("Info>"+QString(tr("Listen Closed!")));
emit evt_ListenClosed(m_id);
}
}
}
void zp_netListenThread::stopListen(const QString & id)
{
if (id==m_id)
{
if (m_tcpServer)
{
disconnect (m_tcpServer,&ZP_TcpServer::evt_NewClientArrived,this,&zp_netListenThread::evt_NewClientArrived);
m_tcpServer->close();
m_tcpServer->deleteLater();
m_tcpServer = 0;
//Close this thread.
emit evt_Message("Info>"+QString(tr("Listen Closed!")));
emit evt_ListenClosed(m_id);
}
}
}
}
......@@ -8,26 +8,26 @@
#include <QHostAddress>
#include "zp_tcpserver.h"
namespace ZPNetwork{
class zp_netListenThread : public QObject
{
Q_OBJECT
protected:
ZP_TcpServer * m_tcpServer;
QString m_id; //The listen ID
QHostAddress m_address;
quint16 m_port;
bool m_bSSLConn;
public:
explicit zp_netListenThread(const QString & id, QHostAddress address ,quint16 port,bool bSSL = true,QObject *parent = 0);
bool bSSLConn(){return m_bSSLConn;}
signals:
void evt_Message(const QString &);
void evt_ListenClosed(const QString &);
//This message will tell thread pool, a new incoming connection has arrived.
void evt_NewClientArrived(qintptr socketDescriptor);
public slots:
void startListen(const QString & id);
void stopListen(const QString & id);
};
class zp_netListenThread : public QObject
{
Q_OBJECT
protected:
ZP_TcpServer * m_tcpServer;
QString m_id; //The listen ID
QHostAddress m_address;
quint16 m_port;
bool m_bSSLConn;
public:
explicit zp_netListenThread(const QString & id, QHostAddress address ,quint16 port,bool bSSL = true,QObject *parent = 0);
bool bSSLConn(){return m_bSSLConn;}
signals:
void evt_Message(const QString &);
void evt_ListenClosed(const QString &);
//This message will tell thread pool, a new incoming connection has arrived.
void evt_NewClientArrived(qintptr socketDescriptor);
public slots:
void startListen(const QString & id);
void stopListen(const QString & id);
};
}
#endif // ZP_NETLISTENTHREAD_H
......@@ -7,66 +7,66 @@
#include <QAbstractSocket>
#include <QMutex>
namespace ZPNetwork{
class zp_net_ThreadPool;
class zp_netTransThread : public QObject
{
Q_OBJECT
public:
explicit zp_netTransThread(zp_net_ThreadPool * pThreadPool,int nPayLoad = 4096,QObject *parent = 0);
class zp_net_ThreadPool;
class zp_netTransThread : public QObject
{
Q_OBJECT
public:
explicit zp_netTransThread(zp_net_ThreadPool * pThreadPool,int nPayLoad = 4096,QObject *parent = 0);
QList <QObject *> clientsList();
int CurrentClients();
void SetPayload(int nPayload);
bool isActive(){return m_bActivated;}
QList <QObject *> clientsList();
int CurrentClients();
void SetPayload(int nPayload);
bool isActive(){return m_bActivated;}
bool CanExit();
bool SSLConnection(){return m_bSSLConnection ;}
void SetSSLConnection(bool bssl){ m_bSSLConnection = bssl;}
bool CanExit();
bool SSLConnection(){return m_bSSLConnection ;}
void SetSSLConnection(bool bssl){ m_bSSLConnection = bssl;}
private:
bool m_bActivated;
bool m_bSSLConnection;
//sending buffer, hold byteArraies.
QMap<QObject *,QList<QByteArray> > m_buffer_sending;
private:
bool m_bActivated;
bool m_bSSLConnection;
//sending buffer, hold byteArraies.
QMap<QObject *,QList<QByteArray> > m_buffer_sending;
QMap<QObject *,QList<qint64> > m_buffer_sending_offset;
QMap<QObject*,int> m_clientList;
int m_nPayLoad;
QMutex m_mutex_protect;
zp_net_ThreadPool * m_pThreadPool;
public slots:
//This slot dealing with multi-thread client socket accept.
void incomingConnection(QObject * threadid,qintptr socketDescriptor);
//This slot dealing with possive connect to method.
void startConnection(QObject * threadid,const QHostAddress & addr, quint16 port);
//sending dtarray to objClient. dtarray will be pushed into m_buffer_sending
void SendDataToClient(QObject * objClient,const QByteArray & dtarray);
//Broadcast dtarray to every client except objFromClient itself
void BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
//Set terminate mark, the thread will quit after last client quit.
void Deactivate(){m_bActivated = false;}
//terminate this thread immediately
void DeactivateImmediately(zp_netTransThread *);
//Kick all clients .
void KickAllClients(zp_netTransThread *);
//Kick client.
void KickClient(QObject *);
QMap<QObject *,QList<qint64> > m_buffer_sending_offset;
QMap<QObject*,int> m_clientList;
int m_nPayLoad;
QMutex m_mutex_protect;
zp_net_ThreadPool * m_pThreadPool;
public slots:
//This slot dealing with multi-thread client socket accept.
void incomingConnection(QObject * threadid,qintptr socketDescriptor);
//This slot dealing with possive connect to method.
void startConnection(QObject * threadid,const QHostAddress & addr, quint16 port);
//sending dtarray to objClient. dtarray will be pushed into m_buffer_sending
void SendDataToClient(QObject * objClient,const QByteArray & dtarray);
//Broadcast dtarray to every client except objFromClient itself
void BroadcastData(QObject * objFromClient,const QByteArray & dtarray);
//Set terminate mark, the thread will quit after last client quit.
void Deactivate(){m_bActivated = false;}
//terminate this thread immediately
void DeactivateImmediately(zp_netTransThread *);
//Kick all clients .
void KickAllClients(zp_netTransThread *);
//Kick client.
void KickClient(QObject *);
protected slots:
//when client closed, this slot will be activated.
void client_closed();
void new_data_recieved();
void some_data_sended(qint64);
void displayError(QAbstractSocket::SocketError socketError);
//SSL Encrypted started
void on_encrypted();
signals:
protected slots:
//when client closed, this slot will be activated.
void client_closed();
void new_data_recieved();
void some_data_sended(qint64);
void displayError(QAbstractSocket::SocketError socketError);
//SSL Encrypted started
void on_encrypted();
signals:
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
void evt_NewClientConnected(QObject * client);
void evt_ClientDisconnected(QObject * client);
void evt_Data_recieved(QObject * ,const QByteArray & );
void evt_Data_transferred(QObject * client,qint64);
};
void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);
void evt_NewClientConnected(QObject * client);
void evt_ClientDisconnected(QObject * client);
void evt_Data_recieved(QObject * ,const QByteArray & );
void evt_Data_transferred(QObject * client,qint64);
};
}
#endif // ZP_NETTRANSTHREAD_H
#include "zp_tcpserver.h"
#include <assert.h>
namespace ZPNetwork{
ZP_TcpServer::ZP_TcpServer(QObject *parent )
: QTcpServer(parent)
{
ZP_TcpServer::ZP_TcpServer(QObject *parent )
: QTcpServer(parent)
{
}
}
void ZP_TcpServer::incomingConnection(qintptr socketDescriptor)
{
emit evt_NewClientArrived(socketDescriptor);
}
void ZP_TcpServer::incomingConnection(qintptr socketDescriptor)
{
emit evt_NewClientArrived(socketDescriptor);
}
}
......@@ -3,16 +3,16 @@
#include <QTcpServer>
namespace ZPNetwork{
class ZP_TcpServer : public QTcpServer
{
Q_OBJECT
class ZP_TcpServer : public QTcpServer
{
Q_OBJECT
public:
ZP_TcpServer(QObject *parent);
protected:
void incomingConnection(qintptr socketDescriptor);
signals:
void evt_NewClientArrived(qintptr socketDescriptor);
};
public:
ZP_TcpServer(QObject *parent);
protected:
void incomingConnection(qintptr socketDescriptor);
signals:
void evt_NewClientArrived(qintptr socketDescriptor);
};
}
#endif // ZP_TcpServer_H
#include "zp_pipeline.h"
namespace ZPTaskEngine{
zp_pipeline::zp_pipeline(QObject *parent) :
QObject(parent)
{
m_nExistingThreads = 0;
}
zp_pipeline::zp_pipeline(QObject *parent) :
QObject(parent)
{
m_nExistingThreads = 0;
}
int zp_pipeline::addThreads(int nThreads)
{
if (nThreads>=1 && nThreads <=128)
{
for (int i=0;i<nThreads;i++)
{
zp_plWorkingThread * thread = new zp_plWorkingThread(this);
m_vec_workingThreads.push_back(thread);
QThread * pTh = new QThread(this);
m_vec_InternalworkingThreads.push_back(pTh);
thread->moveToThread(pTh);
connect (this,&zp_pipeline::evt_start_work,thread,&zp_plWorkingThread::FetchNewTask,Qt::QueuedConnection);
connect (this,&zp_pipeline::evt_stop_work,thread,&zp_plWorkingThread::setStopMark,Qt::QueuedConnection);
connect (thread,&zp_plWorkingThread::taskFinished,this,&zp_pipeline::on_finished_task,Qt::QueuedConnection);
pTh->start();
m_mutex_protect.lock();
m_nExistingThreads++;
m_mutex_protect.unlock();
int zp_pipeline::addThreads(int nThreads)
{
if (nThreads>=1 && nThreads <=128)
{
for (int i=0;i<nThreads;i++)
{
zp_plWorkingThread * thread = new zp_plWorkingThread(this);
m_vec_workingThreads.push_back(thread);
QThread * pTh = new QThread(this);
m_vec_InternalworkingThreads.push_back(pTh);
thread->moveToThread(pTh);
connect (this,&zp_pipeline::evt_start_work,thread,&zp_plWorkingThread::FetchNewTask,Qt::QueuedConnection);
connect (this,&zp_pipeline::evt_stop_work,thread,&zp_plWorkingThread::setStopMark,Qt::QueuedConnection);
connect (thread,&zp_plWorkingThread::taskFinished,this,&zp_pipeline::on_finished_task,Qt::QueuedConnection);
pTh->start();
m_mutex_protect.lock();
m_nExistingThreads++;
m_mutex_protect.unlock();
}
}
}
return m_vec_workingThreads.size();
}
}
return m_vec_workingThreads.size();
}
//remove n threads and kill them.nthreads=-1 means kill all.
int zp_pipeline::removeThreads(int nThreads)
{
int nsz = m_vec_workingThreads.size();
if (nThreads<0 || nThreads>nsz)
nThreads = nsz;
//remove n threads and kill them.nthreads=-1 means kill all.
int zp_pipeline::removeThreads(int nThreads)
{
int nsz = m_vec_workingThreads.size();
if (nThreads<0 || nThreads>nsz)
nThreads = nsz;
for (int i=0;i<nThreads;i++ )
{
emit evt_stop_work( m_vec_workingThreads.last());
m_vec_workingThreads.pop_back();
m_vec_InternalworkingThreads.pop_back();
}
return m_vec_workingThreads.size();
}
for (int i=0;i<nThreads;i++ )
{
emit evt_stop_work( m_vec_workingThreads.last());
m_vec_workingThreads.pop_back();
m_vec_InternalworkingThreads.pop_back();
}
return m_vec_workingThreads.size();
}
//Threads call this function to get next task, task will be popped from list.
//Threads call this function to get next task, task will be popped from list.
zp_plTaskBase * zp_pipeline::popTask( bool * bValid)
{
*bValid = false;
zp_plTaskBase * funcres = 0;
m_mutex_protect.lock();
if (m_list_tasks.empty()==false)
{
funcres =* m_list_tasks.begin();
m_list_tasks.pop_front();
*bValid = true;
}
m_mutex_protect.unlock();
return funcres;
}
zp_plTaskBase * zp_pipeline::popTask( bool * bValid)
{
*bValid = false;
zp_plTaskBase * funcres = 0;
m_mutex_protect.lock();
if (m_list_tasks.empty()==false)
{
funcres =* m_list_tasks.begin();
m_list_tasks.pop_front();
*bValid = true;
}
m_mutex_protect.unlock();
return funcres;
}
//Call this function to insert func
void zp_pipeline::pushTask(zp_plTaskBase * task,bool bFire )
{
m_mutex_protect.lock();
m_list_tasks.push_back(task);
task->addRef();
m_mutex_protect.unlock();
//Call this function to insert func
void zp_pipeline::pushTask(zp_plTaskBase * task,bool bFire )
{
m_mutex_protect.lock();
m_list_tasks.push_back(task);
task->addRef();
m_mutex_protect.unlock();
int nsz = m_vec_workingThreads.size();
if (bFire==true)
for (int i=0;i<nsz;i++ )
{
if (m_vec_workingThreads[i]->m_bBusy==false)
{
on_finished_task (m_vec_workingThreads[i]);
break;
}
}
int nsz = m_vec_workingThreads.size();
if (bFire==true)
for (int i=0;i<nsz;i++ )
{
if (m_vec_workingThreads[i]->m_bBusy==false)
{
on_finished_task (m_vec_workingThreads[i]);
break;
}
}
}
}
int zp_pipeline::threadsCount()
{
return m_vec_workingThreads.size();
}
int zp_pipeline::threadsCount()
{
return m_vec_workingThreads.size();
}
int zp_pipeline::payload()
{
int res = 0;
m_mutex_protect.lock();
res = m_list_tasks.size();
m_mutex_protect.unlock();
int zp_pipeline::payload()
{
int res = 0;
m_mutex_protect.lock();
res = m_list_tasks.size();
m_mutex_protect.unlock();
return res;
}
int zp_pipeline::idleThreads()
{
int idle = 0;
int nsz = m_vec_workingThreads.size();
for (int i=0;i<nsz;i++ )
{
if (m_vec_workingThreads[i]->m_bBusy==false)
idle++;
}
return idle;
}
return res;
}
int zp_pipeline::idleThreads()
{
int idle = 0;
int nsz = m_vec_workingThreads.size();
for (int i=0;i<nsz;i++ )
{
if (m_vec_workingThreads[i]->m_bBusy==false)
idle++;
}
return idle;
}
void zp_pipeline::on_finished_task (zp_plWorkingThread * task)
{
int res = 0;
m_mutex_protect.lock();
res = m_list_tasks.size();
m_mutex_protect.unlock();
if (res)
emit evt_start_work(task );
}
void zp_pipeline::on_finished_task (zp_plWorkingThread * task)
{
int res = 0;
m_mutex_protect.lock();
res = m_list_tasks.size();
m_mutex_protect.unlock();
if (res)
emit evt_start_work(task );
}
}
......@@ -16,48 +16,48 @@
namespace ZPTaskEngine{
class zp_plWorkingThread;
class zp_pipeline : public QObject
{
Q_OBJECT
friend class zp_plWorkingThread;
public:
explicit zp_pipeline(QObject *parent = 0);
int addThreads(int nThreads);
//remove n threads and kill them.nthreads=-1 means kill all.
int removeThreads(int nThreads);
int threadsCount();
int payload();
int idleThreads();
bool canClose() {return m_nExistingThreads==0?true:false;}
protected:
//Mutex
QMutex m_mutex_protect;
//working threads
QVector<zp_plWorkingThread *> m_vec_workingThreads;
QVector<QThread *> m_vec_InternalworkingThreads;
//This is a C++11 function pool.
//return -1,the function will be kept in list, return 0 , will be removed.
std::list< zp_plTaskBase * > m_list_tasks;
int m_nExistingThreads;
protected:
//Threads call this function to get next task, task will be popped from list.
zp_plTaskBase * popTask( bool * bValid);
signals:
void evt_start_work(zp_plWorkingThread * task);
void evt_stop_work(zp_plWorkingThread * task);
public slots:
void on_finished_task (zp_plWorkingThread * task);
//Call this function to insert func
void pushTask(zp_plTaskBase * task,bool bFire = true);
};
class zp_plWorkingThread;
class zp_pipeline : public QObject
{
Q_OBJECT
friend class zp_plWorkingThread;
public:
explicit zp_pipeline(QObject *parent = 0);
int addThreads(int nThreads);
//remove n threads and kill them.nthreads=-1 means kill all.
int removeThreads(int nThreads);
int threadsCount();
int payload();
int idleThreads();
bool canClose() {return m_nExistingThreads==0?true:false;}
protected:
//Mutex
QMutex m_mutex_protect;
//working threads
QVector<zp_plWorkingThread *> m_vec_workingThreads;
QVector<QThread *> m_vec_InternalworkingThreads;
//This is a C++11 function pool.
//return -1,the function will be kept in list, return 0 , will be removed.
std::list< zp_plTaskBase * > m_list_tasks;
int m_nExistingThreads;
protected:
//Threads call this function to get next task, task will be popped from list.
zp_plTaskBase * popTask( bool * bValid);
signals:
void evt_start_work(zp_plWorkingThread * task);
void evt_stop_work(zp_plWorkingThread * task);
public slots:
void on_finished_task (zp_plWorkingThread * task);
//Call this function to insert func
void pushTask(zp_plTaskBase * task,bool bFire = true);
};
}
#endif // ZP_PIPELINE_H
#include "zp_pltaskbase.h"
namespace ZPTaskEngine{
zp_plTaskBase::zp_plTaskBase(QObject *parent) :
QObject(parent)
{
refCount = 0;
}
zp_plTaskBase::zp_plTaskBase(QObject *parent) :
QObject(parent)
{
refCount = 0;
}
}
......@@ -5,41 +5,41 @@
#include <QMutex>
#include <QMutexLocker>
namespace ZPTaskEngine{
class zp_plTaskBase : public QObject
{
Q_OBJECT
public:
explicit zp_plTaskBase(QObject *parent = 0);
virtual int run() = 0;
int addRef()
{
QMutexLocker locker(&m_mutex_ref);
refCount++;
return refCount;
}
int delRef()
{
QMutexLocker locker(&m_mutex_ref);
refCount--;
return refCount;
}
int ref()
{
QMutexLocker locker(&m_mutex_ref);
return refCount;
}
private:
int refCount;
QMutex m_mutex_ref;
signals:
public slots:
};
class zp_plTaskBase : public QObject
{
Q_OBJECT
public:
explicit zp_plTaskBase(QObject *parent = 0);
virtual int run() = 0;
int addRef()
{
QMutexLocker locker(&m_mutex_ref);
refCount++;
return refCount;
}
int delRef()
{
QMutexLocker locker(&m_mutex_ref);
refCount--;
return refCount;
}
int ref()
{
QMutexLocker locker(&m_mutex_ref);
return refCount;
}
private:
int refCount;
QMutex m_mutex_ref;
signals:
public slots:
};
}
#endif // ZP_PLTASKBASE_H
......@@ -2,53 +2,53 @@
#include <assert.h>
#include "zp_pipeline.h"
namespace ZPTaskEngine{
zp_plWorkingThread::zp_plWorkingThread(zp_pipeline * pipl,QObject *parent) :
QObject(parent)
{
m_bRuning = true;
m_pipeline = pipl;
assert(m_pipeline != NULL);
m_bBusy = false;
}
void zp_plWorkingThread::setStopMark(zp_plWorkingThread * obj)
{
if (obj != this)
return;
m_bRuning = false;
m_pipeline->m_mutex_protect.lock();
m_pipeline->m_nExistingThreads--;
m_pipeline->m_mutex_protect.unlock();
this->deleteLater();
QThread::currentThread()->quit();
}
void zp_plWorkingThread::FetchNewTask(zp_plWorkingThread * obj)
{
if (obj != this)
return;
if (m_bRuning)
{
bool bValid = false;
zp_plTaskBase * ptr = this->m_pipeline->popTask(&bValid);
if (bValid==true && ptr!=NULL)
{
m_bBusy = true;
int res = ptr->run();
ptr->delRef();
m_bBusy = false;
if (res!=0 )
this->m_pipeline->pushTask(ptr,false);
}
emit taskFinished(this);
}
}
zp_plWorkingThread::zp_plWorkingThread(zp_pipeline * pipl,QObject *parent) :
QObject(parent)
{
m_bRuning = true;
m_pipeline = pipl;
assert(m_pipeline != NULL);
m_bBusy = false;
}
void zp_plWorkingThread::setStopMark(zp_plWorkingThread * obj)
{
if (obj != this)
return;
m_bRuning = false;
m_pipeline->m_mutex_protect.lock();
m_pipeline->m_nExistingThreads--;
m_pipeline->m_mutex_protect.unlock();
this->deleteLater();
QThread::currentThread()->quit();
}
void zp_plWorkingThread::FetchNewTask(zp_plWorkingThread * obj)
{
if (obj != this)
return;
if (m_bRuning)
{
bool bValid = false;
zp_plTaskBase * ptr = this->m_pipeline->popTask(&bValid);
if (bValid==true && ptr!=NULL)
{
m_bBusy = true;
int res = ptr->run();
ptr->delRef();
m_bBusy = false;
if (res!=0 )
this->m_pipeline->pushTask(ptr,false);
}
emit taskFinished(this);
}
}
}
......@@ -5,29 +5,29 @@
#include "zp_pltaskbase.h"
namespace ZPTaskEngine{
class zp_pipeline;
//Working thread, reading functions from queue,
//running tasks
class zp_plWorkingThread : public QObject
{
Q_OBJECT
public:
explicit zp_plWorkingThread(zp_pipeline * pipl,QObject *parent = 0);
bool m_bBusy;
protected:
zp_pipeline * m_pipeline;
class zp_pipeline;
//Working thread, reading functions from queue,
//running tasks
class zp_plWorkingThread : public QObject
{
Q_OBJECT
public:
explicit zp_plWorkingThread(zp_pipeline * pipl,QObject *parent = 0);
bool m_bBusy;
protected:
zp_pipeline * m_pipeline;
bool m_bRuning;
bool m_bRuning;
public slots:
void setStopMark(zp_plWorkingThread *);
public slots:
void setStopMark(zp_plWorkingThread *);
void FetchNewTask(zp_plWorkingThread *);
void FetchNewTask(zp_plWorkingThread *);
signals:
signals:
void taskFinished(zp_plWorkingThread *);
};
void taskFinished(zp_plWorkingThread *);
};
}
#endif // ZP_PLWORKINGTHREAD_H
......@@ -3,166 +3,166 @@
#include <assert.h>
#include <functional>
namespace SmartLink{
st_client_table::st_client_table(
ZPNetwork::zp_net_ThreadPool * pool,
ZPTaskEngine::zp_pipeline * taskeng,
ZPDatabase::DatabaseResource * pDb,
QObject *parent) :
QObject(parent)
,m_pThreadPool(pool)
,m_pTaskEngine(taskeng)
,m_pDatabaseRes(pDb)
{
m_nHeartBeatingDeadThrd = 180;
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_NewClientConnected,this,&st_client_table::on_evt_NewClientConnected,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_ClientDisconnected,this,&st_client_table::on_evt_ClientDisconnected,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_Data_recieved,this,&st_client_table::on_evt_Data_recieved,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_Data_transferred,this,&st_client_table::on_evt_Data_transferred,Qt::QueuedConnection);
}
st_client_table::~st_client_table()
{
}
void st_client_table::KickDealClients()
{
m_hash_mutex.lock();
for (QMap<QObject *,st_clientNode_baseTrans *>::iterator p =m_hash_sock2node.begin();
p!=m_hash_sock2node.end();p++)
{
p.value()->CheckHeartBeating();
}
m_hash_mutex.unlock();
}
bool st_client_table::regisitClientUUID(st_clientNode_baseTrans * c)
{
if (c->uuidValid()==false)
return false;
m_hash_mutex.lock();
m_hash_uuid2node[c->uuid()] = c;
m_hash_mutex.unlock();
return true;
}
st_clientNode_baseTrans * st_client_table::clientNodeFromUUID(quint32 uuid)
{
m_hash_mutex.lock();
if (m_hash_uuid2node.contains(uuid))
{
m_hash_mutex.unlock();
return m_hash_uuid2node[uuid];
}
m_hash_mutex.unlock();
return NULL;
}
st_clientNode_baseTrans * st_client_table::clientNodeFromSocket(QObject * sock)
{
m_hash_mutex.lock();
if (m_hash_sock2node.contains(sock))
{
m_hash_mutex.unlock();
return m_hash_sock2node[sock];
}
m_hash_mutex.unlock();
return NULL;
}
//this event indicates new client connected.
void st_client_table::on_evt_NewClientConnected(QObject * /*clientHandle*/)
{
}
//this event indicates a client disconnected.
void st_client_table::on_evt_ClientDisconnected(QObject * clientHandle)
{
bool nHashContains = false;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (nHashContains)
pClientNode = m_hash_sock2node[clientHandle];
if (pClientNode)
{
m_hash_sock2node.remove(clientHandle);
if (pClientNode->uuidValid())
m_hash_uuid2node.remove(pClientNode->uuid());
pClientNode->bTermSet = true;
disconnect (pClientNode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::SendDataToClient);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::KickClients);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message);
m_nodeToBeDel.push_back(pClientNode);
//qDebug()<<QString("%1(ref %2) Node Push in queue.\n").arg((unsigned int)pClientNode).arg(pClientNode->ref());
}
m_hash_mutex.unlock();
//Try to delete objects
QList <st_clientNode_baseTrans *> toBedel;
foreach(st_clientNode_baseTrans * pdelobj,m_nodeToBeDel)
{
if (pdelobj->ref() ==0)
toBedel.push_back(pdelobj);
else
{
//qDebug()<<QString("%1(ref %2) Waiting in del queue.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
}
}
foreach(st_clientNode_baseTrans * pdelobj,toBedel)
{
m_nodeToBeDel.removeAll(pdelobj);
//qDebug()<<QString("%1(ref %2) deleting.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
pdelobj->deleteLater();
}
}
//some data arrival
void st_client_table::on_evt_Data_recieved(QObject * clientHandle,const QByteArray & datablock )
{
//Push Clients to nodes if it is not exist
bool nHashContains = 0;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains)
{
st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
}
else
{
pClientNode = m_hash_sock2node[clientHandle];
}
assert(nHashContains!=0 && pClientNode !=0);
int nblocks = pClientNode->push_new_data(datablock);
if (nblocks<=1)
m_pTaskEngine->pushTask(pClientNode);
m_hash_mutex.unlock();
}
//a block of data has been successfuly sent
void st_client_table::on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/)
{
}
st_client_table::st_client_table(
ZPNetwork::zp_net_ThreadPool * pool,
ZPTaskEngine::zp_pipeline * taskeng,
ZPDatabase::DatabaseResource * pDb,
QObject *parent) :
QObject(parent)
,m_pThreadPool(pool)
,m_pTaskEngine(taskeng)
,m_pDatabaseRes(pDb)
{
m_nHeartBeatingDeadThrd = 180;
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_NewClientConnected,this,&st_client_table::on_evt_NewClientConnected,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_ClientDisconnected,this,&st_client_table::on_evt_ClientDisconnected,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_Data_recieved,this,&st_client_table::on_evt_Data_recieved,Qt::QueuedConnection);
connect (m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_Data_transferred,this,&st_client_table::on_evt_Data_transferred,Qt::QueuedConnection);
}
st_client_table::~st_client_table()
{
}
void st_client_table::KickDealClients()
{
m_hash_mutex.lock();
for (QMap<QObject *,st_clientNode_baseTrans *>::iterator p =m_hash_sock2node.begin();
p!=m_hash_sock2node.end();p++)
{
p.value()->CheckHeartBeating();
}
m_hash_mutex.unlock();
}
bool st_client_table::regisitClientUUID(st_clientNode_baseTrans * c)
{
if (c->uuidValid()==false)
return false;
m_hash_mutex.lock();
m_hash_uuid2node[c->uuid()] = c;
m_hash_mutex.unlock();
return true;
}
st_clientNode_baseTrans * st_client_table::clientNodeFromUUID(quint32 uuid)
{
m_hash_mutex.lock();
if (m_hash_uuid2node.contains(uuid))
{
m_hash_mutex.unlock();
return m_hash_uuid2node[uuid];
}
m_hash_mutex.unlock();
return NULL;
}
st_clientNode_baseTrans * st_client_table::clientNodeFromSocket(QObject * sock)
{
m_hash_mutex.lock();
if (m_hash_sock2node.contains(sock))
{
m_hash_mutex.unlock();
return m_hash_sock2node[sock];
}
m_hash_mutex.unlock();
return NULL;
}
//this event indicates new client connected.
void st_client_table::on_evt_NewClientConnected(QObject * /*clientHandle*/)
{
}
//this event indicates a client disconnected.
void st_client_table::on_evt_ClientDisconnected(QObject * clientHandle)
{
bool nHashContains = false;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (nHashContains)
pClientNode = m_hash_sock2node[clientHandle];
if (pClientNode)
{
m_hash_sock2node.remove(clientHandle);
if (pClientNode->uuidValid())
m_hash_uuid2node.remove(pClientNode->uuid());
pClientNode->bTermSet = true;
disconnect (pClientNode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::SendDataToClient);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::KickClients);
disconnect (pClientNode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message);
m_nodeToBeDel.push_back(pClientNode);
//qDebug()<<QString("%1(ref %2) Node Push in queue.\n").arg((unsigned int)pClientNode).arg(pClientNode->ref());
}
m_hash_mutex.unlock();
//Try to delete objects
QList <st_clientNode_baseTrans *> toBedel;
foreach(st_clientNode_baseTrans * pdelobj,m_nodeToBeDel)
{
if (pdelobj->ref() ==0)
toBedel.push_back(pdelobj);
else
{
//qDebug()<<QString("%1(ref %2) Waiting in del queue.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
}
}
foreach(st_clientNode_baseTrans * pdelobj,toBedel)
{
m_nodeToBeDel.removeAll(pdelobj);
//qDebug()<<QString("%1(ref %2) deleting.\n").arg((unsigned int)pdelobj).arg(pdelobj->ref());
pdelobj->deleteLater();
}
}
//some data arrival
void st_client_table::on_evt_Data_recieved(QObject * clientHandle,const QByteArray & datablock )
{
//Push Clients to nodes if it is not exist
bool nHashContains = 0;
st_clientNode_baseTrans * pClientNode = 0;
m_hash_mutex.lock();
nHashContains = m_hash_sock2node.contains(clientHandle);
if (false==nHashContains)
{
st_clientNode_baseTrans * pnode = new st_clientNodeAppLayer(this,clientHandle,0);
//using queued connection of send and revieve;
connect (pnode,&st_clientNode_baseTrans::evt_SendDataToClient,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::SendDataToClient,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_BroadcastData,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::evt_BroadcastData,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_close_client,m_pThreadPool,&ZPNetwork::zp_net_ThreadPool::KickClients,Qt::QueuedConnection);
connect (pnode,&st_clientNode_baseTrans::evt_Message,this,&st_client_table::evt_Message,Qt::QueuedConnection);
m_hash_sock2node[clientHandle] = pnode;
nHashContains = true;
pClientNode = pnode;
}
else
{
pClientNode = m_hash_sock2node[clientHandle];
}
assert(nHashContains!=0 && pClientNode !=0);
int nblocks = pClientNode->push_new_data(datablock);
if (nblocks<=1)
m_pTaskEngine->pushTask(pClientNode);
m_hash_mutex.unlock();
}
//a block of data has been successfuly sent
void st_client_table::on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/)
{
}
}
......@@ -10,69 +10,69 @@
#include "./st_message.h"
#include "../database/databaseresource.h"
namespace SmartLink{
class st_clientNode_baseTrans;
class st_client_table : public QObject
{
Q_OBJECT
public:
explicit st_client_table(ZPNetwork::zp_net_ThreadPool * pool, ZPTaskEngine::zp_pipeline * taskeng, ZPDatabase::DatabaseResource *pDb, QObject *parent = 0);
~st_client_table();
class st_clientNode_baseTrans;
class st_client_table : public QObject
{
Q_OBJECT
public:
explicit st_client_table(ZPNetwork::zp_net_ThreadPool * pool, ZPTaskEngine::zp_pipeline * taskeng, ZPDatabase::DatabaseResource *pDb, QObject *parent = 0);
~st_client_table();
bool regisitClientUUID(st_clientNode_baseTrans *);
st_clientNode_baseTrans * clientNodeFromUUID(quint32);
st_clientNode_baseTrans * clientNodeFromSocket(QObject *);
bool regisitClientUUID(st_clientNode_baseTrans *);
st_clientNode_baseTrans * clientNodeFromUUID(quint32);
st_clientNode_baseTrans * clientNodeFromSocket(QObject *);
//Heart beating and healthy
void KickDealClients();
int heartBeatingThrd(){return m_nHeartBeatingDeadThrd;}
void setHeartBeatingThrd(int h) {m_nHeartBeatingDeadThrd = h;}
//Heart beating and healthy
void KickDealClients();
int heartBeatingThrd(){return m_nHeartBeatingDeadThrd;}
void setHeartBeatingThrd(int h) {m_nHeartBeatingDeadThrd = h;}
//Database and disk resources
QString Database_UserAcct(){return m_strDBName_useraccount;}
void setDatabase_UserAcct(const QString & s){m_strDBName_useraccount = s;}
QString Database_Event(){return m_strDBName_event;}
void setDatabase_Event(const QString & s){m_strDBName_event = s;}
QString largeFileFolder(){return m_largeFileFolder;}
void setLargeFileFolder(const QString & s){m_largeFileFolder = s;}
//Database and disk resources
QString Database_UserAcct(){return m_strDBName_useraccount;}
void setDatabase_UserAcct(const QString & s){m_strDBName_useraccount = s;}
QString Database_Event(){return m_strDBName_event;}
void setDatabase_Event(const QString & s){m_strDBName_event = s;}
QString largeFileFolder(){return m_largeFileFolder;}
void setLargeFileFolder(const QString & s){m_largeFileFolder = s;}
ZPDatabase::DatabaseResource * dbRes(){return m_pDatabaseRes;}
protected:
//This list hold dead nodes that still in task queue,avoiding crash
QList<st_clientNode_baseTrans *> m_nodeToBeDel;
ZPDatabase::DatabaseResource * dbRes(){return m_pDatabaseRes;}
protected:
//This list hold dead nodes that still in task queue,avoiding crash
QList<st_clientNode_baseTrans *> m_nodeToBeDel;
//Very important hashes. will be improved for cross-server transfer
QMutex m_hash_mutex;
QMap<quint32,st_clientNode_baseTrans *> m_hash_uuid2node;
QMap<QObject *,st_clientNode_baseTrans *> m_hash_sock2node;
//Very important hashes. will be improved for cross-server transfer
QMutex m_hash_mutex;
QMap<quint32,st_clientNode_baseTrans *> m_hash_uuid2node;
QMap<QObject *,st_clientNode_baseTrans *> m_hash_sock2node;
//Concurrent Network frame work
ZPNetwork::zp_net_ThreadPool * m_pThreadPool;
//The piple-line
ZPTaskEngine::zp_pipeline * m_pTaskEngine;
//The database pool
ZPDatabase::DatabaseResource * m_pDatabaseRes;
//Concurrent Network frame work
ZPNetwork::zp_net_ThreadPool * m_pThreadPool;
//The piple-line
ZPTaskEngine::zp_pipeline * m_pTaskEngine;
//The database pool
ZPDatabase::DatabaseResource * m_pDatabaseRes;
//The max seconds before dead client be kicked out
int m_nHeartBeatingDeadThrd;
//Database Resource Names used by nodes
QString m_strDBName_useraccount;
QString m_strDBName_event;
QString m_largeFileFolder;
//The max seconds before dead client be kicked out
int m_nHeartBeatingDeadThrd;
//Database Resource Names used by nodes
QString m_strDBName_useraccount;
QString m_strDBName_event;
QString m_largeFileFolder;
signals:
void evt_Message (const QString &);
signals:
void evt_Message (const QString &);
public slots:
//this event indicates new client connected.
void on_evt_NewClientConnected(QObject * /*clientHandle*/);
//this event indicates a client disconnected.
void on_evt_ClientDisconnected(QObject * /*clientHandle*/);
//some data arrival
void on_evt_Data_recieved(QObject * /*clientHandle*/,const QByteArray & /*datablock*/ );
//a block of data has been successfuly sent
void on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/);
public slots:
//this event indicates new client connected.
void on_evt_NewClientConnected(QObject * /*clientHandle*/);
//this event indicates a client disconnected.
void on_evt_ClientDisconnected(QObject * /*clientHandle*/);
//some data arrival
void on_evt_Data_recieved(QObject * /*clientHandle*/,const QByteArray & /*datablock*/ );
//a block of data has been successfuly sent
void on_evt_Data_transferred(QObject * /*clientHandle*/,qint64 /*bytes sent*/);
};
};
}
#endif // ST_CLIENT_TABLE_H
......@@ -521,7 +521,7 @@ namespace SmartLink{
reply.DoneCode = res==true?0:1;
//if (res==false)
//strcpy(reply.TextInfo,"load Relation failed");
//strcpy(reply.TextInfo,"load Relation failed");
int ii = 0;
foreach (quint32 it, m_matched_nodes)
{
......@@ -576,7 +576,7 @@ namespace SmartLink{
reply.DoneCode = res==true?0:1;
//if (res==false)
//strcpy(reply.TextInfo,"load Relation failed");
//strcpy(reply.TextInfo,"load Relation failed");
//Send back
emit evt_SendDataToClient(this->sock(),array);
return reply.DoneCode==0?true:false;
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册