提交 4c19f566 编写于 作者: dengyihao's avatar dengyihao

fix bug

上级 adb813f0
...@@ -47,13 +47,13 @@ option( ...@@ -47,13 +47,13 @@ option(
option( option(
BUILD_WITH_UV BUILD_WITH_UV
"If build with libuv" "If build with libuv"
OFF ON
) )
option( option(
BUILD_WITH_UV_TRANS BUILD_WITH_UV_TRANS
"If build with libuv_trans " "If build with libuv_trans "
OFF ON
) )
option( option(
......
...@@ -64,6 +64,7 @@ typedef struct SRpcInit { ...@@ -64,6 +64,7 @@ typedef struct SRpcInit {
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled int idleTime; // milliseconds, 0 means idle timer is disabled
bool noPool; // create conn pool or not
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
char spi; // security parameter index char spi; // security parameter index
......
...@@ -154,7 +154,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -154,7 +154,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent; SDnode * pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
...@@ -192,6 +192,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -192,6 +192,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
rpcInit.ckey = INTERNAL_CKEY; rpcInit.ckey = INTERNAL_CKEY;
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.noPool = true;
char pass[TSDB_PASSWORD_LEN + 1] = {0}; char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
...@@ -217,7 +218,7 @@ static void dndCleanupClient(SDnode *pDnode) { ...@@ -217,7 +218,7 @@ static void dndCleanupClient(SDnode *pDnode) {
} }
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode *pDnode = param; SDnode * pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pReq->msgType; tmsg_t msgType = pReq->msgType;
...@@ -311,7 +312,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -311,7 +312,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SAuthReq authReq = {0}; SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen); void * pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq); tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......
...@@ -21,16 +21,21 @@ class TestClient { ...@@ -21,16 +21,21 @@ class TestClient {
bool Init(const char* user, const char* pass, const char* fqdn, uint16_t port); bool Init(const char* user, const char* pass, const char* fqdn, uint16_t port);
void Cleanup(); void Cleanup();
void DoInit();
SRpcMsg* SendReq(SRpcMsg* pReq); SRpcMsg* SendReq(SRpcMsg* pReq);
void SetRpcRsp(SRpcMsg* pRsp); void SetRpcRsp(SRpcMsg* pRsp);
tsem_t* GetSem(); tsem_t* GetSem();
void Restart();
private: private:
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
char user[128];
char pass[128];
void* clientRpc; void* clientRpc;
SRpcMsg* pRsp; SRpcMsg* pRsp;
tsem_t sem; tsem_t sem;
}; };
#endif /* _TD_TEST_CLIENT_H_ */ #endif /* _TD_TEST_CLIENT_H_ */
\ No newline at end of file
...@@ -20,10 +20,10 @@ ...@@ -20,10 +20,10 @@
#include "os.h" #include "os.h"
#include "dnode.h" #include "dnode.h"
#include "tmsg.h"
#include "tconfig.h" #include "tconfig.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h"
#include "tnote.h" #include "tnote.h"
#include "trpc.h" #include "trpc.h"
#include "tthread.h" #include "tthread.h"
...@@ -39,6 +39,7 @@ class Testbase { ...@@ -39,6 +39,7 @@ class Testbase {
void Restart(); void Restart();
void ServerStop(); void ServerStop();
void ServerStart(); void ServerStart();
void ClientRestart();
SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen); SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen);
private: private:
...@@ -100,7 +101,7 @@ class Testbase { ...@@ -100,7 +101,7 @@ class Testbase {
{ \ { \
char* bytes = (char*)calloc(1, len); \ char* bytes = (char*)calloc(1, len); \
for (int32_t i = 0; i < len - 1; ++i) { \ for (int32_t i = 0; i < len - 1; ++i) { \
bytes[i] = b; \ bytes[i] = b; \
} \ } \
EXPECT_STREQ(test.GetShowBinary(len), bytes); \ EXPECT_STREQ(test.GetShowBinary(len), bytes); \
} }
...@@ -138,4 +139,4 @@ class Testbase { ...@@ -138,4 +139,4 @@ class Testbase {
#define IgnoreTimestamp() \ #define IgnoreTimestamp() \
{ test.GetShowTimestamp(); } { test.GetShowTimestamp(); }
#endif /* _TD_TEST_BASE_H_ */ #endif /* _TD_TEST_BASE_H_ */
\ No newline at end of file
...@@ -13,33 +13,38 @@ ...@@ -13,33 +13,38 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tep.h"
#include "sut.h" #include "sut.h"
#include "tep.h"
static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) {
TestClient* client = (TestClient*)parent; TestClient* client = (TestClient*)parent;
client->SetRpcRsp(pRsp); client->SetRpcRsp(pRsp);
uInfo("response:%s from dnode, code:0x%x", TMSG_INFO(pRsp->msgType), pRsp->code); uInfo("x response:%s from dnode, code:0x%x, msgSize: %d", TMSG_INFO(pRsp->msgType), pRsp->code, pRsp->contLen);
tsem_post(client->GetSem()); tsem_post(client->GetSem());
} }
void TestClient::SetRpcRsp(SRpcMsg* pRsp) { this->pRsp = pRsp; }; void TestClient::SetRpcRsp(SRpcMsg* rsp) {
this->pRsp = (SRpcMsg*)calloc(1, sizeof(SRpcMsg));
this->pRsp->msgType = rsp->msgType;
this->pRsp->code = rsp->code;
this->pRsp->pCont = rsp->pCont;
this->pRsp->contLen = rsp->contLen;
};
tsem_t* TestClient::GetSem() { return &sem; } tsem_t* TestClient::GetSem() { return &sem; }
bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint16_t port) { void TestClient::DoInit() {
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt); taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = (char*)"DND-C"; rpcInit.label = (char*)"shell";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processClientRsp; rpcInit.cfp = processClientRsp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000; rpcInit.idleTime = 30 * 1000;
rpcInit.user = (char*)user; rpcInit.user = (char*)this->user;
rpcInit.ckey = (char*)"key"; rpcInit.ckey = (char*)"key";
rpcInit.parent = this; rpcInit.parent = this;
rpcInit.secret = (char*)secretEncrypt; rpcInit.secret = (char*)secretEncrypt;
...@@ -47,11 +52,16 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint ...@@ -47,11 +52,16 @@ bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
ASSERT(clientRpc); ASSERT(clientRpc);
tsem_init(&this->sem, 0, 0);
}
tsem_init(&sem, 0, 0); bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint16_t port) {
strcpy(this->fqdn, fqdn); strcpy(this->fqdn, fqdn);
strcpy(this->user, user);
strcpy(this->pass, pass);
this->port = port; this->port = port;
// this->pRsp = NULL;
this->DoInit();
return true; return true;
} }
...@@ -60,11 +70,16 @@ void TestClient::Cleanup() { ...@@ -60,11 +70,16 @@ void TestClient::Cleanup() {
rpcClose(clientRpc); rpcClose(clientRpc);
} }
void TestClient::Restart() {
this->Cleanup();
this->DoInit();
}
SRpcMsg* TestClient::SendReq(SRpcMsg* pReq) { SRpcMsg* TestClient::SendReq(SRpcMsg* pReq) {
SEpSet epSet = {0}; SEpSet epSet = {0};
addEpIntoEpSet(&epSet, fqdn, port); addEpIntoEpSet(&epSet, fqdn, port);
rpcSendRequest(clientRpc, &epSet, pReq, NULL); rpcSendRequest(clientRpc, &epSet, pReq, NULL);
tsem_wait(&sem); tsem_wait(&sem);
uInfo("y response:%s from dnode, code:0x%x, msgSize: %d", TMSG_INFO(pRsp->msgType), pRsp->code, pRsp->contLen);
return pRsp; return pRsp;
} }
...@@ -21,9 +21,9 @@ void Testbase::InitLog(const char* path) { ...@@ -21,9 +21,9 @@ void Testbase::InitLog(const char* path) {
mDebugFlag = 143; mDebugFlag = 143;
cDebugFlag = 0; cDebugFlag = 0;
jniDebugFlag = 0; jniDebugFlag = 0;
tmrDebugFlag = 0; tmrDebugFlag = 143;
uDebugFlag = 0; uDebugFlag = 143;
rpcDebugFlag = 0; rpcDebugFlag = 143;
qDebugFlag = 0; qDebugFlag = 0;
wDebugFlag = 0; wDebugFlag = 0;
sDebugFlag = 0; sDebugFlag = 0;
...@@ -66,16 +66,21 @@ void Testbase::Init(const char* path, int16_t port) { ...@@ -66,16 +66,21 @@ void Testbase::Init(const char* path, int16_t port) {
void Testbase::Cleanup() { void Testbase::Cleanup() {
tFreeSTableMetaRsp(&metaRsp); tFreeSTableMetaRsp(&metaRsp);
server.Stop();
client.Cleanup(); client.Cleanup();
taosMsleep(10);
server.Stop();
dndCleanup(); dndCleanup();
} }
void Testbase::Restart() { server.Restart(); } void Testbase::Restart() {
server.Restart();
client.Restart();
}
void Testbase::ServerStop() { server.Stop(); } void Testbase::ServerStop() { server.Stop(); }
void Testbase::ServerStart() { server.DoStart(); } void Testbase::ServerStart() { server.DoStart(); }
void Testbase::ClientRestart() { client.Restart(); }
SRpcMsg* Testbase::SendReq(tmsg_t msgType, void* pCont, int32_t contLen) { SRpcMsg* Testbase::SendReq(tmsg_t msgType, void* pCont, int32_t contLen) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
...@@ -194,4 +199,4 @@ int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; } ...@@ -194,4 +199,4 @@ int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; }
STableMetaRsp* Testbase::GetShowMeta() { return &metaRsp; } STableMetaRsp* Testbase::GetShowMeta() { return &metaRsp; }
SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; } SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; }
\ No newline at end of file
...@@ -77,7 +77,7 @@ static void mndTransReExecute(void *param, void *tmrId) { ...@@ -77,7 +77,7 @@ static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
} }
...@@ -89,7 +89,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { ...@@ -89,7 +89,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
SMnode *pMnode = param; SMnode *pMnode = param;
if (mndIsMaster(pMnode)) { if (mndIsMaster(pMnode)) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void * pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg);
} }
...@@ -404,7 +404,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { ...@@ -404,7 +404,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return NULL; return NULL;
} }
if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) { if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER &&
pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE) {
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = {0};
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
...@@ -439,7 +440,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) { ...@@ -439,7 +440,7 @@ void mndProcessMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
int32_t code = 0; int32_t code = 0;
tmsg_t msgType = pMsg->rpcMsg.msgType; tmsg_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle; void * ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType & 1U); bool isReq = (msgType & 1U);
mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
......
...@@ -190,6 +190,9 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { ...@@ -190,6 +190,9 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) {
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq); tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
server2.Stop(); server2.Stop();
taosMsleep(1000);
// test.ClientRestart();
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
...@@ -226,6 +229,7 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { ...@@ -226,6 +229,7 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) {
{ {
// server start, wait until the rollback finished // server start, wait until the rollback finished
server2.DoStart(); server2.DoStart();
test.ClientRestart();
taosMsleep(1000); taosMsleep(1000);
int32_t retry = 0; int32_t retry = 0;
...@@ -248,7 +252,6 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { ...@@ -248,7 +252,6 @@ TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) {
ASSERT_NE(retry, retryMax); ASSERT_NE(retry, retryMax);
} }
} }
TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) { TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) {
{ {
// send message first, then dnode2 crash, result is returned, and rollback is started // send message first, then dnode2 crash, result is returned, and rollback is started
...@@ -315,4 +318,4 @@ TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) { ...@@ -315,4 +318,4 @@ TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) {
ASSERT_NE(retry, retryMax); ASSERT_NE(retry, retryMax);
} }
} }
\ No newline at end of file
...@@ -46,8 +46,10 @@ class MndTestTrans : public ::testing::Test { ...@@ -46,8 +46,10 @@ class MndTestTrans : public ::testing::Test {
free(buffer); free(buffer);
taosFsyncFile(fd); taosFsyncFile(fd);
taosCloseFile(fd); taosCloseFile(fd);
taosMsleep(1000);
test.ServerStart(); test.ServerStart();
test.ClientRestart();
} }
static Testbase test; static Testbase test;
...@@ -200,4 +202,4 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { ...@@ -200,4 +202,4 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
test.SendShowRetrieveReq(); test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 2); EXPECT_EQ(test.GetShowRows(), 2);
} }
} }
\ No newline at end of file
...@@ -617,6 +617,7 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) { ...@@ -617,6 +617,7 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) {
// restart // restart
test.Restart(); test.Restart();
taosMsleep(1000);
test.SendShowMetaReq(TSDB_MGMT_TABLE_USER, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4); CHECK_META("show users", 4);
...@@ -631,4 +632,4 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) { ...@@ -631,4 +632,4 @@ TEST_F(MndTestUser, 06_Create_Drop_Alter_User) {
CheckTimestamp(); CheckTimestamp();
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
} }
\ No newline at end of file
...@@ -217,7 +217,7 @@ typedef struct SConnBuffer { ...@@ -217,7 +217,7 @@ typedef struct SConnBuffer {
char* buf; char* buf;
int len; int len;
int cap; int cap;
int left; int total;
} SConnBuffer; } SConnBuffer;
typedef void (*AsyncCB)(uv_async_t* handle); typedef void (*AsyncCB)(uv_async_t* handle);
......
...@@ -56,6 +56,7 @@ typedef struct { ...@@ -56,6 +56,7 @@ typedef struct {
int8_t connType; int8_t connType;
int64_t index; int64_t index;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
bool noPool; // pool or not
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index char spi; // security parameter index
......
...@@ -64,6 +64,7 @@ typedef struct { ...@@ -64,6 +64,7 @@ typedef struct {
void (*cfp)(void *parent, SRpcMsg *, SEpSet *); void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
bool noPool;
int32_t refCount; int32_t refCount;
void * parent; void * parent;
void * idPool; // handle to ID pool void * idPool; // handle to ID pool
......
...@@ -27,7 +27,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -27,7 +27,7 @@ void* rpcOpen(const SRpcInit* pInit) {
return NULL; return NULL;
} }
if (pInit->label) { if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1);
} }
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
...@@ -35,6 +35,8 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -35,6 +35,8 @@ void* rpcOpen(const SRpcInit* pInit) {
} else { } else {
pRpc->numOfThreads = pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads;
} }
pRpc->noPool = pInit->noPool;
pRpc->connType = pInit->connType; pRpc->connType = pInit->connType;
pRpc->idleTime = pInit->idleTime; pRpc->idleTime = pInit->idleTime;
pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
......
...@@ -126,6 +126,9 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -126,6 +126,9 @@ static void clientHandleResp(SCliConn* conn) {
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
// buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.contLen = transContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.pCont = transContFromHead((char*)pHead);
...@@ -140,9 +143,9 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -140,9 +143,9 @@ static void clientHandleResp(SCliConn* conn) {
tDebug("client conn %p persist by app", conn); tDebug("client conn %p persist by app", conn);
} }
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn,
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
ntohs(conn->locaddr.sin_port)); inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
conn->secured = pHead->secured; conn->secured = pHead->secured;
if (conn->push != NULL && conn->ctnRdCnt != 0) { if (conn->push != NULL && conn->ctnRdCnt != 0) {
...@@ -150,26 +153,26 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -150,26 +153,26 @@ static void clientHandleResp(SCliConn* conn) {
conn->push = NULL; conn->push = NULL;
} else { } else {
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("client conn %p handle resp", conn); tTrace("%s client conn %p handle resp", pRpc->label, conn);
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL);
} else { } else {
tTrace("client conn(sync) %p handle resp", conn); tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
} }
conn->ctnRdCnt += 1; conn->ctnRdCnt += 1;
// buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf);
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->push == NULL && conn->persist == 0) { if (conn->push == NULL && conn->persist == 0) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); if (pRpc->noPool == true) {
} else {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
}
} }
destroyCmsg(conn->data); destroyCmsg(conn->data);
conn->data = NULL; conn->data = NULL;
...@@ -184,7 +187,6 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -184,7 +187,6 @@ static void clientHandleExcept(SCliConn* pConn) {
clientConnDestroy(pConn, true); clientConnDestroy(pConn, true);
return; return;
} }
tTrace("client conn %p start to destroy", pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
tmsg_t msgType = TDMT_MND_CONNECT; tmsg_t msgType = TDMT_MND_CONNECT;
...@@ -213,6 +215,7 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -213,6 +215,7 @@ static void clientHandleExcept(SCliConn* pConn) {
} }
pConn->push = NULL; pConn->push = NULL;
} }
tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn);
if (pConn->push == NULL) { if (pConn->push == NULL) {
destroyCmsg(pConn->data); destroyCmsg(pConn->data);
pConn->data = NULL; pConn->data = NULL;
...@@ -226,7 +229,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { ...@@ -226,7 +229,7 @@ static void clientTimeoutCb(uv_timer_t* handle) {
SCliThrdObj* pThrd = handle->data; SCliThrdObj* pThrd = handle->data;
SRpcInfo* pRpc = pThrd->pTransInst; SRpcInfo* pRpc = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout; int64_t currentTime = pThrd->nextTimeout;
tTrace("client conn timeout, try to remove expire conn from conn pool"); tTrace("%s, client conn timeout, try to remove expire conn from conn pool", pRpc->label);
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) { while (p != NULL) {
...@@ -307,21 +310,30 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { ...@@ -307,21 +310,30 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
} }
static bool clientReadComplete(SConnBuffer* data) { static bool clientReadComplete(SConnBuffer* data) {
STransMsgHead head; if (data->len >= sizeof(STransMsgHead)) {
int32_t headLen = sizeof(head); STransMsgHead head;
if (data->len >= headLen) { memcpy((char*)&head, data->buf, sizeof(head));
memcpy((char*)&head, data->buf, headLen); int32_t msgLen = (int32_t)htonl(head.msgLen);
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); data->total = msgLen;
if (msgLen > data->len) { }
data->left = msgLen - data->len;
return false; if (data->len == data->cap && data->total == data->cap) {
} else if (msgLen == data->len) { return true;
data->left = 0; }
return true; return false;
} // if (data->len >= headLen) {
} else { // memcpy((char*)&head, data->buf, headLen);
return false; // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
} // if (msgLen > data->len) {
// data->left = msgLen - data->len;
// return false;
// } else if (msgLen == data->len) {
// data->left = 0;
// return true;
// }
//} else {
// return false;
//}
} }
static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
...@@ -338,7 +350,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -338,7 +350,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (clientReadComplete(pBuf)) { if (clientReadComplete(pBuf)) {
uv_read_stop((uv_stream_t*)conn->stream); // uv_read_stop((uv_stream_t*)conn->stream);
tTrace("client conn %p read complete", conn); tTrace("client conn %p read complete", conn);
clientHandleResp(conn); clientHandleResp(conn);
} else { } else {
...@@ -346,6 +358,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -346,6 +358,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
} }
return; return;
} }
if (nread == UV_EOF) {
tError("client conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn);
}
assert(nread <= 0); assert(nread <= 0);
if (nread == 0) { if (nread == 0) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
...@@ -353,7 +369,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -353,7 +369,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
// read(2). // read(2).
return; return;
} }
if (nread < 0 || nread == UV_EOF) { if (nread < 0) {
tError("client conn %p read error: %s", conn, uv_err_name(nread)); tError("client conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn); clientHandleExcept(conn);
} }
...@@ -467,6 +483,7 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -467,6 +483,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("client work thread %p start to quit", pThrd); tDebug("client work thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool);
// transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL); // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
uv_timer_stop(pThrd->timer); uv_timer_stop(pThrd->timer);
pThrd->quit = true; pThrd->quit = true;
...@@ -483,7 +500,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -483,7 +500,10 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
if (pMsg->msg.handle == NULL) { if (pMsg->msg.handle == NULL) {
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (pCtx->pTransInst->noPool == true) {
} else {
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
}
if (conn != NULL) { if (conn != NULL) {
tTrace("client conn %p get from conn pool", conn); tTrace("client conn %p get from conn pool", conn);
} }
...@@ -512,7 +532,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -512,7 +532,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn; conn->stream->data = conn;
uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1);
int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1);
if (ret) {
tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret));
}
// write req handle // write req handle
conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq = malloc(sizeof(uv_write_t));
conn->writeReq->data = conn; conn->writeReq->data = conn;
......
...@@ -205,6 +205,7 @@ int transInitBuffer(SConnBuffer* buf) { ...@@ -205,6 +205,7 @@ int transInitBuffer(SConnBuffer* buf) {
} }
int transClearBuffer(SConnBuffer* buf) { int transClearBuffer(SConnBuffer* buf) {
memset(buf, 0, sizeof(*buf)); memset(buf, 0, sizeof(*buf));
buf->total = -1;
return 0; return 0;
} }
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
...@@ -214,27 +215,25 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { ...@@ -214,27 +215,25 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
* |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user
* info--->| * info--->|
*/ */
static const int CAPACITY = 1024; static const int CAPACITY = sizeof(STransMsgHead);
SConnBuffer* p = connBuf; SConnBuffer* p = connBuf;
if (p->cap == 0) { if (p->cap == 0) {
p->buf = (char*)calloc(CAPACITY, sizeof(char)); p->buf = (char*)calloc(CAPACITY, sizeof(char));
p->len = 0; p->len = 0;
p->cap = CAPACITY; p->cap = CAPACITY;
p->left = -1; p->total = 0;
uvBuf->base = p->buf; uvBuf->base = p->buf;
uvBuf->len = CAPACITY; uvBuf->len = CAPACITY;
} else { } else {
if (p->len >= p->cap) { STransMsgHead head;
if (p->left == -1) { memcpy((char*)&head, p->buf, sizeof(head));
p->cap *= 2; int32_t msgLen = (int32_t)htonl(head.msgLen);
p->buf = realloc(p->buf, p->cap);
} else if (p->len + p->left > p->cap) { p->total = msgLen;
p->cap = p->len + p->left; p->cap = msgLen;
p->buf = realloc(p->buf, p->len + p->left); p->buf = realloc(p->buf, p->cap);
}
}
uvBuf->base = p->buf + p->len; uvBuf->base = p->buf + p->len;
uvBuf->len = p->cap - p->len; uvBuf->len = p->cap - p->len;
} }
......
...@@ -61,6 +61,7 @@ typedef struct SWorkThrdObj { ...@@ -61,6 +61,7 @@ typedef struct SWorkThrdObj {
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
// uv_async_t* workerAsync; // // uv_async_t* workerAsync; //
queue msg; queue msg;
queue conn;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
void* pTransInst; void* pTransInst;
} SWorkThrdObj; } SWorkThrdObj;
...@@ -103,7 +104,7 @@ static void uvStartSendResp(SSrvMsg* msg); ...@@ -103,7 +104,7 @@ static void uvStartSendResp(SSrvMsg* msg);
static void destroySmsg(SSrvMsg* smsg); static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet // check whether already read complete packet
static bool readComplete(SConnBuffer* buf); static bool readComplete(SConnBuffer* buf);
static SSrvConn* createConn(); static SSrvConn* createConn(void* hThrd);
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
...@@ -117,11 +118,6 @@ static bool addHandleToWorkloop(void* arg); ...@@ -117,11 +118,6 @@ static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
/*
* formate of data buffer:
* |<--------------------------data from socket------------------------------->|
* |<------STransMsgHead------->|<-------------------other data--------------->|
*/
SSrvConn* conn = handle->data; SSrvConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
...@@ -131,23 +127,27 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b ...@@ -131,23 +127,27 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
// //
static bool readComplete(SConnBuffer* data) { static bool readComplete(SConnBuffer* data) {
// TODO(yihao): handle pipeline later // TODO(yihao): handle pipeline later
STransMsgHead head; if (data->len == data->cap && data->total == data->cap) {
int32_t headLen = sizeof(head); return true;
if (data->len >= headLen) { }
memcpy((char*)&head, data->buf, headLen); return false;
int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen); // STransMsgHead head;
if (msgLen > data->len) { // int32_t headLen = sizeof(head);
data->left = msgLen - data->len; // if (data->len >= headLen) {
return false; // memcpy((char*)&head, data->buf, headLen);
} else if (msgLen == data->len) { // int32_t msgLen = (int32_t)htonl((uint32_t)head.msgLen);
return true; // if (msgLen > data->len) {
} else if (msgLen < data->len) { // data->left = msgLen - data->len;
return false; // return false;
// handle other packet later // } else if (msgLen == data->len) {
} // return true;
} else { // } else if (msgLen < data->len) {
return false; // return false;
} // // handle other packet later
// }
//} else {
// return false;
//}
} }
// static void uvDoProcess(SRecvInfo* pRecv) { // static void uvDoProcess(SRecvInfo* pRecv) {
...@@ -241,7 +241,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -241,7 +241,7 @@ static void uvHandleReq(SSrvConn* pConn) {
} }
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
assert(transIsReq(pHead->msgType)); // assert(transIsReq(pHead->msgType));
SRpcInfo* pRpc = (SRpcInfo*)p->shandle; SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
...@@ -266,9 +266,9 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -266,9 +266,9 @@ static void uvHandleReq(SSrvConn* pConn) {
transClearBuffer(&pConn->readBuf); transClearBuffer(&pConn->readBuf);
pConn->ref++; pConn->ref++;
tDebug("server conn %p %s received from %s:%d, local info: %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port)); ntohs(pConn->locaddr.sin_port), rpcMsg.contLen);
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth // auth
...@@ -290,6 +290,14 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -290,6 +290,14 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
} }
return; return;
} }
if (nread == UV_EOF) {
tError("server conn %p read error: %s", conn, uv_err_name(nread));
if (conn->ref > 1) {
conn->ref++; // ref > 1 signed that write is in progress
}
destroyConn(conn, true);
return;
}
if (nread == 0) { if (nread == 0) {
return; return;
} }
...@@ -302,8 +310,8 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -302,8 +310,8 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
} }
} }
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(sizeof(char));
buf->len = 2; buf->len = 2;
buf->base = calloc(1, sizeof(char) * buf->len);
} }
void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnTimeoutCb(uv_timer_t* handle) {
...@@ -386,6 +394,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { ...@@ -386,6 +394,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
static void uvStartSendResp(SSrvMsg* smsg) { static void uvStartSendResp(SSrvMsg* smsg) {
// impl // impl
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
pConn->ref--; //
if (taosArrayGetSize(pConn->srvMsgs) > 0) { if (taosArrayGetSize(pConn->srvMsgs) > 0) {
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
...@@ -403,6 +412,16 @@ static void destroySmsg(SSrvMsg* smsg) { ...@@ -403,6 +412,16 @@ static void destroySmsg(SSrvMsg* smsg) {
transFreeMsg(smsg->msg.pCont); transFreeMsg(smsg->msg.pCont);
free(smsg); free(smsg);
} }
static void destroyAllConn(SWorkThrdObj* pThrd) {
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* h = QUEUE_HEAD(&pThrd->conn);
QUEUE_REMOVE(h);
QUEUE_INIT(h);
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
destroyConn(c, true);
}
}
void uvWorkerAsyncCb(uv_async_t* handle) { void uvWorkerAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data; SAsyncItem* item = handle->data;
SWorkThrdObj* pThrd = item->pThrd; SWorkThrdObj* pThrd = item->pThrd;
...@@ -424,8 +443,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -424,8 +443,9 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
continue; continue;
} }
if (msg->pConn == NULL) { if (msg->pConn == NULL) {
//
free(msg); free(msg);
destroyAllConn(pThrd);
uv_stop(pThrd->loop); uv_stop(pThrd->loop);
} else { } else {
uvStartSendResp(msg); uvStartSendResp(msg);
...@@ -439,6 +459,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -439,6 +459,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
} }
static void uvAcceptAsyncCb(uv_async_t* async) { static void uvAcceptAsyncCb(uv_async_t* async) {
SServerObj* srv = async->data; SServerObj* srv = async->data;
uv_close((uv_handle_t*)&srv->server, NULL);
uv_stop(srv->loop); uv_stop(srv->loop);
} }
...@@ -491,7 +512,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -491,7 +512,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_handle_type pending = uv_pipe_pending_type(pipe); uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP); assert(pending == UV_TCP);
SSrvConn* pConn = createConn(); SSrvConn* pConn = createConn(pThrd);
pConn->pTransInst = pThrd->pTransInst; pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/ /* init conn timer*/
...@@ -507,6 +528,9 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -507,6 +528,9 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init(pThrd->loop, pConn->pTcp); uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn; pConn->pTcp->data = pConn;
uv_tcp_nodelay(pConn->pTcp, 1);
uv_tcp_keepalive(pConn->pTcp, 1, 1);
// init write request, just // init write request, just
pConn->pWriter = calloc(1, sizeof(uv_write_t)); pConn->pWriter = calloc(1, sizeof(uv_write_t));
pConn->pWriter->data = pConn; pConn->pWriter->data = pConn;
...@@ -560,6 +584,9 @@ static bool addHandleToWorkloop(void* arg) { ...@@ -560,6 +584,9 @@ static bool addHandleToWorkloop(void* arg) {
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
pthread_mutex_init(&pThrd->msgMtx, NULL); pthread_mutex_init(&pThrd->msgMtx, NULL);
// conn set
QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true; return true;
...@@ -598,8 +625,13 @@ void* workerThread(void* arg) { ...@@ -598,8 +625,13 @@ void* workerThread(void* arg) {
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
} }
static SSrvConn* createConn() { static SSrvConn* createConn(void* hThrd) {
SWorkThrdObj* pThrd = hThrd;
SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn)); SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn));
QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); //
tTrace("conn %p created", pConn); tTrace("conn %p created", pConn);
++pConn->ref; ++pConn->ref;
...@@ -610,7 +642,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { ...@@ -610,7 +642,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
if (conn == NULL) { if (conn == NULL) {
return; return;
} }
tTrace("server conn %p try to destroy", conn); tTrace("server conn %p try to destroy, ref: %d", conn, conn->ref);
if (--conn->ref > 0) { if (--conn->ref > 0) {
return; return;
} }
...@@ -621,19 +653,18 @@ static void destroyConn(SSrvConn* conn, bool clear) { ...@@ -621,19 +653,18 @@ static void destroyConn(SSrvConn* conn, bool clear) {
destroySmsg(msg); destroySmsg(msg);
} }
taosArrayDestroy(conn->srvMsgs); taosArrayDestroy(conn->srvMsgs);
QUEUE_REMOVE(&conn->queue);
// destroySmsg(conn->pSrvMsg);
// conn->pSrvMsg = NULL;
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); uv_tcp_close_reset(conn->pTcp, uvDestroyConn);
// uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
} }
} }
static void uvDestroyConn(uv_handle_t* handle) { static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data; SSrvConn* conn = handle->data;
tDebug("server conn %p destroy", conn); tDebug("server conn %p destroy", conn);
uv_timer_stop(conn->pTimer); uv_timer_stop(conn->pTimer);
free(conn->pTimer); // free(conn->pTimer);
// free(conn->pTcp); // free(conn->pTcp);
free(conn->pWriter); free(conn->pWriter);
free(conn); free(conn);
......
...@@ -22,13 +22,12 @@ ...@@ -22,13 +22,12 @@
* windows implementation * windows implementation
*/ */
#include <Windows.h>
#include <Mmsystem.h> #include <Mmsystem.h>
#include <stdio.h> #include <Windows.h>
#include <stdint.h> #include <stdint.h>
#include <stdio.h>
#pragma warning( disable : 4244 ) #pragma warning(disable : 4244)
typedef void (*win_timer_f)(int signo); typedef void (*win_timer_f)(int signo);
...@@ -40,8 +39,8 @@ void WINAPI taosWinOnTimer(UINT wTimerID, UINT msg, DWORD_PTR dwUser, DWORD_PTR ...@@ -40,8 +39,8 @@ void WINAPI taosWinOnTimer(UINT wTimerID, UINT msg, DWORD_PTR dwUser, DWORD_PTR
} }
static MMRESULT timerId; static MMRESULT timerId;
int taosInitTimer(win_timer_f callback, int ms) { int taosInitTimer(win_timer_f callback, int ms) {
DWORD_PTR param = *((int64_t *) & callback); DWORD_PTR param = *((int64_t *)&callback);
timerId = timeSetEvent(ms, 1, (LPTIMECALLBACK)taosWinOnTimer, param, TIME_PERIODIC); timerId = timeSetEvent(ms, 1, (LPTIMECALLBACK)taosWinOnTimer, param, TIME_PERIODIC);
if (timerId == 0) { if (timerId == 0) {
...@@ -50,9 +49,7 @@ int taosInitTimer(win_timer_f callback, int ms) { ...@@ -50,9 +49,7 @@ int taosInitTimer(win_timer_f callback, int ms) {
return 0; return 0;
} }
void taosUninitTimer() { void taosUninitTimer() { timeKillEvent(timerId); }
timeKillEvent(timerId);
}
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
...@@ -60,32 +57,32 @@ void taosUninitTimer() { ...@@ -60,32 +57,32 @@ void taosUninitTimer() {
* darwin implementation * darwin implementation
*/ */
#include <sys/syscall.h>
#include <sys/event.h> #include <sys/event.h>
#include <sys/syscall.h>
#include <unistd.h> #include <unistd.h>
static void (*timer_callback)(int); static void (*timer_callback)(int);
static int timer_ms = 0; static int timer_ms = 0;
static pthread_t timer_thread; static pthread_t timer_thread;
static int timer_kq = -1; static int timer_kq = -1;
static volatile int timer_stop = 0; static volatile int timer_stop = 0;
static void* timer_routine(void *arg) { static void* timer_routine(void* arg) {
(void)arg; (void)arg;
setThreadName("timer"); setThreadName("timer");
int r = 0; int r = 0;
struct timespec to = {0}; struct timespec to = {0};
to.tv_sec = timer_ms / 1000; to.tv_sec = timer_ms / 1000;
to.tv_nsec = (timer_ms % 1000) * 1000000; to.tv_nsec = (timer_ms % 1000) * 1000000;
while (!timer_stop) { while (!timer_stop) {
struct kevent64_s kev[10] = {0}; struct kevent64_s kev[10] = {0};
r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev)/sizeof(kev[0]), 0, &to); r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev) / sizeof(kev[0]), 0, &to);
if (r!=0) { if (r != 0) {
fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__); fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__);
abort(); abort();
} }
timer_callback(SIGALRM); // just mock timer_callback(SIGALRM); // just mock
} }
return NULL; return NULL;
...@@ -93,11 +90,13 @@ static void* timer_routine(void *arg) { ...@@ -93,11 +90,13 @@ static void* timer_routine(void *arg) {
int taosInitTimer(void (*callback)(int), int ms) { int taosInitTimer(void (*callback)(int), int ms) {
int r = 0; int r = 0;
timer_ms = ms; timer_kq = -1;
timer_stop = 0;
timer_ms = ms;
timer_callback = callback; timer_callback = callback;
timer_kq = kqueue(); timer_kq = kqueue();
if (timer_kq==-1) { if (timer_kq == -1) {
fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__); fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__);
// since no caller of this func checks the return value for the moment // since no caller of this func checks the return value for the moment
abort(); abort();
...@@ -144,10 +143,10 @@ static void taosDeleteTimer(void *tharg) { ...@@ -144,10 +143,10 @@ static void taosDeleteTimer(void *tharg) {
timer_delete(*pTimer); timer_delete(*pTimer);
} }
static pthread_t timerThread; static pthread_t timerThread;
static timer_t timerId; static timer_t timerId;
static volatile bool stopTimer = false; static volatile bool stopTimer = false;
static void *taosProcessAlarmSignal(void *tharg) { static void * taosProcessAlarmSignal(void *tharg) {
// Block the signal // Block the signal
sigset_t sigset; sigset_t sigset;
sigemptyset(&sigset); sigemptyset(&sigset);
...@@ -159,18 +158,18 @@ static void *taosProcessAlarmSignal(void *tharg) { ...@@ -159,18 +158,18 @@ static void *taosProcessAlarmSignal(void *tharg) {
setThreadName("tmr"); setThreadName("tmr");
#ifdef _ALPINE #ifdef _ALPINE
sevent.sigev_notify = SIGEV_THREAD; sevent.sigev_notify = SIGEV_THREAD;
sevent.sigev_value.sival_int = syscall(__NR_gettid); sevent.sigev_value.sival_int = syscall(__NR_gettid);
#else #else
sevent.sigev_notify = SIGEV_THREAD_ID; sevent.sigev_notify = SIGEV_THREAD_ID;
sevent._sigev_un._tid = syscall(__NR_gettid); sevent._sigev_un._tid = syscall(__NR_gettid);
#endif #endif
sevent.sigev_signo = SIGALRM; sevent.sigev_signo = SIGALRM;
if (timer_create(CLOCK_REALTIME, &sevent, &timerId) == -1) { if (timer_create(CLOCK_REALTIME, &sevent, &timerId) == -1) {
//printf("Failed to create timer"); // printf("Failed to create timer");
} }
pthread_cleanup_push(taosDeleteTimer, &timerId); pthread_cleanup_push(taosDeleteTimer, &timerId);
...@@ -182,36 +181,37 @@ static void *taosProcessAlarmSignal(void *tharg) { ...@@ -182,36 +181,37 @@ static void *taosProcessAlarmSignal(void *tharg) {
ts.it_interval.tv_nsec = 1000000 * MSECONDS_PER_TICK; ts.it_interval.tv_nsec = 1000000 * MSECONDS_PER_TICK;
if (timer_settime(timerId, 0, &ts, NULL)) { if (timer_settime(timerId, 0, &ts, NULL)) {
//printf("Failed to init timer"); // printf("Failed to init timer");
return NULL; return NULL;
} }
int signo; int signo;
while (!stopTimer) { while (!stopTimer) {
if (sigwait(&sigset, &signo)) { if (sigwait(&sigset, &signo)) {
//printf("Failed to wait signal: number %d", signo); // printf("Failed to wait signal: number %d", signo);
continue; continue;
} }
/* //printf("Signal handling: number %d ......\n", signo); */ /* //printf("Signal handling: number %d ......\n", signo); */
callback(0); callback(0);
} }
pthread_cleanup_pop(1); pthread_cleanup_pop(1);
return NULL; return NULL;
} }
int taosInitTimer(void (*callback)(int), int ms) { int taosInitTimer(void (*callback)(int), int ms) {
stopTimer = false;
pthread_attr_t tattr; pthread_attr_t tattr;
pthread_attr_init(&tattr); pthread_attr_init(&tattr);
int code = pthread_create(&timerThread, &tattr, taosProcessAlarmSignal, callback); int code = pthread_create(&timerThread, &tattr, taosProcessAlarmSignal, callback);
pthread_attr_destroy(&tattr); pthread_attr_destroy(&tattr);
if (code != 0) { if (code != 0) {
//printf("failed to create timer thread"); // printf("failed to create timer thread");
return -1; return -1;
} else { } else {
//printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread)); // printf("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread));
} }
return 0; return 0;
...@@ -220,7 +220,7 @@ int taosInitTimer(void (*callback)(int), int ms) { ...@@ -220,7 +220,7 @@ int taosInitTimer(void (*callback)(int), int ms) {
void taosUninitTimer() { void taosUninitTimer() {
stopTimer = true; stopTimer = true;
//printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread)); // printf("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread));
pthread_join(timerThread, NULL); pthread_join(timerThread, NULL);
} }
......
...@@ -13,19 +13,49 @@ ...@@ -13,19 +13,49 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "ttimer.h"
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tsched.h" #include "tsched.h"
#include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "taoserror.h"
#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); }} #define tmrFatal(...) \
#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); }} { \
#define tmrWarn(...) { if (tmrDebugFlag & DEBUG_WARN) { taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); }} if (tmrDebugFlag & DEBUG_FATAL) { \
#define tmrInfo(...) { if (tmrDebugFlag & DEBUG_INFO) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); \
#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} } \
#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }} }
#define tmrError(...) \
{ \
if (tmrDebugFlag & DEBUG_ERROR) { \
taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); \
} \
}
#define tmrWarn(...) \
{ \
if (tmrDebugFlag & DEBUG_WARN) { \
taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); \
} \
}
#define tmrInfo(...) \
{ \
if (tmrDebugFlag & DEBUG_INFO) { \
taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
} \
}
#define tmrDebug(...) \
{ \
if (tmrDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
} \
}
#define tmrTrace(...) \
{ \
if (tmrDebugFlag & DEBUG_TRACE) { \
taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); \
} \
}
#define TIMER_STATE_WAITING 0 #define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1 #define TIMER_STATE_EXPIRED 1
...@@ -81,7 +111,7 @@ typedef struct time_wheel_t { ...@@ -81,7 +111,7 @@ typedef struct time_wheel_t {
tmr_obj_t** slots; tmr_obj_t** slots;
} time_wheel_t; } time_wheel_t;
int32_t tmrDebugFlag = 131; int32_t tmrDebugFlag = 131;
uint32_t tsMaxTmrCtrl = 512; uint32_t tsMaxTmrCtrl = 512;
static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT;
...@@ -91,7 +121,7 @@ static tmr_ctrl_t* unusedTmrCtrl = NULL; ...@@ -91,7 +121,7 @@ static tmr_ctrl_t* unusedTmrCtrl = NULL;
static void* tmrQhandle; static void* tmrQhandle;
static int numOfTmrCtrl = 0; static int numOfTmrCtrl = 0;
int taosTmrThreads = 1; int taosTmrThreads = 1;
static uintptr_t nextTimerId = 0; static uintptr_t nextTimerId = 0;
static time_wheel_t wheels[] = { static time_wheel_t wheels[] = {
...@@ -119,7 +149,7 @@ static void timerDecRef(tmr_obj_t* timer) { ...@@ -119,7 +149,7 @@ static void timerDecRef(tmr_obj_t* timer) {
static void lockTimerList(timer_list_t* list) { static void lockTimerList(timer_list_t* list) {
int64_t tid = taosGetSelfPthreadId(); int64_t tid = taosGetSelfPthreadId();
int i = 0; int i = 0;
while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) { while (atomic_val_compare_exchange_64(&(list->lockedBy), 0, tid) != 0) {
if (++i % 1000 == 0) { if (++i % 1000 == 0) {
sched_yield(); sched_yield();
...@@ -276,11 +306,11 @@ static void addToExpired(tmr_obj_t* head) { ...@@ -276,11 +306,11 @@ static void addToExpired(tmr_obj_t* head) {
const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue."; const char* fmt = "%s adding expired timer[id=%" PRIuPTR ", fp=%p, param=%p] to queue.";
while (head != NULL) { while (head != NULL) {
uintptr_t id = head->id; uintptr_t id = head->id;
tmr_obj_t* next = head->next; tmr_obj_t* next = head->next;
tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param); tmrDebug(fmt, head->ctrl->label, id, head->fp, head->param);
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.fp = NULL; schedMsg.fp = NULL;
schedMsg.tfp = processExpiredTimer; schedMsg.tfp = processExpiredTimer;
schedMsg.msg = NULL; schedMsg.msg = NULL;
...@@ -491,6 +521,8 @@ static void taosTmrModuleInit(void) { ...@@ -491,6 +521,8 @@ static void taosTmrModuleInit(void) {
return; return;
} }
memset(&timerMap, 0, sizeof(timerMap));
for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) { for (uint32_t i = 0; i < tsMaxTmrCtrl - 1; ++i) {
tmr_ctrl_t* ctrl = tmrCtrls + i; tmr_ctrl_t* ctrl = tmrCtrls + i;
ctrl->next = ctrl + 1; ctrl->next = ctrl + 1;
...@@ -570,7 +602,8 @@ void taosTmrCleanUp(void* handle) { ...@@ -570,7 +602,8 @@ void taosTmrCleanUp(void* handle) {
unusedTmrCtrl = ctrl; unusedTmrCtrl = ctrl;
pthread_mutex_unlock(&tmrCtrlMutex); pthread_mutex_unlock(&tmrCtrlMutex);
if (numOfTmrCtrl <=0) { tmrDebug("time controller's tmr ctrl size: %d", numOfTmrCtrl);
if (numOfTmrCtrl <= 0) {
taosUninitTimer(); taosUninitTimer();
taosCleanUpScheduler(tmrQhandle); taosCleanUpScheduler(tmrQhandle);
...@@ -585,7 +618,7 @@ void taosTmrCleanUp(void* handle) { ...@@ -585,7 +618,7 @@ void taosTmrCleanUp(void* handle) {
for (size_t i = 0; i < timerMap.size; i++) { for (size_t i = 0; i < timerMap.size; i++) {
timer_list_t* list = timerMap.slots + i; timer_list_t* list = timerMap.slots + i;
tmr_obj_t* t = list->timers; tmr_obj_t* t = list->timers;
while (t != NULL) { while (t != NULL) {
tmr_obj_t* next = t->mnext; tmr_obj_t* next = t->mnext;
free(t); free(t);
...@@ -595,6 +628,8 @@ void taosTmrCleanUp(void* handle) { ...@@ -595,6 +628,8 @@ void taosTmrCleanUp(void* handle) {
free(timerMap.slots); free(timerMap.slots);
free(tmrCtrls); free(tmrCtrls);
tmrDebug("timer module is cleaned up"); tmrCtrls = NULL;
unusedTmrCtrl = NULL;
tmrModuleInit = PTHREAD_ONCE_INIT; // to support restart
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册