提交 e7fe3577 编写于 作者: dengyihao's avatar dengyihao

enh: refactor index/trans

上级 55c7d1c1
......@@ -131,8 +131,7 @@ typedef struct TFileCacheKey {
char* colName;
int32_t nColName;
} ICacheKey;
int indexFlushCacheToTFile(SIndex* sIdx, void*);
int indexFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
int64_t indexAddRef(void* p);
int32_t indexRemoveRef(int64_t ref);
......
......@@ -150,6 +150,7 @@ void indexClose(SIndex* sIdx) {
indexCacheForceToMerge((void*)(*pCache));
indexInfo("%s wait to merge", (*pCache)->colName);
indexWait((void*)(sIdx));
indexInfo("%s finish to wait", (*pCache)->colName);
iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache);
}
......@@ -454,7 +455,7 @@ static void indexDestroyFinalResult(SArray* result) {
taosArrayDestroy(result);
}
int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
int indexFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
if (sIdx == NULL) {
return -1;
}
......@@ -464,7 +465,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
IndexCache* pCache = (IndexCache*)cache;
while (sIdx->quit && atomic_load_32(&pCache->merging) == 1) {
while (quit && atomic_load_32(&pCache->merging) == 1) {
}
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) {
......@@ -476,11 +477,11 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
indexError("%p immtable is empty, ignore merge opera", pCache);
indexCacheDestroyImm(pCache);
tfileReaderUnRef(pReader);
if (sIdx->quit) {
atomic_store_32(&pCache->merging, 0);
if (quit) {
indexPost(sIdx);
}
indexReleaseRef(sIdx->refId);
atomic_store_32(&pCache->merging, 0);
return 0;
}
......@@ -539,10 +540,10 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
} else {
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
}
if (sIdx->quit) {
atomic_store_32(&pCache->merging, 0);
if (quit) {
indexPost(sIdx);
}
atomic_store_32(&pCache->merging, 0);
indexReleaseRef(sIdx->refId);
return ret;
......
......@@ -728,9 +728,9 @@ static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index;
sidx->quit = msg->thandle ? true : false;
int quit = msg->thandle ? true : false;
taosMemoryFree(msg->thandle);
indexFlushCacheToTFile(sidx, pCache);
indexFlushCacheToTFile(sidx, pCache, quit);
}
static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter;
......
......@@ -51,6 +51,7 @@ class JsonEnv : public ::testing::Test {
tIndexJsonClose(index);
indexOptsDestroy(opts);
printf("destory\n");
taosMsleep(1000);
}
SIndexJsonOpts* opts;
SIndexJson* index;
......
......@@ -351,6 +351,23 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
*/
void transThreadOnce();
// ref mgt
// handle
typedef struct SExHandle {
void* handle;
int64_t refId;
void* pThrd;
} SExHandle;
void transInitEnv();
int32_t transOpenExHandleMgt(int size);
void transCloseExHandleMgt(int32_t mgt);
int64_t transAddExHandle(int32_t mgt, void* p);
int32_t transRemoveExHandle(int32_t mgt, int64_t refId);
SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId);
int32_t transReleaseExHandle(int32_t mgt, int64_t refId);
void transDestoryExHandle(void* handle);
#ifdef __cplusplus
}
#endif
......
......@@ -22,13 +22,13 @@
#include "lz4.h"
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tref.h"
#include "tmsg.h"
#include "transLog.h"
#include "tref.h"
#include "trpc.h"
#include "tutil.h"
#include "tglobal.h"
#ifdef __cplusplus
extern "C" {
......@@ -55,9 +55,9 @@ typedef struct {
bool (*retry)(int32_t code);
int index;
int32_t refCount;
void* parent;
void* tcphandle; // returned handle from TCP initialization
int32_t refMgt;
TdThreadMutex mutex;
} SRpcInfo;
......
......@@ -36,6 +36,8 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
return 0;
}
void* rpcOpen(const SRpcInit* pInit) {
transInitEnv();
SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) {
return NULL;
......@@ -74,12 +76,15 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user));
}
// pRpc->refMgt = transOpenExHandleMgt(50000);
return pRpc;
}
void rpcClose(void* arg) {
SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
transCloseExHandleMgt(pRpc->refMgt);
taosMemoryFree(pRpc);
return;
}
......
......@@ -470,4 +470,41 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
}
return true;
}
void transInitEnv() {
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
// uvOpenExHandleMgt(10000);
}
int32_t transOpenExHandleMgt(int size) {
// added into once later
return taosOpenRef(size, transDestoryExHandle);
}
void transCloseExHandleMgt(int32_t mgt) {
// close ref
taosCloseRef(mgt);
}
int64_t transAddExHandle(int32_t mgt, void* p) {
// acquire extern handle
return taosAddRef(mgt, p);
}
int32_t transRemoveExHandle(int32_t mgt, int64_t refId) {
// acquire extern handle
return taosRemoveRef(mgt, refId);
}
SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId) {
// acquire extern handle
return (SExHandle*)taosAcquireRef(mgt, refId);
}
int32_t transReleaseExHandle(int32_t mgt, int64_t refId) {
// release extern handle
return taosReleaseRef(mgt, refId);
}
void transDestoryExHandle(void* handle) {
if (handle == NULL) {
return;
}
taosMemoryFree(handle);
}
#endif
......@@ -19,8 +19,9 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static char* notify = "a";
static int tranSSvrInst = 0;
static char* notify = "a";
static int tranSSvrInst = 0;
static int32_t refMgt = 0;
typedef struct {
int notifyCount; //
......@@ -99,13 +100,6 @@ typedef struct SServerObj {
bool inited;
} SServerObj;
// handle
typedef struct SExHandle {
void* handle;
int64_t refId;
SWorkThrdObj* pThrd;
} SExHandle;
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
......@@ -150,14 +144,14 @@ static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrdObj* thrd) = {uvHandleR
static int32_t exHandlesMgt;
void uvInitEnv();
void uvOpenExHandleMgt(int size);
void uvCloseExHandleMgt();
int64_t uvAddExHandle(void* p);
int32_t uvRemoveExHandle(int64_t refId);
int32_t uvReleaseExHandle(int64_t refId);
void uvDestoryExHandle(void* handle);
SExHandle* uvAcquireExHandle(int64_t refId);
// void uvInitEnv();
// void uvOpenExHandleMgt(int size);
// void uvCloseExHandleMgt();
// int64_t uvAddExHandle(void* p);
// int32_t uvRemoveExHandle(int64_t refId);
// int32_t uvReleaseExHandle(int64_t refId);
// void uvDestoryExHandle(void* handle);
// SExHandle* uvAcquireExHandle(int64_t refId);
static void uvDestroyConn(uv_handle_t* handle);
......@@ -210,7 +204,7 @@ static bool addHandleToAcceptloop(void* arg);
do { \
if (refId > 0) { \
tTrace("server handle step1"); \
SExHandle* exh2 = uvAcquireExHandle(refId); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, refId); \
......@@ -218,7 +212,7 @@ static bool addHandleToAcceptloop(void* arg);
} \
} else if (refId == 0) { \
tTrace("server handle step2"); \
SExHandle* exh2 = uvAcquireExHandle(refId); \
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); \
if (exh2 == NULL || refId != exh2->refId) { \
tTrace("server handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
refId, exh2 ? exh2->refId : 0); \
......@@ -300,14 +294,14 @@ static void uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg.info.handle = (void*)uvAcquireExHandle(pConn->refId);
transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId);
transMsg.info.refId = pConn->refId;
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) {
transMsg.info.refId = -1;
}
uvReleaseExHandle(pConn->refId);
transReleaseExHandle(refMgt, pConn->refId);
STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
......@@ -535,15 +529,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
SExHandle* exh2 = uvAcquireExHandle(refId);
SExHandle* exh2 = transAcquireExHandle(refMgt, refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("server handle except msg %p, ignore it", exh1);
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
......@@ -785,8 +779,8 @@ static SSvrConn* createConn(void* hThrd) {
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = pConn;
exh->pThrd = pThrd;
exh->refId = uvAddExHandle(exh);
uvAcquireExHandle(exh->refId);
exh->refId = transAddExHandle(refMgt, exh);
transAcquireExHandle(refMgt, exh->refId);
pConn->refId = exh->refId;
transRefSrvHandle(pConn);
......@@ -815,14 +809,14 @@ static void destroyConnRegArg(SSvrConn* conn) {
}
}
static int reallocConnRefHandle(SSvrConn* conn) {
uvReleaseExHandle(conn->refId);
uvRemoveExHandle(conn->refId);
transReleaseExHandle(refMgt, conn->refId);
transRemoveExHandle(refMgt, conn->refId);
// avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = conn;
exh->pThrd = conn->hostThrd;
exh->refId = uvAddExHandle(exh);
uvAcquireExHandle(exh->refId);
exh->refId = transAddExHandle(refMgt, exh);
transAcquireExHandle(refMgt, exh->refId);
conn->refId = exh->refId;
return 0;
......@@ -834,8 +828,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
}
SWorkThrdObj* thrd = conn->hostThrd;
uvReleaseExHandle(conn->refId);
uvRemoveExHandle(conn->refId);
transReleaseExHandle(refMgt, conn->refId);
transRemoveExHandle(refMgt, conn->refId);
tDebug("server conn %p destroy", conn);
// uv_timer_stop(&conn->pTimer);
......@@ -883,8 +877,11 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port;
uv_loop_init(srv->loop);
taosThreadOnce(&transModuleInit, uvInitEnv);
// taosThreadOnce(&transModuleInit, uvInitEnv);
tranSSvrInst++;
if (tranSSvrInst == 1) {
refMgt = transOpenExHandleMgt(50000);
}
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
......@@ -944,42 +941,42 @@ End:
return NULL;
}
void uvInitEnv() {
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
uvOpenExHandleMgt(10000);
}
void uvOpenExHandleMgt(int size) {
// added into once later
exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
}
void uvCloseExHandleMgt() {
// close ref
taosCloseRef(exHandlesMgt);
}
int64_t uvAddExHandle(void* p) {
// acquire extern handle
return taosAddRef(exHandlesMgt, p);
}
int32_t uvRemoveExHandle(int64_t refId) {
// acquire extern handle
return taosRemoveRef(exHandlesMgt, refId);
}
SExHandle* uvAcquireExHandle(int64_t refId) {
// acquire extern handle
return (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
}
int32_t uvReleaseExHandle(int64_t refId) {
// release extern handle
return taosReleaseRef(exHandlesMgt, refId);
}
void uvDestoryExHandle(void* handle) {
if (handle == NULL) {
return;
}
taosMemoryFree(handle);
}
// void uvInitEnv() {
// uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
// uvOpenExHandleMgt(10000);
//}
// void uvOpenExHandleMgt(int size) {
// // added into once later
// exHandlesMgt = taosOpenRef(size, uvDestoryExHandle);
//}
// void uvCloseExHandleMgt() {
// // close ref
// taosCloseRef(exHandlesMgt);
//}
// int64_t uvAddExHandle(void* p) {
// // acquire extern handle
// return taosAddRef(exHandlesMgt, p);
//}
// int32_t uvRemoveExHandle(int64_t refId) {
// // acquire extern handle
// return taosRemoveRef(exHandlesMgt, refId);
//}
//
// SExHandle* uvAcquireExHandle(int64_t refId) {
// // acquire extern handle
// return (SExHandle*)taosAcquireRef(exHandlesMgt, refId);
//}
//
// int32_t uvReleaseExHandle(int64_t refId) {
// // release extern handle
// return taosReleaseRef(exHandlesMgt, refId);
//}
// void uvDestoryExHandle(void* handle) {
// if (handle == NULL) {
// return;
// }
// taosMemoryFree(handle);
//}
void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) {
thrd->quit = true;
......@@ -1077,9 +1074,9 @@ void transCloseServer(void* arg) {
tranSSvrInst--;
if (tranSSvrInst == 0) {
TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
uvCloseExHandleMgt();
// TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
// memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
transCloseExHandleMgt(refMgt);
}
}
......@@ -1119,11 +1116,11 @@ void transReleaseSrvHandle(void* handle) {
tTrace("server conn %p start to release", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q);
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
return;
_return1:
tTrace("server handle %p failed to send to release handle", exh);
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
return;
_return2:
tTrace("server handle %p failed to send to release handle", exh);
......@@ -1146,12 +1143,12 @@ void transSendResponse(const STransMsg* msg) {
m->type = Normal;
tDebug("server conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q);
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
return;
_return1:
tTrace("server handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
return;
_return2:
tTrace("server handle %p failed to send resp", exh);
......@@ -1174,13 +1171,13 @@ void transRegisterMsg(const STransMsg* msg) {
m->type = Register;
tTrace("server conn %p start to register brokenlink callback", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q);
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
return;
_return1:
tTrace("server handle %p failed to send to register brokenlink", exh);
rpcFreeCont(msg->pCont);
uvReleaseExHandle(refId);
transReleaseExHandle(refMgt, refId);
return;
_return2:
tTrace("server handle %p failed to send to register brokenlink", exh);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册