diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index d01c34c810a5c5d8dd172bf9065565ab0dfa7ae5..05330ebff890ad71c37b1e72f4332e489eab6ac8 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -82,6 +82,7 @@ typedef struct { int8_t oldInUse; // server EP inUse passed by app int8_t redirect; // flag to indicate redirect int8_t connType; // connection type + int32_t rid; // refId returned by taosAddRef SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API SRpcEpSet *pSet; // for synchronous API @@ -374,7 +375,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { +int64_t rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -403,10 +404,11 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) { // set the handle to pContext, so app can cancel the request if (pMsg->handle) *((void **)pMsg->handle) = pContext; - taosAddRef(tsRpcRefId, pContext); + pContext->rid = taosAddRef(tsRpcRefId, pContext); + rpcSendReqToServer(pRpc, pContext); - return; + return pContext->rid; } void rpcSendResponse(const SRpcMsg *pRsp) { @@ -551,11 +553,10 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { return code; } -void rpcCancelRequest(void *handle) { - SRpcReqContext *pContext = handle; +void rpcCancelRequest(int64_t rid) { - int code = taosAcquireRef(tsRpcRefId, pContext); - if (code < 0) return; + SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rid); + if (pContext == NULL) return; rpcCloseConn(pContext->pConn); diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index 3e5db33cf7c53b637de7642d87d2ba211d58d27a..cd5092f30a290c51de49e38b0226bbed637dc0e6 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -32,7 +32,10 @@ int taosCloseRef(int refId); // add ref, p is the pointer to resource or pointer ID // return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately int64_t taosAddRef(int refId, void *p); -#define taosRemoveRef taosReleaseRef + +// remove ref, rid is the reference ID returned by taosAddRef +// return 0 if success. On error, -1 is returned, and terrno is set appropriately +int taosRemoveRef(int rsetId, int64_t rid); // acquire ref, rid is the reference ID returned by taosAddRef // return the resource p. On error, NULL is returned, and terrno is set appropriately diff --git a/src/util/src/tref.c b/src/util/src/tref.c index c2adb6d15b2ddde8c79acd767c8b964d136f78fc..915ed53193a0ef3c79a40e4ced2fdacdda2b0a86 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -29,6 +29,7 @@ typedef struct SRefNode { void *p; // pointer to resource protected, int64_t rid; // reference ID int32_t count; // number of references + int removed; // 1: removed } SRefNode; typedef struct { @@ -51,8 +52,9 @@ static int tsNextId = 0; static void taosInitRefModule(void); static void taosLockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy); -static void taosIncRefCount(SRefSet *pSet); -static void taosDecRefCount(SRefSet *pSet); +static void taosIncRsetCount(SRefSet *pSet); +static void taosDecRsetCount(SRefSet *pSet); +static int taosDecRefCount(int rsetId, int64_t rid, int remove); int taosOpenRef(int max, void (*fp)(void *)) { @@ -86,7 +88,7 @@ int taosOpenRef(int max, void (*fp)(void *)) if (i < TSDB_REF_OBJECTS) { rsetId = tsNextId; pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); pSet->max = max; pSet->nodeList = nodeList; pSet->lockedBy = lockedBy; @@ -134,7 +136,7 @@ int taosCloseRef(int rsetId) pthread_mutex_unlock(&tsRefMutex); - if (deleted) taosDecRefCount(pSet); + if (deleted) taosDecRsetCount(pSet); return 0; } @@ -153,9 +155,9 @@ int64_t taosAddRef(int rsetId, void *p) } pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { - taosDecRefCount(pSet); + taosDecRsetCount(pSet); uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p); terrno = TSDB_CODE_REF_ID_REMOVED; return -1; @@ -187,6 +189,12 @@ int64_t taosAddRef(int rsetId, void *p) return rid; } +int taosRemoveRef(int rsetId, int64_t rid) +{ + return taosDecRefCount(rsetId, rid, 1); +} + +// if rid is 0, return the first p in hash list, otherwise, return the next after current rid void *taosAcquireRef(int rsetId, int64_t rid) { int hash; @@ -200,11 +208,17 @@ void *taosAcquireRef(int rsetId, int64_t rid) return NULL; } + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return NULL; + } + pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid); - taosDecRefCount(pSet); + taosDecRsetCount(pSet); terrno = TSDB_CODE_REF_ID_REMOVED; return NULL; } @@ -223,9 +237,14 @@ void *taosAcquireRef(int rsetId, int64_t rid) } if (pNode) { - pNode->count++; - p = pNode->p; - uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid); + if (pNode->removed == 0) { + pNode->count++; + p = pNode->p; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid); + } else { + terrno = TSDB_CODE_REF_NOT_EXIST; + uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid); + } } else { terrno = TSDB_CODE_REF_NOT_EXIST; uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid); @@ -233,75 +252,14 @@ void *taosAcquireRef(int rsetId, int64_t rid) taosUnlockList(pSet->lockedBy+hash); - taosDecRefCount(pSet); + taosDecRsetCount(pSet); return p; } int taosReleaseRef(int rsetId, int64_t rid) { - int hash; - SRefNode *pNode; - SRefSet *pSet; - int released = 0; - - if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { - uTrace("rsetId:%d rid:%" PRId64 " failed to release, rsetId not valid", rsetId, rid); - terrno = TSDB_CODE_REF_INVALID_ID; - return -1; - } - - pSet = tsRefSetList + rsetId; - if (pSet->state == TSDB_REF_STATE_EMPTY) { - uTrace("rsetId:%d rid:%" PRId64 " failed to release, cleaned", rsetId, rid); - terrno = TSDB_CODE_REF_ID_REMOVED; - return -1; - } - - terrno = 0; - hash = rid % pSet->max; - taosLockList(pSet->lockedBy+hash); - - pNode = pSet->nodeList[hash]; - while (pNode) { - if (pNode->rid == rid) - break; - - pNode = pNode->next; - } - - if (pNode) { - pNode->count--; - - if (pNode->count == 0) { - if ( pNode->prev ) { - pNode->prev->next = pNode->next; - } else { - pSet->nodeList[hash] = pNode->next; - } - - if ( pNode->next ) { - pNode->next->prev = pNode->prev; - } - - (*pSet->fp)(pNode->p); - - uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); - free(pNode); - released = 1; - } else { - uTrace("rsetId:%d p:%p rid:%" PRId64 "is released", rsetId, pNode->p, rid); - } - } else { - uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release", rsetId, rid); - terrno = TSDB_CODE_REF_NOT_EXIST; - } - - taosUnlockList(pSet->lockedBy+hash); - - if (released) taosDecRefCount(pSet); - - return terrno; + return taosDecRefCount(rsetId, rid, 0); } // if rid is 0, return the first p in hash list, otherwise, return the next after current rid @@ -315,12 +273,18 @@ void *taosIterateRef(int rsetId, int64_t rid) { return NULL; } + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return NULL; + } + pSet = tsRefSetList + rsetId; - taosIncRefCount(pSet); + taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid); terrno = TSDB_CODE_REF_ID_REMOVED; - taosDecRefCount(pSet); + taosDecRsetCount(pSet); return NULL; } @@ -371,7 +335,7 @@ void *taosIterateRef(int rsetId, int64_t rid) { if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one - taosDecRefCount(pSet); + taosDecRsetCount(pSet); return newP; } @@ -407,6 +371,81 @@ int taosListRef() { return num; } +static int taosDecRefCount(int rsetId, int64_t rid, int remove) { + int hash; + SRefSet *pSet; + SRefNode *pNode; + int released = 0; + int code = 0; + + if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid); + terrno = TSDB_CODE_REF_INVALID_ID; + return -1; + } + + if (rid <= 0) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + return -1; + } + + pSet = tsRefSetList + rsetId; + if (pSet->state == TSDB_REF_STATE_EMPTY) { + uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid); + terrno = TSDB_CODE_REF_ID_REMOVED; + return -1; + } + + terrno = 0; + hash = rid % pSet->max; + + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while (pNode) { + if (pNode->rid == rid) + break; + + pNode = pNode->next; + } + + if (pNode) { + pNode->count--; + if (remove) pNode->removed = 1; + + if (pNode->count <= 0) { + if (pNode->prev) { + pNode->prev->next = pNode->next; + } else { + pSet->nodeList[hash] = pNode->next; + } + + if (pNode->next) { + pNode->next->prev = pNode->prev; + } + + (*pSet->fp)(pNode->p); + + uTrace("rsetId:%d p:%p rid:%" PRId64 "is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); + free(pNode); + released = 1; + } else { + uTrace("rsetId:%d p:%p rid:%" PRId64 "is released, count:%d", rsetId, pNode->p, rid, pNode->count); + } + } else { + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + code = -1; + } + + taosUnlockList(pSet->lockedBy+hash); + + if (released) taosDecRsetCount(pSet); + + return code; +} + static void taosLockList(int64_t *lockedBy) { int64_t tid = taosGetPthreadId(); int i = 0; @@ -428,12 +467,12 @@ static void taosInitRefModule(void) { pthread_mutex_init(&tsRefMutex, NULL); } -static void taosIncRefCount(SRefSet *pSet) { +static void taosIncRsetCount(SRefSet *pSet) { atomic_add_fetch_32(&pSet->count, 1); uTrace("rsetId:%d inc count:%d", pSet->rsetId, pSet->count); } -static void taosDecRefCount(SRefSet *pSet) { +static void taosDecRsetCount(SRefSet *pSet) { int32_t count = atomic_sub_fetch_32(&pSet->count, 1); uTrace("rsetId:%d dec count:%d", pSet->rsetId, pSet->count); diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index 09ffccd7b5fc4ea1c2a645a8ff6d8ec0c70cc8f2..6887b24abdae9f5d9949fee0073a35eb717eb4f7 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -11,106 +11,119 @@ #include "tulog.h" typedef struct { - int refNum; - int steps; - int refId; - void **p; + int refNum; + int steps; + int rsetId; + int64_t rid; + void **p; } SRefSpace; -void iterateRefs(int refId) { +void iterateRefs(int rsetId) { int count = 0; - void *p = taosIterateRef(refId, NULL); + void *p = taosIterateRef(rsetId, NULL); while (p) { // process P count++; - p = taosIterateRef(refId, p); + p = taosIterateRef(rsetId, p); } printf(" %d ", count); } -void *takeRefActions(void *param) { +void *addRef(void *param) { SRefSpace *pSpace = (SRefSpace *)param; - int code, id; + int id; + int64_t rid; for (int i=0; i < pSpace->steps; ++i) { - printf("s"); + printf("a"); id = random() % pSpace->refNum; - code = taosAddRef(pSpace->refId, pSpace->p[id]); - usleep(1); - - id = random() % pSpace->refNum; - code = taosAcquireRef(pSpace->refId, pSpace->p[id]); - if (code >= 0) { - usleep(id % 5 + 1); - taosReleaseRef(pSpace->refId, pSpace->p[id]); + if (pSpace->rid[id] <= 0) { + pSpace->p[id] = malloc(128); + pSpace->rid[id] = taosAddRef(pSpace->rsetId, pSpace->p[id]); } + usleep(100); + } + return NULL; +} + +void *removeRef(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + int id; + int64_t rid; + + for (int i=0; i < pSpace->steps; ++i) { + printf("d"); id = random() % pSpace->refNum; - taosRemoveRef(pSpace->refId, pSpace->p[id]); - usleep(id %5 + 1); + if (pSpace->rid[id] > 0) { + code = taosRemoveRef(pSpace->rsetId, pSpace->rid[id]); + if (code == 0) pSpace->rid[id] = 0; + } + usleep(100); + } + + return NULL; +} + +void *acquireRelease(void *param) { + SRefSpace *pSpace = (SRefSpace *)param; + int id; + int64_t rid; + + for (int i=0; i < pSpace->steps; ++i) { + printf("a"); + id = random() % pSpace->refNum; - code = taosAcquireRef(pSpace->refId, pSpace->p[id]); + code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]); if (code >= 0) { usleep(id % 5 + 1); - taosReleaseRef(pSpace->refId, pSpace->p[id]); + taosReleaseRef(pSpace->rsetId, pSpace->p[id]); } - - id = random() % pSpace->refNum; - iterateRefs(id); } - for (int i=0; i < pSpace->refNum; ++i) { - taosRemoveRef(pSpace->refId, pSpace->p[i]); - } - - //uInfo("refId:%d thread exits", pSpace->refId); - return NULL; } void myfree(void *p) { - return; + free(p); } void *openRefSpace(void *param) { SRefSpace *pSpace = (SRefSpace *)param; printf("c"); - pSpace->refId = taosOpenRef(50, myfree); + pSpace->rsetId = taosOpenRef(50, myfree); - if (pSpace->refId < 0) { - printf("failed to open ref, reson:%s\n", tstrerror(pSpace->refId)); + if (pSpace->rsetId < 0) { + printf("failed to open ref, reson:%s\n", tstrerror(pSpace->rsetId)); return NULL; } pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum); - for (int i=0; irefNum; ++i) { - pSpace->p[i] = (void *) malloc(128); - } pthread_attr_t thattr; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_t thread1, thread2, thread3; - pthread_create(&(thread1), &thattr, takeRefActions, (void *)(pSpace)); - pthread_create(&(thread2), &thattr, takeRefActions, (void *)(pSpace)); - pthread_create(&(thread3), &thattr, takeRefActions, (void *)(pSpace)); + pthread_create(&(thread1), &thattr, addRef, (void *)(pSpace)); + pthread_create(&(thread2), &thattr, removeRef, (void *)(pSpace)); + pthread_create(&(thread3), &thattr, acquireRelease, (void *)(pSpace)); pthread_join(thread1, NULL); pthread_join(thread2, NULL); pthread_join(thread3, NULL); - taosCloseRef(pSpace->refId); - for (int i=0; irefNum; ++i) { - free(pSpace->p[i]); + taosRemoveRef(pSpace->rsetId, pSpace->rid[i]); } - uInfo("refId:%d main thread exit", pSpace->refId); + taosCloseRef(pSpace->rsetId); + + uInfo("rsetId:%d main thread exit", pSpace->rsetId); free(pSpace->p); pSpace->p = NULL; @@ -140,7 +153,7 @@ int main(int argc, char *argv[]) { printf("\nusage: %s [options] \n", argv[0]); printf(" [-n]: number of references, default: %d\n", refNum); printf(" [-s]: steps to run for each reference, default: %d\n", steps); - printf(" [-t]: number of refIds running in parallel, default: %d\n", threads); + printf(" [-t]: number of rsetIds running in parallel, default: %d\n", threads); printf(" [-l]: number of loops, default: %d\n", loops); printf(" [-d]: debugFlag, default: %d\n", uDebugFlag); exit(0);