tref.c 13.2 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
ref  
Shengliang Guan 已提交
16 17
#define _DEFAULT_SOURCE
#include "tref.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
18
#include "taoserror.h"
S
log  
Shengliang Guan 已提交
19
#include "tlog.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
20 21 22 23 24 25 26 27
#include "tutil.h"

#define TSDB_REF_OBJECTS       50
#define TSDB_REF_STATE_EMPTY   0
#define TSDB_REF_STATE_ACTIVE  1
#define TSDB_REF_STATE_DELETED 2

typedef struct SRefNode {
S
ref  
Shengliang Guan 已提交
28 29 30 31 32 33
  struct SRefNode *prev;     // previous node
  struct SRefNode *next;     // next node
  void            *p;        // pointer to resource protected,
  int64_t          rid;      // reference ID
  int32_t          count;    // number of references
  int32_t          removed;  // 1: removed
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
34
} SRefNode;
S
ref  
Shengliang Guan 已提交
35

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
36
typedef struct {
S
ref  
Shengliang Guan 已提交
37 38 39 40 41 42
  SRefNode **nodeList;  // array of SRefNode linked list
  int32_t    state;     // 0: empty, 1: active;  2: deleted
  int32_t    rsetId;    // refSet ID, global unique
  int64_t    rid;       // increase by one for each new reference
  int32_t    max;       // mod
  int32_t    count;     // total number of SRefNodes in this set
43
  int64_t   *lockedBy;
S
ref  
Shengliang Guan 已提交
44
  void (*fp)(void *);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
45 46 47
} SRefSet;

static SRefSet         tsRefSetList[TSDB_REF_OBJECTS];
wafwerar's avatar
wafwerar 已提交
48 49
static TdThreadOnce  tsRefModuleInit = PTHREAD_ONCE_INIT;
static TdThreadMutex tsRefMutex;
S
ref  
Shengliang Guan 已提交
50 51 52 53 54 55 56 57 58 59 60
static int32_t         tsRefSetNum = 0;
static int32_t         tsNextId = 0;

static void    taosInitRefModule(void);
static void    taosLockList(int64_t *lockedBy);
static void    taosUnlockList(int64_t *lockedBy);
static void    taosIncRsetCount(SRefSet *pSet);
static void    taosDecRsetCount(SRefSet *pSet);
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove);

int32_t taosOpenRef(int32_t max, void (*fp)(void *)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61 62 63
  SRefNode **nodeList;
  SRefSet   *pSet;
  int64_t   *lockedBy;
S
ref  
Shengliang Guan 已提交
64
  int32_t    i, rsetId;
65

wafwerar's avatar
wafwerar 已提交
66
  taosThreadOnce(&tsRefModuleInit, taosInitRefModule);
67

wafwerar's avatar
wafwerar 已提交
68
  nodeList = taosMemoryCalloc(sizeof(SRefNode *), (size_t)max);
S
ref  
Shengliang Guan 已提交
69
  if (nodeList == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71
    terrno = TSDB_CODE_REF_NO_MEMORY;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72 73
  }

wafwerar's avatar
wafwerar 已提交
74
  lockedBy = taosMemoryCalloc(sizeof(int64_t), (size_t)max);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75
  if (lockedBy == NULL) {
wafwerar's avatar
wafwerar 已提交
76
    taosMemoryFree(nodeList);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77 78
    terrno = TSDB_CODE_REF_NO_MEMORY;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79 80
  }

wafwerar's avatar
wafwerar 已提交
81
  taosThreadMutexLock(&tsRefMutex);
82

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83 84
  for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
    tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
S
ref  
Shengliang Guan 已提交
85
    if (tsNextId == 0) tsNextId = 1;  // dont use 0 as rsetId
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86
    if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
87
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
88 89

  if (i < TSDB_REF_OBJECTS) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90 91
    rsetId = tsNextId;
    pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92 93 94 95
    pSet->max = max;
    pSet->nodeList = nodeList;
    pSet->lockedBy = lockedBy;
    pSet->fp = fp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97
    pSet->rid = 1;
    pSet->rsetId = rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
98
    pSet->state = TSDB_REF_STATE_ACTIVE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99
    taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100 101

    tsRefSetNum++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
    uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104
    rsetId = TSDB_CODE_REF_FULL;
wafwerar's avatar
wafwerar 已提交
105 106
    taosMemoryFree(nodeList);
    taosMemoryFree(lockedBy);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107
    uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
108
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109

wafwerar's avatar
wafwerar 已提交
110
  taosThreadMutexUnlock(&tsRefMutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112
  return rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113 114
}

S
ref  
Shengliang Guan 已提交
115 116 117
int32_t taosCloseRef(int32_t rsetId) {
  SRefSet *pSet;
  int32_t  deleted = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
119 120 121 122
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d is invalid, out of range", rsetId);
    terrno = TSDB_CODE_REF_INVALID_ID;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123 124
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
125
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126

wafwerar's avatar
wafwerar 已提交
127
  taosThreadMutexLock(&tsRefMutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128

129
  if (pSet->state == TSDB_REF_STATE_ACTIVE) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131
    pSet->state = TSDB_REF_STATE_DELETED;
    deleted = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132
    uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134
    uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
135 136
  }

wafwerar's avatar
wafwerar 已提交
137
  taosThreadMutexUnlock(&tsRefMutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139
  if (deleted) taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
140 141

  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142 143
}

S
ref  
Shengliang Guan 已提交
144 145
int64_t taosAddRef(int32_t rsetId, void *p) {
  int32_t   hash;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146 147
  SRefNode *pNode;
  SRefSet  *pSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148
  int64_t   rid = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
149

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150 151 152 153
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p);
    terrno = TSDB_CODE_REF_INVALID_ID;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154 155
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
157
  taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159
    taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160 161 162
    uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p);
    terrno = TSDB_CODE_REF_ID_REMOVED;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163
  }
