提交 7747ab23 编写于 作者: S Shengliang Guan

ref

上级 f680976d
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_UTIL_REF_H #ifndef _TD_UTIL_REF_H_
#define _TD_UTIL_REF_H #define _TD_UTIL_REF_H_
#include "os.h" #include "os.h"
...@@ -25,45 +25,45 @@ extern "C" { ...@@ -25,45 +25,45 @@ extern "C" {
// open a reference set, max is the mod used by hash, fp is the pointer to free resource function // open a reference set, max is the mod used by hash, fp is the pointer to free resource function
// return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately // return rsetId which will be used by other APIs. On error, -1 is returned, and terrno is set appropriately
int taosOpenRef(int max, void (*fp)(void *)); int32_t taosOpenRef(int32_t max, void (*fp)(void *));
// close the reference set, refId is the return value by taosOpenRef // close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosCloseRef(int refId); int32_t taosCloseRef(int32_t refId);
// add ref, p is the pointer to resource or pointer ID // add ref, p is the pointer to resource or pointer ID
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately // return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
int64_t taosAddRef(int refId, void *p); int64_t taosAddRef(int32_t refId, void *p);
// remove ref, rid is the reference ID returned by taosAddRef // remove ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosRemoveRef(int rsetId, int64_t rid); int32_t taosRemoveRef(int32_t rsetId, int64_t rid);
// acquire ref, rid is the reference ID returned by taosAddRef // acquire ref, rid is the reference ID returned by taosAddRef
// return the resource p. On error, NULL is returned, and terrno is set appropriately // return the resource p. On error, NULL is returned, and terrno is set appropriately
void *taosAcquireRef(int rsetId, int64_t rid); void *taosAcquireRef(int32_t rsetId, int64_t rid);
// release ref, rid is the reference ID returned by taosAddRef // release ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately // return 0 if success. On error, -1 is returned, and terrno is set appropriately
int taosReleaseRef(int rsetId, int64_t rid); int32_t taosReleaseRef(int32_t rsetId, int64_t rid);
// return the first reference if rid is 0, otherwise return the next after current reference. // return the first reference if rid is 0, otherwise return the next after current reference.
// if return value is NULL, it means list is over(if terrno is set, it means error happens) // if return value is NULL, it means list is over(if terrno is set, it means error happens)
void *taosIterateRef(int rsetId, int64_t rid); void *taosIterateRef(int32_t rsetId, int64_t rid);
// return the number of references in system // return the number of references in system
int taosListRef(); int32_t taosListRef();
#define RID_VALID(x) ((x) > 0) #define RID_VALID(x) ((x) > 0)
/* sample code to iterate the refs /* sample code to iterate the refs
void demoIterateRefs(int rsetId) { void demoIterateRefs(int32_t rsetId) {
void *p = taosIterateRef(refId, 0); void *p = taosIterateRef(refId, 0);
while (p) { while (p) {
// process P // process P
// get the rid from p // get the rid from p
p = taosIterateRef(rsetId, rid); p = taosIterateRef(rsetId, rid);
...@@ -76,4 +76,4 @@ void demoIterateRefs(int rsetId) { ...@@ -76,4 +76,4 @@ void demoIterateRefs(int rsetId) {
} }
#endif #endif
#endif /*_TD_UTIL_REF_H*/ #endif /*_TD_UTIL_REF_H_*/
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #define _DEFAULT_SOURCE
#include "tref.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
...@@ -25,49 +25,48 @@ ...@@ -25,49 +25,48 @@
#define TSDB_REF_STATE_DELETED 2 #define TSDB_REF_STATE_DELETED 2
typedef struct SRefNode { typedef struct SRefNode {
struct SRefNode *prev; // previous node struct SRefNode *prev; // previous node
struct SRefNode *next; // next node struct SRefNode *next; // next node
void *p; // pointer to resource protected, void *p; // pointer to resource protected,
int64_t rid; // reference ID int64_t rid; // reference ID
int32_t count; // number of references int32_t count; // number of references
int removed; // 1: removed int32_t removed; // 1: removed
} SRefNode; } SRefNode;
typedef struct { typedef struct {
SRefNode **nodeList; // array of SRefNode linked list SRefNode **nodeList; // array of SRefNode linked list
int state; // 0: empty, 1: active; 2: deleted int32_t state; // 0: empty, 1: active; 2: deleted
int rsetId; // refSet ID, global unique int32_t rsetId; // refSet ID, global unique
int64_t rid; // increase by one for each new reference int64_t rid; // increase by one for each new reference
int max; // mod int32_t max; // mod
int32_t count; // total number of SRefNodes in this set int32_t count; // total number of SRefNodes in this set
int64_t *lockedBy; int64_t *lockedBy;
void (*fp)(void *); void (*fp)(void *);
} SRefSet; } SRefSet;
static SRefSet tsRefSetList[TSDB_REF_OBJECTS]; static SRefSet tsRefSetList[TSDB_REF_OBJECTS];
static pthread_once_t tsRefModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t tsRefModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tsRefMutex; static pthread_mutex_t tsRefMutex;
static int tsRefSetNum = 0; static int32_t tsRefSetNum = 0;
static int tsNextId = 0; static int32_t tsNextId = 0;
static void taosInitRefModule(void); static void taosInitRefModule(void);
static void taosLockList(int64_t *lockedBy); static void taosLockList(int64_t *lockedBy);
static void taosUnlockList(int64_t *lockedBy); static void taosUnlockList(int64_t *lockedBy);
static void taosIncRsetCount(SRefSet *pSet); static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRsetCount(SRefSet *pSet); static void taosDecRsetCount(SRefSet *pSet);
static int taosDecRefCount(int rsetId, int64_t rid, int remove); static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove);
int taosOpenRef(int max, void (*fp)(void *)) int32_t taosOpenRef(int32_t max, void (*fp)(void *)) {
{
SRefNode **nodeList; SRefNode **nodeList;
SRefSet *pSet; SRefSet *pSet;
int64_t *lockedBy; int64_t *lockedBy;
int i, rsetId; int32_t i, rsetId;
pthread_once(&tsRefModuleInit, taosInitRefModule); pthread_once(&tsRefModuleInit, taosInitRefModule);
nodeList = calloc(sizeof(SRefNode *), (size_t)max); nodeList = calloc(sizeof(SRefNode *), (size_t)max);
if (nodeList == NULL) { if (nodeList == NULL) {
terrno = TSDB_CODE_REF_NO_MEMORY; terrno = TSDB_CODE_REF_NO_MEMORY;
return -1; return -1;
} }
...@@ -83,7 +82,7 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -83,7 +82,7 @@ int taosOpenRef(int max, void (*fp)(void *))
for (i = 0; i < TSDB_REF_OBJECTS; ++i) { for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
} }
...@@ -103,8 +102,8 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -103,8 +102,8 @@ int taosOpenRef(int max, void (*fp)(void *))
uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum); uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum);
} else { } else {
rsetId = TSDB_CODE_REF_FULL; rsetId = TSDB_CODE_REF_FULL;
free (nodeList); free(nodeList);
free (lockedBy); free(lockedBy);
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
} }
...@@ -113,10 +112,9 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -113,10 +112,9 @@ int taosOpenRef(int max, void (*fp)(void *))
return rsetId; return rsetId;
} }
int taosCloseRef(int rsetId) int32_t taosCloseRef(int32_t rsetId) {
{ SRefSet *pSet;
SRefSet *pSet; int32_t deleted = 0;
int deleted = 0;
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("rsetId:%d is invalid, out of range", rsetId); uTrace("rsetId:%d is invalid, out of range", rsetId);
...@@ -143,9 +141,8 @@ int taosCloseRef(int rsetId) ...@@ -143,9 +141,8 @@ int taosCloseRef(int rsetId)
return 0; return 0;
} }
int64_t taosAddRef(int rsetId, void *p) int64_t taosAddRef(int32_t rsetId, void *p) {
{ int32_t hash;
int hash;
SRefNode *pNode; SRefNode *pNode;
SRefSet *pSet; SRefSet *pSet;
int64_t rid = 0; int64_t rid = 0;
...@@ -173,7 +170,7 @@ int64_t taosAddRef(int rsetId, void *p) ...@@ -173,7 +170,7 @@ int64_t taosAddRef(int rsetId, void *p)
rid = atomic_add_fetch_64(&pSet->rid, 1); rid = atomic_add_fetch_64(&pSet->rid, 1);
hash = rid % pSet->max; hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy + hash);
pNode->p = p; pNode->p = p;
pNode->rid = rid; pNode->rid = rid;
...@@ -186,26 +183,22 @@ int64_t taosAddRef(int rsetId, void *p) ...@@ -186,26 +183,22 @@ int64_t taosAddRef(int rsetId, void *p)
uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count); uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count);
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy + hash);
return rid; return rid;
} }
int taosRemoveRef(int rsetId, int64_t rid) int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1); }
{
return taosDecRefCount(rsetId, rid, 1);
}
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid) void *taosAcquireRef(int32_t rsetId, int64_t rid) {
{ int32_t hash;
int hash;
SRefNode *pNode; SRefNode *pNode;
SRefSet *pSet; SRefSet *pSet;
void *p = NULL; void *p = NULL;
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
//uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid); // uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid);
terrno = TSDB_CODE_REF_INVALID_ID; terrno = TSDB_CODE_REF_INVALID_ID;
return NULL; return NULL;
} }
...@@ -226,7 +219,7 @@ void *taosAcquireRef(int rsetId, int64_t rid) ...@@ -226,7 +219,7 @@ void *taosAcquireRef(int rsetId, int64_t rid)
} }
hash = rid % pSet->max; hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy + hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
...@@ -252,20 +245,17 @@ void *taosAcquireRef(int rsetId, int64_t rid) ...@@ -252,20 +245,17 @@ void *taosAcquireRef(int rsetId, int64_t rid)
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid); uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
} }
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy + hash);
taosDecRsetCount(pSet); taosDecRsetCount(pSet);
return p; return p;
} }
int taosReleaseRef(int rsetId, int64_t rid) int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0); }
{
return taosDecRefCount(rsetId, rid, 0);
}
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosIterateRef(int rsetId, int64_t rid) { void *taosIterateRef(int32_t rsetId, int64_t rid) {
SRefNode *pNode = NULL; SRefNode *pNode = NULL;
SRefSet *pSet; SRefSet *pSet;
...@@ -293,10 +283,10 @@ void *taosIterateRef(int rsetId, int64_t rid) { ...@@ -293,10 +283,10 @@ void *taosIterateRef(int rsetId, int64_t rid) {
do { do {
newP = NULL; newP = NULL;
int hash = 0; int32_t hash = 0;
if (rid > 0) { if (rid > 0) {
hash = rid % pSet->max; hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy + hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
while (pNode) { while (pNode) {
...@@ -307,7 +297,7 @@ void *taosIterateRef(int rsetId, int64_t rid) { ...@@ -307,7 +297,7 @@ void *taosIterateRef(int rsetId, int64_t rid) {
if (pNode == NULL) { if (pNode == NULL) {
uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid); uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST; terrno = TSDB_CODE_REF_NOT_EXIST;
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy + hash);
taosDecRsetCount(pSet); taosDecRsetCount(pSet);
return NULL; return NULL;
} }
...@@ -320,14 +310,14 @@ void *taosIterateRef(int rsetId, int64_t rid) { ...@@ -320,14 +310,14 @@ void *taosIterateRef(int rsetId, int64_t rid) {
pNode = pNode->next; pNode = pNode->next;
} }
if (pNode == NULL) { if (pNode == NULL) {
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy + hash);
hash++; hash++;
} }
} }
if (pNode == NULL) { if (pNode == NULL) {
for (; hash < pSet->max; ++hash) { for (; hash < pSet->max; ++hash) {
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy + hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
if (pNode) { if (pNode) {
// check first place // check first place
...@@ -337,14 +327,14 @@ void *taosIterateRef(int rsetId, int64_t rid) { ...@@ -337,14 +327,14 @@ void *taosIterateRef(int rsetId, int64_t rid) {
} }
if (pNode) break; if (pNode) break;
} }
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy + hash);
} }
} }
if (pNode) { if (pNode) {
pNode->count++; // acquire it pNode->count++; // acquire it
newP = pNode->p; newP = pNode->p;
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy + hash);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid); uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
} else { } else {
uTrace("rsetId:%d the list is over", rsetId); uTrace("rsetId:%d the list is over", rsetId);
...@@ -359,22 +349,21 @@ void *taosIterateRef(int rsetId, int64_t rid) { ...@@ -359,22 +349,21 @@ void *taosIterateRef(int rsetId, int64_t rid) {
return newP; return newP;
} }
int taosListRef() { int32_t taosListRef() {
SRefSet *pSet; SRefSet *pSet;
SRefNode *pNode; SRefNode *pNode;
int num = 0; int32_t num = 0;
pthread_mutex_lock(&tsRefMutex); pthread_mutex_lock(&tsRefMutex);
for (int i = 0; i < TSDB_REF_OBJECTS; ++i) { for (int32_t i = 0; i < TSDB_REF_OBJECTS; ++i) {
pSet = tsRefSetList + i; pSet = tsRefSetList + i;
if (pSet->state == TSDB_REF_STATE_EMPTY) if (pSet->state == TSDB_REF_STATE_EMPTY) continue;
continue;
uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count); uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
for (int j=0; j < pSet->max; ++j) { for (int32_t j = 0; j < pSet->max; ++j) {
pNode = pSet->nodeList[j]; pNode = pSet->nodeList[j];
while (pNode) { while (pNode) {
...@@ -390,12 +379,12 @@ int taosListRef() { ...@@ -390,12 +379,12 @@ int taosListRef() {
return num; return num;
} }
static int taosDecRefCount(int rsetId, int64_t rid, int remove) { static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
int hash; int32_t hash;
SRefSet *pSet; SRefSet *pSet;
SRefNode *pNode; SRefNode *pNode;
int released = 0; int32_t released = 0;
int code = 0; int32_t code = 0;
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) { if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid); uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid);
...@@ -417,12 +406,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -417,12 +406,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
} }
hash = rid % pSet->max; hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy + hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
while (pNode) { while (pNode) {
if (pNode->rid == rid) if (pNode->rid == rid) break;
break;
pNode = pNode->next; pNode = pNode->next;
} }
...@@ -437,13 +425,13 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -437,13 +425,13 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
} else { } else {
pSet->nodeList[hash] = pNode->next; pSet->nodeList[hash] = pNode->next;
} }
if (pNode->next) { if (pNode->next) {
pNode->next->prev = pNode->prev; pNode->next->prev = pNode->prev;
} }
released = 1; released = 1;
} else { } else {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid);
} }
} else { } else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
...@@ -451,10 +439,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -451,10 +439,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
code = -1; code = -1;
} }
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy + hash);
if (released) { if (released) {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count,
pNode);
(*pSet->fp)(pNode->p); (*pSet->fp)(pNode->p);
free(pNode); free(pNode);
...@@ -466,7 +455,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -466,7 +455,7 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
static void taosLockList(int64_t *lockedBy) { static void taosLockList(int64_t *lockedBy) {
int64_t tid = taosGetSelfPthreadId(); int64_t tid = taosGetSelfPthreadId();
int i = 0; int32_t i = 0;
while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
if (++i % 100 == 0) { if (++i % 100 == 0) {
sched_yield(); sched_yield();
...@@ -481,9 +470,7 @@ static void taosUnlockList(int64_t *lockedBy) { ...@@ -481,9 +470,7 @@ static void taosUnlockList(int64_t *lockedBy) {
} }
} }
static void taosInitRefModule(void) { static void taosInitRefModule(void) { pthread_mutex_init(&tsRefMutex, NULL); }
pthread_mutex_init(&tsRefMutex, NULL);
}
static void taosIncRsetCount(SRefSet *pSet) { static void taosIncRsetCount(SRefSet *pSet) {
atomic_add_fetch_32(&pSet->count, 1); atomic_add_fetch_32(&pSet->count, 1);
...@@ -512,4 +499,3 @@ static void taosDecRsetCount(SRefSet *pSet) { ...@@ -512,4 +499,3 @@ static void taosDecRsetCount(SRefSet *pSet) {
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册