diff --git a/src/util/src/tref.c b/src/util/src/tref.c index ebae8ece14e2f3bd045fee92482afa4c6c403949..5c7bdec01eb57cb8ee561d10a8417daf15d90bcd 100644 --- a/src/util/src/tref.c +++ b/src/util/src/tref.c @@ -24,22 +24,22 @@ #define TSDB_REF_STATE_DELETED 2 typedef struct SRefNode { - struct SRefNode *prev; // previous node + struct SRefNode *prev; // previous node struct SRefNode *next; // next node - void *p; // pointer to resource protected, + void *p; // pointer to resource protected, int64_t rid; // reference ID int32_t count; // number of references - int removed; // 1: removed + int removed; // 1: removed } SRefNode; typedef struct { SRefNode **nodeList; // array of SRefNode linked list int state; // 0: empty, 1: active; 2: deleted - int rsetId; // refSet ID, global unique + int rsetId; // refSet ID, global unique int64_t rid; // increase by one for each new reference - int max; // mod + int max; // mod int32_t count; // total number of SRefNodes in this set - int64_t *lockedBy; + int64_t *lockedBy; void (*fp)(void *); } SRefSet; @@ -62,9 +62,9 @@ int taosOpenRef(int max, void (*fp)(void *)) SRefSet *pSet; int64_t *lockedBy; int i, rsetId; - + pthread_once(&tsRefModuleInit, taosInitRefModule); - + nodeList = calloc(sizeof(SRefNode *), (size_t)max); if (nodeList == NULL) { terrno = TSDB_CODE_REF_NO_MEMORY; @@ -79,12 +79,12 @@ int taosOpenRef(int max, void (*fp)(void *)) } pthread_mutex_lock(&tsRefMutex); - + for (i = 0; i < TSDB_REF_OBJECTS; ++i) { tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; - } + } if (i < TSDB_REF_OBJECTS) { rsetId = tsNextId; @@ -105,7 +105,7 @@ int taosOpenRef(int max, void (*fp)(void *)) free (nodeList); free (lockedBy); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); - } + } pthread_mutex_unlock(&tsRefMutex); @@ -127,7 +127,7 @@ int taosCloseRef(int rsetId) pthread_mutex_lock(&tsRefMutex); - if (pSet->state == TSDB_REF_STATE_ACTIVE) { + if (pSet->state == TSDB_REF_STATE_ACTIVE) { pSet->state = TSDB_REF_STATE_DELETED; deleted = 1; uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count); @@ -142,7 +142,7 @@ int taosCloseRef(int rsetId) return 0; } -int64_t taosAddRef(int rsetId, void *p) +int64_t taosAddRef(int rsetId, void *p) { int hash; SRefNode *pNode; @@ -163,7 +163,7 @@ int64_t taosAddRef(int rsetId, void *p) terrno = TSDB_CODE_REF_ID_REMOVED; return -1; } - + pNode = calloc(sizeof(SRefNode), 1); if (pNode == NULL) { terrno = TSDB_CODE_REF_NO_MEMORY; @@ -173,7 +173,7 @@ int64_t taosAddRef(int rsetId, void *p) rid = atomic_add_fetch_64(&pSet->rid, 1); hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); - + pNode->p = p; pNode->rid = rid; pNode->count = 1; @@ -187,16 +187,16 @@ int64_t taosAddRef(int rsetId, void *p) taosUnlockList(pSet->lockedBy+hash); - return rid; + return rid; } -int taosRemoveRef(int rsetId, int64_t rid) +int taosRemoveRef(int rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1); } // if rid is 0, return the first p in hash list, otherwise, return the next after current rid -void *taosAcquireRef(int rsetId, int64_t rid) +void *taosAcquireRef(int rsetId, int64_t rid) { int hash; SRefNode *pNode; @@ -223,7 +223,7 @@ void *taosAcquireRef(int rsetId, int64_t rid) terrno = TSDB_CODE_REF_ID_REMOVED; return NULL; } - + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); @@ -233,8 +233,8 @@ void *taosAcquireRef(int rsetId, int64_t rid) if (pNode->rid == rid) { break; } - - pNode = pNode->next; + + pNode = pNode->next; } if (pNode) { @@ -258,7 +258,7 @@ void *taosAcquireRef(int rsetId, int64_t rid) return p; } -int taosReleaseRef(int rsetId, int64_t rid) +int taosReleaseRef(int rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0); } @@ -280,6 +280,7 @@ void *taosIterateRef(int rsetId, int64_t rid) { return NULL; } + void *newP = NULL; pSet = tsRefSetList + rsetId; taosIncRsetCount(pSet); if (pSet->state != TSDB_REF_STATE_ACTIVE) { @@ -289,52 +290,68 @@ void *taosIterateRef(int rsetId, int64_t rid) { return NULL; } - int hash = 0; - if (rid > 0) { - hash = rid % pSet->max; - taosLockList(pSet->lockedBy+hash); + do { + newP = NULL; + int hash = 0; + if (rid > 0) { + hash = rid % pSet->max; + taosLockList(pSet->lockedBy+hash); + + pNode = pSet->nodeList[hash]; + while (pNode) { + if (pNode->rid == rid) break; + pNode = pNode->next; + } - pNode = pSet->nodeList[hash]; - while (pNode) { - if (pNode->rid == rid) break; - pNode = pNode->next; - } + if (pNode == NULL) { + uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid); + terrno = TSDB_CODE_REF_NOT_EXIST; + taosUnlockList(pSet->lockedBy+hash); + taosDecRsetCount(pSet); + return NULL; + } - if (pNode == NULL) { - uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid); - terrno = TSDB_CODE_REF_NOT_EXIST; - taosUnlockList(pSet->lockedBy+hash); - return NULL; + // rid is there + pNode = pNode->next; + // check first place + while (pNode) { + if (!pNode->removed) break; + pNode = pNode->next; + } + if (pNode == NULL) { + taosUnlockList(pSet->lockedBy+hash); + hash++; + } } - // rid is there - pNode = pNode->next; - if (pNode == NULL) { - taosUnlockList(pSet->lockedBy+hash); - hash++; + if (pNode == NULL) { + for (; hash < pSet->max; ++hash) { + taosLockList(pSet->lockedBy+hash); + pNode = pSet->nodeList[hash]; + if (pNode) { + // check first place + while (pNode) { + if (!pNode->removed) break; + pNode = pNode->next; + } + if (pNode) break; + } + taosUnlockList(pSet->lockedBy+hash); + } } - } - if (pNode == NULL) { - for (; hash < pSet->max; ++hash) { - taosLockList(pSet->lockedBy+hash); - pNode = pSet->nodeList[hash]; - if (pNode) break; + if (pNode) { + pNode->count++; // acquire it + newP = pNode->p; taosUnlockList(pSet->lockedBy+hash); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid); + } else { + uTrace("rsetId:%d the list is over", rsetId); } - } - void *newP = NULL; - if (pNode) { - pNode->count++; // acquire it - newP = pNode->p; - taosUnlockList(pSet->lockedBy+hash); - uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid); - } else { - uTrace("rsetId:%d the list is over", rsetId); - } - - if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one + if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one + if (pNode) rid = pNode->rid; + } while (newP && pNode->removed); taosDecRsetCount(pSet); @@ -350,22 +367,22 @@ int taosListRef() { for (int i = 0; i < TSDB_REF_OBJECTS; ++i) { pSet = tsRefSetList + i; - - if (pSet->state == TSDB_REF_STATE_EMPTY) + + if (pSet->state == TSDB_REF_STATE_EMPTY) continue; uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count); for (int j=0; j < pSet->max; ++j) { pNode = pSet->nodeList[j]; - + while (pNode) { uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count); pNode = pNode->next; num++; } - } - } + } + } pthread_mutex_unlock(&tsRefMutex); @@ -397,16 +414,16 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { terrno = TSDB_CODE_REF_ID_REMOVED; return -1; } - + hash = rid % pSet->max; taosLockList(pSet->lockedBy+hash); - + pNode = pSet->nodeList[hash]; while (pNode) { if (pNode->rid == rid) break; - pNode = pNode->next; + pNode = pNode->next; } if (pNode) { @@ -417,18 +434,18 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { if (pNode->prev) { pNode->prev->next = pNode->next; } else { - pSet->nodeList[hash] = pNode->next; + pSet->nodeList[hash] = pNode->next; } if (pNode->next) { - pNode->next->prev = pNode->prev; - } + pNode->next->prev = pNode->prev; + } released = 1; } else { - uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); + uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); } } else { - uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); + uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); terrno = TSDB_CODE_REF_NOT_EXIST; code = -1; } @@ -437,11 +454,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { if (released) { uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); - (*pSet->fp)(pNode->p); + (*pSet->fp)(pNode->p); free(pNode); taosDecRsetCount(pSet); - } + } return code; } @@ -493,5 +510,5 @@ static void taosDecRsetCount(SRefSet *pSet) { } pthread_mutex_unlock(&tsRefMutex); -} +}