164

wafwerar's avatar
wafwerar 已提交
165
  pNode = taosMemoryCalloc(sizeof(SRefNode), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166 167 168 169
  if (pNode == NULL) {
    terrno = TSDB_CODE_REF_NO_MEMORY;
    return -1;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
170

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
171 172
  rid = atomic_add_fetch_64(&pSet->rid, 1);
  hash = rid % pSet->max;
S
ref  
Shengliang Guan 已提交
173
  taosLockList(pSet->lockedBy + hash);
174

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175 176 177
  pNode->p = p;
  pNode->rid = rid;
  pNode->count = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179 180 181 182
  pNode->prev = NULL;
  pNode->next = pSet->nodeList[hash];
  if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
  pSet->nodeList[hash] = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184
  uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
185

S
ref  
Shengliang Guan 已提交
186
  taosUnlockList(pSet->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187

188
  return rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189 190
}

S
ref  
Shengliang Guan 已提交
191
int32_t taosRemoveRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 1); }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
192 193

// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
S
ref  
Shengliang Guan 已提交
194 195
void *taosAcquireRef(int32_t rsetId, int64_t rid) {
  int32_t   hash;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
196 197
  SRefNode *pNode;
  SRefSet  *pSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
198
  void     *p = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
200
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
S
ref  
Shengliang Guan 已提交
201
    // uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202 203
    terrno = TSDB_CODE_REF_INVALID_ID;
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204 205
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206 207 208 209 210 211
  if (rid <= 0) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_NOT_EXIST;
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
213
  taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215
    uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
216
    taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217 218
    terrno = TSDB_CODE_REF_ID_REMOVED;
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219
  }
220

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221
  hash = rid % pSet->max;
S
ref  
Shengliang Guan 已提交
222
  taosLockList(pSet->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223 224 225 226

  pNode = pSet->nodeList[hash];

  while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227
    if (pNode->rid == rid) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228 229
      break;
    }
230 231

    pNode = pNode->next;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232 233 234
  }

  if (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235 236 237 238 239 240 241 242
    if (pNode->removed == 0) {
      pNode->count++;
      p = pNode->p;
      uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid);
    } else {
      terrno = TSDB_CODE_REF_NOT_EXIST;
      uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid);
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
243
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
244 245
    terrno = TSDB_CODE_REF_NOT_EXIST;
    uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246 247
  }

S
ref  
Shengliang Guan 已提交
248
  taosUnlockList(pSet->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250
  taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252
  return p;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253 254
}

