未验证 提交 bd12ad6a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #5076 from freemine/mac

taosIterateRef: shall NOT return removed node
...@@ -24,22 +24,22 @@ ...@@ -24,22 +24,22 @@
#define TSDB_REF_STATE_DELETED 2 #define TSDB_REF_STATE_DELETED 2
typedef struct SRefNode { typedef struct SRefNode {
struct SRefNode *prev; // previous node struct SRefNode *prev; // previous node
struct SRefNode *next; // next node struct SRefNode *next; // next node
void *p; // pointer to resource protected, void *p; // pointer to resource protected,
int64_t rid; // reference ID int64_t rid; // reference ID
int32_t count; // number of references int32_t count; // number of references
int removed; // 1: removed int removed; // 1: removed
} SRefNode; } SRefNode;
typedef struct { typedef struct {
SRefNode **nodeList; // array of SRefNode linked list SRefNode **nodeList; // array of SRefNode linked list
int state; // 0: empty, 1: active; 2: deleted int state; // 0: empty, 1: active; 2: deleted
int rsetId; // refSet ID, global unique int rsetId; // refSet ID, global unique
int64_t rid; // increase by one for each new reference int64_t rid; // increase by one for each new reference
int max; // mod int max; // mod
int32_t count; // total number of SRefNodes in this set int32_t count; // total number of SRefNodes in this set
int64_t *lockedBy; int64_t *lockedBy;
void (*fp)(void *); void (*fp)(void *);
} SRefSet; } SRefSet;
...@@ -62,9 +62,9 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -62,9 +62,9 @@ int taosOpenRef(int max, void (*fp)(void *))
SRefSet *pSet; SRefSet *pSet;
int64_t *lockedBy; int64_t *lockedBy;
int i, rsetId; int i, rsetId;
pthread_once(&tsRefModuleInit, taosInitRefModule); pthread_once(&tsRefModuleInit, taosInitRefModule);
nodeList = calloc(sizeof(SRefNode *), (size_t)max); nodeList = calloc(sizeof(SRefNode *), (size_t)max);
if (nodeList == NULL) { if (nodeList == NULL) {
terrno = TSDB_CODE_REF_NO_MEMORY; terrno = TSDB_CODE_REF_NO_MEMORY;
...@@ -79,12 +79,12 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -79,12 +79,12 @@ int taosOpenRef(int max, void (*fp)(void *))
} }
pthread_mutex_lock(&tsRefMutex); pthread_mutex_lock(&tsRefMutex);
for (i = 0; i < TSDB_REF_OBJECTS; ++i) { for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS; tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId if (tsNextId == 0) tsNextId = 1; // dont use 0 as rsetId
if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break; if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
} }
if (i < TSDB_REF_OBJECTS) { if (i < TSDB_REF_OBJECTS) {
rsetId = tsNextId; rsetId = tsNextId;
...@@ -105,7 +105,7 @@ int taosOpenRef(int max, void (*fp)(void *)) ...@@ -105,7 +105,7 @@ int taosOpenRef(int max, void (*fp)(void *))
free (nodeList); free (nodeList);
free (lockedBy); free (lockedBy);
uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum); uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
} }
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
...@@ -127,7 +127,7 @@ int taosCloseRef(int rsetId) ...@@ -127,7 +127,7 @@ int taosCloseRef(int rsetId)
pthread_mutex_lock(&tsRefMutex); pthread_mutex_lock(&tsRefMutex);
if (pSet->state == TSDB_REF_STATE_ACTIVE) { if (pSet->state == TSDB_REF_STATE_ACTIVE) {
pSet->state = TSDB_REF_STATE_DELETED; pSet->state = TSDB_REF_STATE_DELETED;
deleted = 1; deleted = 1;
uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count); uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
...@@ -142,7 +142,7 @@ int taosCloseRef(int rsetId) ...@@ -142,7 +142,7 @@ int taosCloseRef(int rsetId)
return 0; return 0;
} }
int64_t taosAddRef(int rsetId, void *p) int64_t taosAddRef(int rsetId, void *p)
{ {
int hash; int hash;
SRefNode *pNode; SRefNode *pNode;
...@@ -163,7 +163,7 @@ int64_t taosAddRef(int rsetId, void *p) ...@@ -163,7 +163,7 @@ int64_t taosAddRef(int rsetId, void *p)
terrno = TSDB_CODE_REF_ID_REMOVED; terrno = TSDB_CODE_REF_ID_REMOVED;
return -1; return -1;
} }
pNode = calloc(sizeof(SRefNode), 1); pNode = calloc(sizeof(SRefNode), 1);
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_REF_NO_MEMORY; terrno = TSDB_CODE_REF_NO_MEMORY;
...@@ -173,7 +173,7 @@ int64_t taosAddRef(int rsetId, void *p) ...@@ -173,7 +173,7 @@ int64_t taosAddRef(int rsetId, void *p)
rid = atomic_add_fetch_64(&pSet->rid, 1); rid = atomic_add_fetch_64(&pSet->rid, 1);
hash = rid % pSet->max; hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode->p = p; pNode->p = p;
pNode->rid = rid; pNode->rid = rid;
pNode->count = 1; pNode->count = 1;
...@@ -187,16 +187,16 @@ int64_t taosAddRef(int rsetId, void *p) ...@@ -187,16 +187,16 @@ int64_t taosAddRef(int rsetId, void *p)
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
return rid; return rid;
} }
int taosRemoveRef(int rsetId, int64_t rid) int taosRemoveRef(int rsetId, int64_t rid)
{ {
return taosDecRefCount(rsetId, rid, 1); return taosDecRefCount(rsetId, rid, 1);
} }
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid // if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosAcquireRef(int rsetId, int64_t rid) void *taosAcquireRef(int rsetId, int64_t rid)
{ {
int hash; int hash;
SRefNode *pNode; SRefNode *pNode;
...@@ -223,7 +223,7 @@ void *taosAcquireRef(int rsetId, int64_t rid) ...@@ -223,7 +223,7 @@ void *taosAcquireRef(int rsetId, int64_t rid)
terrno = TSDB_CODE_REF_ID_REMOVED; terrno = TSDB_CODE_REF_ID_REMOVED;
return NULL; return NULL;
} }
hash = rid % pSet->max; hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
...@@ -233,8 +233,8 @@ void *taosAcquireRef(int rsetId, int64_t rid) ...@@ -233,8 +233,8 @@ void *taosAcquireRef(int rsetId, int64_t rid)
if (pNode->rid == rid) { if (pNode->rid == rid) {
break; break;
} }
pNode = pNode->next; pNode = pNode->next;
} }
if (pNode) { if (pNode) {
...@@ -258,7 +258,7 @@ void *taosAcquireRef(int rsetId, int64_t rid) ...@@ -258,7 +258,7 @@ void *taosAcquireRef(int rsetId, int64_t rid)
return p; return p;
} }
int taosReleaseRef(int rsetId, int64_t rid) int taosReleaseRef(int rsetId, int64_t rid)
{ {
return taosDecRefCount(rsetId, rid, 0); return taosDecRefCount(rsetId, rid, 0);
} }
...@@ -280,6 +280,7 @@ void *taosIterateRef(int rsetId, int64_t rid) { ...@@ -280,6 +280,7 @@ void *taosIterateRef(int rsetId, int64_t rid) {
return NULL; return NULL;
} }
void *newP = NULL;
pSet = tsRefSetList + rsetId; pSet = tsRefSetList + rsetId;
taosIncRsetCount(pSet); taosIncRsetCount(pSet);
if (pSet->state != TSDB_REF_STATE_ACTIVE) { if (pSet->state != TSDB_REF_STATE_ACTIVE) {
...@@ -289,52 +290,68 @@ void *taosIterateRef(int rsetId, int64_t rid) { ...@@ -289,52 +290,68 @@ void *taosIterateRef(int rsetId, int64_t rid) {
return NULL; return NULL;
} }
int hash = 0; do {
if (rid > 0) { newP = NULL;
hash = rid % pSet->max; int hash = 0;
taosLockList(pSet->lockedBy+hash); if (rid > 0) {
hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash];
while (pNode) {
if (pNode->rid == rid) break;
pNode = pNode->next;
}
pNode = pSet->nodeList[hash]; if (pNode == NULL) {
while (pNode) { uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
if (pNode->rid == rid) break; terrno = TSDB_CODE_REF_NOT_EXIST;
pNode = pNode->next; taosUnlockList(pSet->lockedBy+hash);
} taosDecRsetCount(pSet);
return NULL;
}
if (pNode == NULL) { // rid is there
uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid); pNode = pNode->next;
terrno = TSDB_CODE_REF_NOT_EXIST; // check first place
taosUnlockList(pSet->lockedBy+hash); while (pNode) {
return NULL; if (!pNode->removed) break;
pNode = pNode->next;
}
if (pNode == NULL) {
taosUnlockList(pSet->lockedBy+hash);
hash++;
}
} }
// rid is there if (pNode == NULL) {
pNode = pNode->next; for (; hash < pSet->max; ++hash) {
if (pNode == NULL) { taosLockList(pSet->lockedBy+hash);
taosUnlockList(pSet->lockedBy+hash); pNode = pSet->nodeList[hash];
hash++; if (pNode) {
// check first place
while (pNode) {
if (!pNode->removed) break;
pNode = pNode->next;
}
if (pNode) break;
}
taosUnlockList(pSet->lockedBy+hash);
}
} }
}
if (pNode == NULL) { if (pNode) {
for (; hash < pSet->max; ++hash) { pNode->count++; // acquire it
taosLockList(pSet->lockedBy+hash); newP = pNode->p;
pNode = pSet->nodeList[hash];
if (pNode) break;
taosUnlockList(pSet->lockedBy+hash); taosUnlockList(pSet->lockedBy+hash);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
} else {
uTrace("rsetId:%d the list is over", rsetId);
} }
}
void *newP = NULL; if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
if (pNode) { if (pNode) rid = pNode->rid;
pNode->count++; // acquire it } while (newP && pNode->removed);
newP = pNode->p;
taosUnlockList(pSet->lockedBy+hash);
uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
} else {
uTrace("rsetId:%d the list is over", rsetId);
}
if (rid > 0) taosReleaseRef(rsetId, rid); // release the current one
taosDecRsetCount(pSet); taosDecRsetCount(pSet);
...@@ -350,22 +367,22 @@ int taosListRef() { ...@@ -350,22 +367,22 @@ int taosListRef() {
for (int i = 0; i < TSDB_REF_OBJECTS; ++i) { for (int i = 0; i < TSDB_REF_OBJECTS; ++i) {
pSet = tsRefSetList + i; pSet = tsRefSetList + i;
if (pSet->state == TSDB_REF_STATE_EMPTY) if (pSet->state == TSDB_REF_STATE_EMPTY)
continue; continue;
uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count); uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
for (int j=0; j < pSet->max; ++j) { for (int j=0; j < pSet->max; ++j) {
pNode = pSet->nodeList[j]; pNode = pSet->nodeList[j];
while (pNode) { while (pNode) {
uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count); uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
pNode = pNode->next; pNode = pNode->next;
num++; num++;
} }
} }
} }
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
...@@ -397,16 +414,16 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -397,16 +414,16 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
terrno = TSDB_CODE_REF_ID_REMOVED; terrno = TSDB_CODE_REF_ID_REMOVED;
return -1; return -1;
} }
hash = rid % pSet->max; hash = rid % pSet->max;
taosLockList(pSet->lockedBy+hash); taosLockList(pSet->lockedBy+hash);
pNode = pSet->nodeList[hash]; pNode = pSet->nodeList[hash];
while (pNode) { while (pNode) {
if (pNode->rid == rid) if (pNode->rid == rid)
break; break;
pNode = pNode->next; pNode = pNode->next;
} }
if (pNode) { if (pNode) {
...@@ -417,18 +434,18 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -417,18 +434,18 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
if (pNode->prev) { if (pNode->prev) {
pNode->prev->next = pNode->next; pNode->prev->next = pNode->next;
} else { } else {
pSet->nodeList[hash] = pNode->next; pSet->nodeList[hash] = pNode->next;
} }
if (pNode->next) { if (pNode->next) {
pNode->next->prev = pNode->prev; pNode->next->prev = pNode->prev;
} }
released = 1; released = 1;
} else { } else {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid);
} }
} else { } else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST; terrno = TSDB_CODE_REF_NOT_EXIST;
code = -1; code = -1;
} }
...@@ -437,11 +454,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) { ...@@ -437,11 +454,11 @@ static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
if (released) { if (released) {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode); uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
(*pSet->fp)(pNode->p); (*pSet->fp)(pNode->p);
free(pNode); free(pNode);
taosDecRsetCount(pSet); taosDecRsetCount(pSet);
} }
return code; return code;
} }
...@@ -493,5 +510,5 @@ static void taosDecRsetCount(SRefSet *pSet) { ...@@ -493,5 +510,5 @@ static void taosDecRsetCount(SRefSet *pSet) {
} }
pthread_mutex_unlock(&tsRefMutex); pthread_mutex_unlock(&tsRefMutex);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册