提交 a9899065 编写于 作者: S Shengliang Guan

TD-1038 compile for windows client

上级 73e0e84b
...@@ -31,6 +31,7 @@ extern "C" { ...@@ -31,6 +31,7 @@ extern "C" {
// TAOS_OS_FUNC_SEMPHONE_PTHREAD // TAOS_OS_FUNC_SEMPHONE_PTHREAD
bool taosCheckPthreadValid(pthread_t thread); bool taosCheckPthreadValid(pthread_t thread);
int64_t taosGetPthreadId(); int64_t taosGetPthreadId();
void taosResetPthread(pthread_t *thread);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -160,6 +160,9 @@ char * strndup(const char *s, size_t n); ...@@ -160,6 +160,9 @@ char * strndup(const char *s, size_t n);
#define TCP_KEEPCNT 0x1234 #define TCP_KEEPCNT 0x1234
#define TCP_KEEPIDLE 0x1234 #define TCP_KEEPIDLE 0x1234
#define TCP_KEEPINTVL 0x1234 #define TCP_KEEPINTVL 0x1234
#define SHUT_RDWR SD_BOTH
#define SHUT_RD SD_RECEIVE
#define SHUT_WR SD_SEND
#define LOCK_EX 1 #define LOCK_EX 1
#define LOCK_NB 2 #define LOCK_NB 2
......
...@@ -20,5 +20,6 @@ ...@@ -20,5 +20,6 @@
bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; } bool taosCheckPthreadValid(pthread_t thread) { return thread != 0; }
int64_t taosGetPthreadId() { return (int64_t)pthread_self(); } int64_t taosGetPthreadId() { return (int64_t)pthread_self(); }
void taosResetPthread(pthread_t *thread) { *thread = 0; }
#endif #endif
\ No newline at end of file
...@@ -82,7 +82,7 @@ void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, ...@@ -82,7 +82,7 @@ void *rpcOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl,
pCache->cleanFp = cleanFp; pCache->cleanFp = cleanFp;
pCache->tmrCtrl = tmrCtrl; pCache->tmrCtrl = tmrCtrl;
pCache->lockedBy = calloc(sizeof(int64_t), maxSessions); pCache->lockedBy = calloc(sizeof(int64_t), maxSessions);
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
pthread_mutex_init(&pCache->mutex, NULL); pthread_mutex_init(&pCache->mutex, NULL);
...@@ -226,7 +226,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) { ...@@ -226,7 +226,7 @@ static void rpcCleanConnCache(void *handle, void *tmrId) {
} }
// tTrace("timer, total connections in cache:%d", pCache->total); // tTrace("timer, total connections in cache:%d", pCache->total);
taosTmrReset(rpcCleanConnCache, pCache->keepTimer * 2, pCache, pCache->tmrCtrl, &pCache->pTimer); taosTmrReset(rpcCleanConnCache, (int32_t)(pCache->keepTimer * 2), pCache, pCache->tmrCtrl, &pCache->pTimer);
} }
static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) { static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash, uint64_t time) {
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
#include "rpcHead.h" #include "rpcHead.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead *) ((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
...@@ -359,7 +359,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -359,7 +359,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
SRpcReqContext *pContext; SRpcReqContext *pContext;
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
pContext->ahandle = pMsg->ahandle; pContext->ahandle = pMsg->ahandle;
pContext->signature = pContext; pContext->signature = pContext;
pContext->pRpc = (SRpcInfo *)shandle; pContext->pRpc = (SRpcInfo *)shandle;
...@@ -492,7 +492,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { ...@@ -492,7 +492,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext; SRpcReqContext *pContext;
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
memset(pRsp, 0, sizeof(SRpcMsg)); memset(pRsp, 0, sizeof(SRpcMsg));
...@@ -654,7 +654,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) { ...@@ -654,7 +654,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->pRpc = pRpc; pConn->pRpc = pRpc;
pConn->sid = sid; pConn->sid = sid;
pConn->tranId = (uint16_t)(random() & 0xFFFF); pConn->tranId = (uint16_t)(taosRand() & 0xFFFF);
pConn->ownId = htonl(pConn->sid); pConn->ownId = htonl(pConn->sid);
pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid() + (int64_t)pConn->tranId); pConn->linkUid = (uint32_t)((int64_t)pConn + (int64_t)getpid() + (int64_t)pConn->tranId);
pConn->spi = pRpc->spi; pConn->spi = pRpc->spi;
......
...@@ -81,7 +81,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -81,7 +81,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
pServerObj->fd = -1; pServerObj->fd = -1;
pServerObj->thread = 0; taosResetPthread(&pServerObj->thread);
pServerObj->ip = ip; pServerObj->ip = ip;
pServerObj->port = port; pServerObj->port = port;
tstrncpy(pServerObj->label, label, sizeof(pServerObj->label)); tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
...@@ -104,7 +104,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -104,7 +104,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj = pServerObj->pThreadObj; pThreadObj = pServerObj->pThreadObj;
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj->pollFd = -1; pThreadObj->pollFd = -1;
pThreadObj->thread = 0; taosResetPthread(&pThreadObj->thread);
pThreadObj->processData = fp; pThreadObj->processData = fp;
tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label)); tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
pThreadObj->shandle = shandle; pThreadObj->shandle = shandle;
......
...@@ -144,7 +144,9 @@ void taosStopUdpConnection(void *handle) { ...@@ -144,7 +144,9 @@ void taosStopUdpConnection(void *handle) {
for (int i = 0; i < pSet->threads; ++i) { for (int i = 0; i < pSet->threads; ++i) {
pConn = pSet->udpConn + i; pConn = pSet->udpConn + i;
if (pConn->thread) pthread_join(pConn->thread, NULL); if (taosCheckPthreadValid(pConn->thread)) {
pthread_join(pConn->thread, NULL);
}
taosTFree(pConn->buffer); taosTFree(pConn->buffer);
// tTrace("%s UDP thread is closed, index:%d", pConn->label, i); // tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册