S
ref  
Shengliang Guan 已提交
255
int32_t taosReleaseRef(int32_t rsetId, int64_t rid) { return taosDecRefCount(rsetId, rid, 0); }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
S
ref  
Shengliang Guan 已提交
258
void *taosIterateRef(int32_t rsetId, int64_t rid) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259 260 261
  SRefNode *pNode = NULL;
  SRefSet  *pSet;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262 263 264
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_INVALID_ID;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
265 266 267
    return NULL;
  }

S
TD-2820  
Shengliang Guan 已提交
268
  if (rid < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269 270 271 272 273
    uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_NOT_EXIST;
    return NULL;
  }

274
  void *newP = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276
  taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
278 279
    uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid);
    terrno = TSDB_CODE_REF_ID_REMOVED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
280
    taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281 282 283
    return NULL;
  }

284 285
  do {
    newP = NULL;
S
ref  
Shengliang Guan 已提交
286
    int32_t hash = 0;
287 288
    if (rid > 0) {
      hash = rid % pSet->max;
S
ref  
Shengliang Guan 已提交
289
      taosLockList(pSet->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290

291 292 293 294 295
      pNode = pSet->nodeList[hash];
      while (pNode) {
        if (pNode->rid == rid) break;
        pNode = pNode->next;
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296

297 298 299
      if (pNode == NULL) {
        uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
        terrno = TSDB_CODE_REF_NOT_EXIST;
S
ref  
Shengliang Guan 已提交
300
        taosUnlockList(pSet->lockedBy + hash);
301 302 303
        taosDecRsetCount(pSet);
        return NULL;
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
304

305 306
      // rid is there
      pNode = pNode->next;
307 308 309 310 311
      // check first place
      while (pNode) {
        if (!pNode->removed) break;
        pNode = pNode->next;
      }
312
      if (pNode == NULL) {
S
ref  
Shengliang Guan 已提交
313
        taosUnlockList(pSet->lockedBy + hash);
314 315
        hash++;
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
316 317
    }

318 319
    if (pNode == NULL) {
      for (; hash < pSet->max; ++hash) {
S
ref  
Shengliang Guan 已提交
320
        taosLockList(pSet->lockedBy + hash);
321
        pNode = pSet->nodeList[hash];
322 323 324 325 326 327 328 329
        if (pNode) {
          // check first place
          while (pNode) {
            if (!pNode->removed) break;
            pNode = pNode->next;
          }
          if (pNode) break;
        }
S
ref  
Shengliang Guan 已提交
330
        taosUnlockList(pSet->lockedBy + hash);
331
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
332 333
    }

334 335 336
    if (pNode) {
      pNode->count++;  // acquire it
      newP = pNode->p;
S
ref  
Shengliang Guan 已提交
337
      taosUnlockList(pSet->lockedBy + hash);
338 339 340
      uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid);
    } else {
      uTrace("rsetId:%d the list is over", rsetId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
341 342
    }

343 344 345
    if (rid > 0) taosReleaseRef(rsetId, rid);  // release the current one
    if (pNode) rid = pNode->rid;
  } while (newP && pNode->removed);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
346

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
347
  taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
348 349 350 351

  return newP;
}

S
ref  
Shengliang Guan 已提交
352
int32_t taosListRef() {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
353 354
  SRefSet  *pSet;
  SRefNode *pNode;
S
ref  
Shengliang Guan 已提交
355
  int32_t   num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
356

wafwerar's avatar
wafwerar 已提交
357
  taosThreadMutexLock(&tsRefMutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
358

S
ref  
Shengliang Guan 已提交
359
  for (int32_t i = 0; i < TSDB_REF_OBJECTS; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
360
    pSet = tsRefSetList + i;
361

S
ref  
Shengliang Guan 已提交
362
    if (pSet->state == TSDB_REF_STATE_EMPTY) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
363

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364
    uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
365

S
ref  
Shengliang Guan 已提交
366
    for (int32_t j = 0; j < pSet->max; ++j) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
367
      pNode = pSet->nodeList[j];
368

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
369
      while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
370
        uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371 372 373
        pNode = pNode->next;
        num++;
      }
374 375
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
376

wafwerar's avatar
wafwerar 已提交
377
  taosThreadMutexUnlock(&tsRefMutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
378 379 380 381

  return num;
}

S
ref  
Shengliang Guan 已提交
382 383
static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
  int32_t   hash;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
384 385
  SRefSet  *pSet;
  SRefNode *pNode;
S
ref  
Shengliang Guan 已提交
386 387
  int32_t   released = 0;
  int32_t   code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406

  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_INVALID_ID;
    return -1;
  }

  if (rid <= 0) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_NOT_EXIST;
    return -1;
  }

  pSet = tsRefSetList + rsetId;
  if (pSet->state == TSDB_REF_STATE_EMPTY) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid);
    terrno = TSDB_CODE_REF_ID_REMOVED;
    return -1;
  }
407

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
408
  hash = rid % pSet->max;
S
ref  
Shengliang Guan 已提交
409
  taosLockList(pSet->lockedBy + hash);
410

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
411 412
  pNode = pSet->nodeList[hash];
  while (pNode) {
S
ref  
Shengliang Guan 已提交
413
    if (pNode->rid == rid) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
414

415
    pNode = pNode->next;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
416 417 418 419 420 421 422 423 424 425
  }

  if (pNode) {
    pNode->count--;
    if (remove) pNode->removed = 1;

    if (pNode->count <= 0) {
      if (pNode->prev) {
        pNode->prev->next = pNode->next;
      } else {
426
        pSet->nodeList[hash] = pNode->next;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
427
      }
S
ref  
Shengliang Guan 已提交
428

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429
      if (pNode->next) {
430 431
        pNode->next->prev = pNode->prev;
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432 433
      released = 1;
    } else {
D
dapan1121 已提交
434
      uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, remain count %d", rsetId, pNode->p, rid, pNode->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
435 436
    }
  } else {
437
    uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
438 439 440 441
    terrno = TSDB_CODE_REF_NOT_EXIST;
    code = -1;
  }

S
ref  
Shengliang Guan 已提交
442
  taosUnlockList(pSet->lockedBy + hash);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
443

Y
TD-2257  
yihaoDeng 已提交
444
  if (released) {
S
ref  
Shengliang Guan 已提交
445 446
    uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count,
           pNode);
447
    (*pSet->fp)(pNode->p);
wafwerar's avatar
wafwerar 已提交
448
    taosMemoryFree(pNode);
Y
TD-2257  
yihaoDeng 已提交
449 450

    taosDecRsetCount(pSet);
451
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
452 453 454 455

  return code;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
456
static void taosLockList(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
457
  int64_t tid = taosGetSelfPthreadId();
S
ref  
Shengliang Guan 已提交
458
  int32_t i = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
459 460 461 462 463 464 465 466
  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    if (++i % 100 == 0) {
      sched_yield();
    }
  }
}

static void taosUnlockList(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
467
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
468 469 470 471 472
  if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
    assert(false);
  }
}

wafwerar's avatar
wafwerar 已提交
473
static void taosInitRefModule(void) { taosThreadMutexInit(&tsRefMutex, NULL); }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
474

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
475
static void taosIncRsetCount(SRefSet *pSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
476
  atomic_add_fetch_32(&pSet->count, 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
477
  // uTrace("rsetId:%d inc count:%d", pSet->rsetId, count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
478 479
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
480
static void taosDecRsetCount(SRefSet *pSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
481
  int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
482
  // uTrace("rsetId:%d dec count:%d", pSet->rsetId, count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
483 484 485

  if (count > 0) return;

wafwerar's avatar
wafwerar 已提交
486
  taosThreadMutexLock(&tsRefMutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
487 488 489 490 491 492

  if (pSet->state != TSDB_REF_STATE_EMPTY) {
    pSet->state = TSDB_REF_STATE_EMPTY;
    pSet->max = 0;
    pSet->fp = NULL;

wafwerar's avatar
wafwerar 已提交
493 494
    taosMemoryFreeClear(pSet->nodeList);
    taosMemoryFreeClear(pSet->lockedBy);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
495 496

    tsRefSetNum--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
497
    uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
498 499
  }

wafwerar's avatar
wafwerar 已提交
500
  taosThreadMutexUnlock(&tsRefMutex);
501
}