diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 81d43daf133ab0613b3cc56ec68d82e87bc0326c..24a4e99970692b202ab36fd1d1a83a45a09bcaa4 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -131,8 +131,7 @@ typedef struct TFileCacheKey { char* colName; int32_t nColName; } ICacheKey; - -int indexFlushCacheToTFile(SIndex* sIdx, void*); +int indexFlushCacheToTFile(SIndex* sIdx, void*, bool quit); int64_t indexAddRef(void* p); int32_t indexRemoveRef(int64_t ref); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 3d905303d1068adfbde12d587773af44edc9b13d..ba3aea969f6c8a5214a3999a7d4ca2c68ec503ac 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -150,6 +150,7 @@ void indexClose(SIndex* sIdx) { indexCacheForceToMerge((void*)(*pCache)); indexInfo("%s wait to merge", (*pCache)->colName); indexWait((void*)(sIdx)); + indexInfo("%s finish to wait", (*pCache)->colName); iter = taosHashIterate(sIdx->colObj, iter); indexCacheUnRef(*pCache); } @@ -454,7 +455,7 @@ static void indexDestroyFinalResult(SArray* result) { taosArrayDestroy(result); } -int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { +int indexFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { if (sIdx == NULL) { return -1; } @@ -464,7 +465,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { IndexCache* pCache = (IndexCache*)cache; - while (sIdx->quit && atomic_load_32(&pCache->merging) == 1) { + while (quit && atomic_load_32(&pCache->merging) == 1) { } TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); if (pReader == NULL) { @@ -476,11 +477,11 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { indexError("%p immtable is empty, ignore merge opera", pCache); indexCacheDestroyImm(pCache); tfileReaderUnRef(pReader); - if (sIdx->quit) { + atomic_store_32(&pCache->merging, 0); + if (quit) { indexPost(sIdx); } indexReleaseRef(sIdx->refId); - atomic_store_32(&pCache->merging, 0); return 0; } @@ -539,10 +540,10 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { } else { indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); } - if (sIdx->quit) { + atomic_store_32(&pCache->merging, 0); + if (quit) { indexPost(sIdx); } - atomic_store_32(&pCache->merging, 0); indexReleaseRef(sIdx->refId); return ret; diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 586a3ae573362355ac40021e5c933f3189ea5a8e..4e7be245ef7fb0a4c383a0abf0b242ebbb46522c 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -728,9 +728,9 @@ static void doMergeWork(SSchedMsg* msg) { IndexCache* pCache = msg->ahandle; SIndex* sidx = (SIndex*)pCache->index; - sidx->quit = msg->thandle ? true : false; + int quit = msg->thandle ? true : false; taosMemoryFree(msg->thandle); - indexFlushCacheToTFile(sidx, pCache); + indexFlushCacheToTFile(sidx, pCache, quit); } static bool indexCacheIteratorNext(Iterate* itera) { SSkipListIterator* iter = itera->iter; diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index cd5a5d9b0f192883f67e9dfecdbcb3854669fdf3..48ce8839c459bb2c523d710f1804346f2bede33a 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -51,6 +51,7 @@ class JsonEnv : public ::testing::Test { tIndexJsonClose(index); indexOptsDestroy(opts); printf("destory\n"); + taosMsleep(1000); } SIndexJsonOpts* opts; SIndexJson* index; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a8093f46a25499cb9d073d9a7cb0aad2cdf90c04..e680e3004283684b0d95c1fd0124f33e99d59d3b 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -351,6 +351,23 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b); */ void transThreadOnce(); +// ref mgt +// handle +typedef struct SExHandle { + void* handle; + int64_t refId; + void* pThrd; +} SExHandle; + +void transInitEnv(); +int32_t transOpenExHandleMgt(int size); +void transCloseExHandleMgt(int32_t mgt); +int64_t transAddExHandle(int32_t mgt, void* p); +int32_t transRemoveExHandle(int32_t mgt, int64_t refId); +SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId); +int32_t transReleaseExHandle(int32_t mgt, int64_t refId); +void transDestoryExHandle(void* handle); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 8aeae1b5ade26a1a320dae37cbfe67f676f66eeb..c328629c4b1ba18564918ede4b5b9e4ecc62ad83 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -22,13 +22,13 @@ #include "lz4.h" #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "thash.h" -#include "tref.h" #include "tmsg.h" #include "transLog.h" +#include "tref.h" #include "trpc.h" #include "tutil.h" -#include "tglobal.h" #ifdef __cplusplus extern "C" { @@ -55,9 +55,9 @@ typedef struct { bool (*retry)(int32_t code); int index; - int32_t refCount; void* parent; void* tcphandle; // returned handle from TCP initialization + int32_t refMgt; TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 84b0156e3697996a81f7743940b04d73c20d0a05..6f6f335ce1956721aba98ad556148c7867c2eded 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -36,6 +36,8 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { return 0; } void* rpcOpen(const SRpcInit* pInit) { + transInitEnv(); + SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { return NULL; @@ -74,12 +76,15 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { memcpy(pRpc->user, pInit->user, strlen(pInit->user)); } + // pRpc->refMgt = transOpenExHandleMgt(50000); return pRpc; } void rpcClose(void* arg) { SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); + transCloseExHandleMgt(pRpc->refMgt); taosMemoryFree(pRpc); + return; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 333ec44fe40246254ef03e6646e4e7e7a932d93a..d962ceb1424c5f61692d03a7c6d741038843f2b8 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -470,4 +470,41 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { } return true; } + +void transInitEnv() { + uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); + // uvOpenExHandleMgt(10000); +} +int32_t transOpenExHandleMgt(int size) { + // added into once later + return taosOpenRef(size, transDestoryExHandle); +} +void transCloseExHandleMgt(int32_t mgt) { + // close ref + taosCloseRef(mgt); +} +int64_t transAddExHandle(int32_t mgt, void* p) { + // acquire extern handle + return taosAddRef(mgt, p); +} +int32_t transRemoveExHandle(int32_t mgt, int64_t refId) { + // acquire extern handle + return taosRemoveRef(mgt, refId); +} + +SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId) { + // acquire extern handle + return (SExHandle*)taosAcquireRef(mgt, refId); +} + +int32_t transReleaseExHandle(int32_t mgt, int64_t refId) { + // release extern handle + return taosReleaseRef(mgt, refId); +} +void transDestoryExHandle(void* handle) { + if (handle == NULL) { + return; + } + taosMemoryFree(handle); +} #endif diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 52b36433bb45ace6b0fa4224fb80b65e0e5e2627..479cee63af7ba0b6ce3dfd2a404670ac89a518e4 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -19,8 +19,9 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; -static char* notify = "a"; -static int tranSSvrInst = 0; +static char* notify = "a"; +static int tranSSvrInst = 0; +static int32_t refMgt = 0; typedef struct { int notifyCount; // @@ -99,13 +100,6 @@ typedef struct SServerObj { bool inited; } SServerObj; -// handle -typedef struct SExHandle { - void* handle; - int64_t refId; - SWorkThrdObj* pThrd; -} SExHandle; - static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); @@ -150,14 +144,14 @@ static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrdObj* thrd) = {uvHandleR static int32_t exHandlesMgt; -void uvInitEnv(); -void uvOpenExHandleMgt(int size); -void uvCloseExHandleMgt(); -int64_t uvAddExHandle(void* p); -int32_t uvRemoveExHandle(int64_t refId); -int32_t uvReleaseExHandle(int64_t refId); -void uvDestoryExHandle(void* handle); -SExHandle* uvAcquireExHandle(int64_t refId); +// void uvInitEnv(); +// void uvOpenExHandleMgt(int size); +// void uvCloseExHandleMgt(); +// int64_t uvAddExHandle(void* p); +// int32_t uvRemoveExHandle(int64_t refId); +// int32_t uvReleaseExHandle(int64_t refId); +// void uvDestoryExHandle(void* handle); +// SExHandle* uvAcquireExHandle(int64_t refId); static void uvDestroyConn(uv_handle_t* handle); @@ -210,7 +204,7 @@ static bool addHandleToAcceptloop(void* arg); do { \ if (refId > 0) { \ tTrace("server handle step1"); \ - SExHandle* exh2 = uvAcquireExHandle(refId); \ + SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \ if (exh2 == NULL || refId != exh2->refId) { \ tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ exh2 ? exh2->refId : 0, refId); \ @@ -218,7 +212,7 @@ static bool addHandleToAcceptloop(void* arg); } \ } else if (refId == 0) { \ tTrace("server handle step2"); \ - SExHandle* exh2 = uvAcquireExHandle(refId); \ + SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \ if (exh2 == NULL || refId != exh2->refId) { \ tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ refId, exh2 ? exh2->refId : 0); \ @@ -300,14 +294,14 @@ static void uvHandleReq(SSvrConn* pConn) { // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist - transMsg.info.handle = (void*)uvAcquireExHandle(pConn->refId); + transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId); transMsg.info.refId = pConn->refId; tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId); assert(transMsg.info.handle != NULL); if (pHead->noResp == 1) { transMsg.info.refId = -1; } - uvReleaseExHandle(pConn->refId); + transReleaseExHandle(refMgt, pConn->refId); STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); @@ -535,15 +529,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = uvAcquireExHandle(refId); + SExHandle* exh2 = transAcquireExHandle(refMgt, refId); if (exh2 == NULL || exh1 != exh2) { tTrace("server handle except msg %p, ignore it", exh1); - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -785,8 +779,8 @@ static SSvrConn* createConn(void* hThrd) { SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = pConn; exh->pThrd = pThrd; - exh->refId = uvAddExHandle(exh); - uvAcquireExHandle(exh->refId); + exh->refId = transAddExHandle(refMgt, exh); + transAcquireExHandle(refMgt, exh->refId); pConn->refId = exh->refId; transRefSrvHandle(pConn); @@ -815,14 +809,14 @@ static void destroyConnRegArg(SSvrConn* conn) { } } static int reallocConnRefHandle(SSvrConn* conn) { - uvReleaseExHandle(conn->refId); - uvRemoveExHandle(conn->refId); + transReleaseExHandle(refMgt, conn->refId); + transRemoveExHandle(refMgt, conn->refId); // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = uvAddExHandle(exh); - uvAcquireExHandle(exh->refId); + exh->refId = transAddExHandle(refMgt, exh); + transAcquireExHandle(refMgt, exh->refId); conn->refId = exh->refId; return 0; @@ -834,8 +828,8 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrdObj* thrd = conn->hostThrd; - uvReleaseExHandle(conn->refId); - uvRemoveExHandle(conn->refId); + transReleaseExHandle(refMgt, conn->refId); + transRemoveExHandle(refMgt, conn->refId); tDebug("server conn %p destroy", conn); // uv_timer_stop(&conn->pTimer); @@ -883,8 +877,11 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); - taosThreadOnce(&transModuleInit, uvInitEnv); + // taosThreadOnce(&transModuleInit, uvInitEnv); tranSSvrInst++; + if (tranSSvrInst == 1) { + refMgt = transOpenExHandleMgt(50000); + } assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); #ifdef WINDOWS @@ -944,42 +941,42 @@ End: return NULL; } -void uvInitEnv() { - uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); - uvOpenExHandleMgt(10000); -} -void uvOpenExHandleMgt(int size) { - // added into once later - exHandlesMgt = taosOpenRef(size, uvDestoryExHandle); -} -void uvCloseExHandleMgt() { - // close ref - taosCloseRef(exHandlesMgt); -} -int64_t uvAddExHandle(void* p) { - // acquire extern handle - return taosAddRef(exHandlesMgt, p); -} -int32_t uvRemoveExHandle(int64_t refId) { - // acquire extern handle - return taosRemoveRef(exHandlesMgt, refId); -} - -SExHandle* uvAcquireExHandle(int64_t refId) { - // acquire extern handle - return (SExHandle*)taosAcquireRef(exHandlesMgt, refId); -} - -int32_t uvReleaseExHandle(int64_t refId) { - // release extern handle - return taosReleaseRef(exHandlesMgt, refId); -} -void uvDestoryExHandle(void* handle) { - if (handle == NULL) { - return; - } - taosMemoryFree(handle); -} +// void uvInitEnv() { +// uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); +// uvOpenExHandleMgt(10000); +//} +// void uvOpenExHandleMgt(int size) { +// // added into once later +// exHandlesMgt = taosOpenRef(size, uvDestoryExHandle); +//} +// void uvCloseExHandleMgt() { +// // close ref +// taosCloseRef(exHandlesMgt); +//} +// int64_t uvAddExHandle(void* p) { +// // acquire extern handle +// return taosAddRef(exHandlesMgt, p); +//} +// int32_t uvRemoveExHandle(int64_t refId) { +// // acquire extern handle +// return taosRemoveRef(exHandlesMgt, refId); +//} +// +// SExHandle* uvAcquireExHandle(int64_t refId) { +// // acquire extern handle +// return (SExHandle*)taosAcquireRef(exHandlesMgt, refId); +//} +// +// int32_t uvReleaseExHandle(int64_t refId) { +// // release extern handle +// return taosReleaseRef(exHandlesMgt, refId); +//} +// void uvDestoryExHandle(void* handle) { +// if (handle == NULL) { +// return; +// } +// taosMemoryFree(handle); +//} void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; @@ -1077,9 +1074,9 @@ void transCloseServer(void* arg) { tranSSvrInst--; if (tranSSvrInst == 0) { - TdThreadOnce tmpInit = PTHREAD_ONCE_INIT; - memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce)); - uvCloseExHandleMgt(); + // TdThreadOnce tmpInit = PTHREAD_ONCE_INIT; + // memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce)); + transCloseExHandleMgt(refMgt); } } @@ -1119,11 +1116,11 @@ void transReleaseSrvHandle(void* handle) { tTrace("server conn %p start to release", exh->handle); transSendAsync(pThrd->asyncPool, &m->q); - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); return; _return1: tTrace("server handle %p failed to send to release handle", exh); - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); return; _return2: tTrace("server handle %p failed to send to release handle", exh); @@ -1146,12 +1143,12 @@ void transSendResponse(const STransMsg* msg) { m->type = Normal; tDebug("server conn %p start to send resp (1/2)", exh->handle); transSendAsync(pThrd->asyncPool, &m->q); - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); return; _return1: tTrace("server handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); return; _return2: tTrace("server handle %p failed to send resp", exh); @@ -1174,13 +1171,13 @@ void transRegisterMsg(const STransMsg* msg) { m->type = Register; tTrace("server conn %p start to register brokenlink callback", exh->handle); transSendAsync(pThrd->asyncPool, &m->q); - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); return; _return1: tTrace("server handle %p failed to send to register brokenlink", exh); rpcFreeCont(msg->pCont); - uvReleaseExHandle(refId); + transReleaseExHandle(refMgt, refId); return; _return2: tTrace("server handle %p failed to send to register brokenlink", exh);