tref.c 12.5 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#include "tutil.h"

#define TSDB_REF_OBJECTS       50
#define TSDB_REF_STATE_EMPTY   0
#define TSDB_REF_STATE_ACTIVE  1
#define TSDB_REF_STATE_DELETED 2

typedef struct SRefNode {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27 28 29 30 31
  struct SRefNode  *prev;  // previous node 
  struct SRefNode  *next;  // next node
  void             *p;     // pointer to resource protected, 
  int64_t           rid;   // reference ID
  int32_t           count; // number of references
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
32
  int               removed; // 1: removed 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
33 34 35
} SRefNode;
	
typedef struct {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
36 37 38 39 40 41 42
  SRefNode **nodeList; // array of SRefNode linked list
  int        state;    // 0: empty, 1: active;  2: deleted
  int        rsetId;   // refSet ID, global unique 
  int64_t    rid;      // increase by one for each new reference
  int        max;      // mod 
  int32_t    count;    // total number of SRefNodes in this set
  int64_t   *lockedBy; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43 44 45 46 47 48 49 50 51 52 53 54
  void     (*fp)(void *);
} SRefSet;

static SRefSet         tsRefSetList[TSDB_REF_OBJECTS];
static pthread_once_t  tsRefModuleInit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tsRefMutex;
static int             tsRefSetNum = 0;
static int             tsNextId = 0;

static void taosInitRefModule(void);
static void taosLockList(int64_t *lockedBy);
static void taosUnlockList(int64_t *lockedBy);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55 56 57
static void taosIncRsetCount(SRefSet *pSet);
static void taosDecRsetCount(SRefSet *pSet);
static int  taosDecRefCount(int rsetId, int64_t rid, int remove);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
58 59 60 61 62 63

int taosOpenRef(int max, void (*fp)(void *))
{
  SRefNode **nodeList;
  SRefSet   *pSet;
  int64_t   *lockedBy;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64
  int        i, rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
65 66 67 68
  
  pthread_once(&tsRefModuleInit, taosInitRefModule);
   
  nodeList = calloc(sizeof(SRefNode *), (size_t)max);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
69 70 71
  if (nodeList == NULL)  {
    terrno = TSDB_CODE_REF_NO_MEMORY;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72 73 74 75 76
  }

  lockedBy = calloc(sizeof(int64_t), (size_t)max);
  if (lockedBy == NULL) {
    free(nodeList);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77 78
    terrno = TSDB_CODE_REF_NO_MEMORY;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79 80 81 82 83 84
  }

  pthread_mutex_lock(&tsRefMutex);
 
  for (i = 0; i < TSDB_REF_OBJECTS; ++i) {
    tsNextId = (tsNextId + 1) % TSDB_REF_OBJECTS;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
85
    if (tsNextId == 0) tsNextId = 1;   // dont use 0 as rsetId
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86 87 88 89
    if (tsRefSetList[tsNextId].state == TSDB_REF_STATE_EMPTY) break;
  } 

  if (i < TSDB_REF_OBJECTS) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90 91
    rsetId = tsNextId;
    pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
92 93 94 95
    pSet->max = max;
    pSet->nodeList = nodeList;
    pSet->lockedBy = lockedBy;
    pSet->fp = fp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97
    pSet->rid = 1;
    pSet->rsetId = rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
98
    pSet->state = TSDB_REF_STATE_ACTIVE;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99
    taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
100 101

    tsRefSetNum++;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
    uTrace("rsetId:%d is opened, max:%d, fp:%p refSetNum:%d", rsetId, max, fp, tsRefSetNum);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
104
    rsetId = TSDB_CODE_REF_FULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105 106 107 108 109 110 111
    free (nodeList);
    free (lockedBy);
    uTrace("run out of Ref ID, maximum:%d refSetNum:%d", TSDB_REF_OBJECTS, tsRefSetNum);
  } 

  pthread_mutex_unlock(&tsRefMutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112
  return rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113 114
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115
int taosCloseRef(int rsetId)
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116 117 118 119
{
  SRefSet  *pSet;
  int       deleted = 0;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
120 121 122 123
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d is invalid, out of range", rsetId);
    terrno = TSDB_CODE_REF_INVALID_ID;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124 125
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
127 128 129 130 131 132

  pthread_mutex_lock(&tsRefMutex);

  if (pSet->state == TSDB_REF_STATE_ACTIVE) { 
    pSet->state = TSDB_REF_STATE_DELETED;
    deleted = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
    uTrace("rsetId:%d is closed, count:%d", rsetId, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
135
    uTrace("rsetId:%d is already closed, count:%d", rsetId, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
136 137 138 139
  }

  pthread_mutex_unlock(&tsRefMutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
140
  if (deleted) taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
141 142

  return 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
143 144
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
145
int64_t taosAddRef(int rsetId, void *p) 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146 147 148 149
{
  int       hash;
  SRefNode *pNode;
  SRefSet  *pSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150
  int64_t   rid = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153 154 155
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d p:%p failed to add, rsetId not valid", rsetId, p);
    terrno = TSDB_CODE_REF_INVALID_ID;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156 157
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
158
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159
  taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
160
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161
    taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162 163 164
    uTrace("rsetId:%d p:%p failed to add, not active", rsetId, p);
    terrno = TSDB_CODE_REF_ID_REMOVED;
    return -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165 166
  }
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
167 168 169 170 171
  pNode = calloc(sizeof(SRefNode), 1);
  if (pNode == NULL) {
    terrno = TSDB_CODE_REF_NO_MEMORY;
    return -1;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
172

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173 174
  rid = atomic_add_fetch_64(&pSet->rid, 1);
  hash = rid % pSet->max;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
175 176
  taosLockList(pSet->lockedBy+hash);
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
177 178 179
  pNode->p = p;
  pNode->rid = rid;
  pNode->count = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181 182 183 184
  pNode->prev = NULL;
  pNode->next = pSet->nodeList[hash];
  if (pSet->nodeList[hash]) pSet->nodeList[hash]->prev = pNode;
  pSet->nodeList[hash] = pNode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
185

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186
  uTrace("rsetId:%d p:%p rid:%" PRId64 " is added, count:%d", rsetId, p, rid, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187 188 189

  taosUnlockList(pSet->lockedBy+hash);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190
  return rid; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191 192
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
193 194 195 196 197 198
int taosRemoveRef(int rsetId, int64_t rid) 
{
  return taosDecRefCount(rsetId, rid, 1);
}

// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199
void *taosAcquireRef(int rsetId, int64_t rid) 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
200
{
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201
  int       hash;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
202 203
  SRefNode *pNode;
  SRefSet  *pSet;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204
  void     *p = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206 207 208 209
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rsetId not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_INVALID_ID;
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210 211
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212 213 214 215 216 217
  if (rid <= 0) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, rid not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_NOT_EXIST;
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
219
  taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221
    uTrace("rsetId:%d rid:%" PRId64 " failed to acquire, not active", rsetId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222
    taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223 224
    terrno = TSDB_CODE_REF_ID_REMOVED;
    return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225 226
  }
  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227
  hash = rid % pSet->max;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228 229 230 231 232
  taosLockList(pSet->lockedBy+hash);

  pNode = pSet->nodeList[hash];

  while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233
    if (pNode->rid == rid) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234 235 236 237 238 239 240
      break;
    }
      
    pNode = pNode->next;   
  }

  if (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
241 242 243 244 245 246 247 248
    if (pNode->removed == 0) {
      pNode->count++;
      p = pNode->p;
      uTrace("rsetId:%d p:%p rid:%" PRId64 " is acquired", rsetId, pNode->p, rid);
    } else {
      terrno = TSDB_CODE_REF_NOT_EXIST;
      uTrace("rsetId:%d p:%p rid:%" PRId64 " is already removed, failed to acquire", rsetId, pNode->p, rid);
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
249
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250 251
    terrno = TSDB_CODE_REF_NOT_EXIST;
    uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to acquire", rsetId, rid);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252 253 254 255
  }

  taosUnlockList(pSet->lockedBy+hash);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256
  taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
257

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258
  return p;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259 260
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261
int taosReleaseRef(int rsetId, int64_t rid) 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262
{
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263
  return taosDecRefCount(rsetId, rid, 0);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
264 265
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266 267
// if rid is 0, return the first p in hash list, otherwise, return the next after current rid
void *taosIterateRef(int rsetId, int64_t rid) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268 269 270
  SRefNode *pNode = NULL;
  SRefSet  *pSet;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271 272 273
  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rsetId not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_INVALID_ID;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274 275 276
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277 278 279 280 281 282
  if (rid <= 0) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_NOT_EXIST;
    return NULL;
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283
  pSet = tsRefSetList + rsetId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284
  taosIncRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285
  if (pSet->state != TSDB_REF_STATE_ACTIVE) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
286 287
    uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rset not active", rsetId, rid);
    terrno = TSDB_CODE_REF_ID_REMOVED;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
288
    taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289 290 291 292
    return NULL;
  }

  int hash = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
293 294
  if (rid > 0) {
    hash = rid % pSet->max;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
295 296 297 298
    taosLockList(pSet->lockedBy+hash);

    pNode = pSet->nodeList[hash];
    while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
299
      if (pNode->rid == rid) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
300 301 302 303
      pNode = pNode->next;   
    }

    if (pNode == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
304 305
      uError("rsetId:%d rid:%" PRId64 " not there, quit", rsetId, rid);
      terrno = TSDB_CODE_REF_NOT_EXIST;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
306 307 308 309
      taosUnlockList(pSet->lockedBy+hash);
      return NULL;
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
310
    // rid is there
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
    pNode = pNode->next;
    if (pNode == NULL) { 
      taosUnlockList(pSet->lockedBy+hash);
      hash++;
    }
  }

  if (pNode == NULL) {
    for (; hash < pSet->max; ++hash) {
      taosLockList(pSet->lockedBy+hash);
      pNode = pSet->nodeList[hash];
      if (pNode) break; 
      taosUnlockList(pSet->lockedBy+hash);
    }
  } 

  void *newP = NULL;
  if (pNode) {
    pNode->count++;  // acquire it
    newP = pNode->p;  
    taosUnlockList(pSet->lockedBy+hash);
Y
TD-2257  
yihaoDeng 已提交
332
    uTrace("rsetId:%d p:%p rid:%" PRId64 " is returned", rsetId, newP, rid); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
333
  } else {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
334
    uTrace("rsetId:%d the list is over", rsetId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
335 336
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
337
  if (rid > 0) taosReleaseRef(rsetId, rid);  // release the current one
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
338

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
339
  taosDecRsetCount(pSet);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
340 341 342 343

  return newP;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356
int taosListRef() {
  SRefSet  *pSet;
  SRefNode *pNode;
  int       num = 0;

  pthread_mutex_lock(&tsRefMutex);

  for (int i = 0; i < TSDB_REF_OBJECTS; ++i) {
    pSet = tsRefSetList + i;
    
    if (pSet->state == TSDB_REF_STATE_EMPTY) 
      continue;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
357
    uInfo("rsetId:%d state:%d count::%d", i, pSet->state, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
358 359 360 361 362

    for (int j=0; j < pSet->max; ++j) {
      pNode = pSet->nodeList[j];
     
      while (pNode) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
363
        uInfo("rsetId:%d p:%p rid:%" PRId64 "count:%d", i, pNode->p, pNode->rid, pNode->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
364 365 366 367 368 369 370 371 372 373 374
        pNode = pNode->next;
        num++;
      }
    }  
  } 

  pthread_mutex_unlock(&tsRefMutex);

  return num;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
static int taosDecRefCount(int rsetId, int64_t rid, int remove) {
  int       hash;
  SRefSet  *pSet;
  SRefNode *pNode;
  int       released = 0;
  int       code = 0;

  if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rsetId not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_INVALID_ID;
    return -1;
  }

  if (rid <= 0) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to remove, rid not valid", rsetId, rid);
    terrno = TSDB_CODE_REF_NOT_EXIST;
    return -1;
  }

  pSet = tsRefSetList + rsetId;
  if (pSet->state == TSDB_REF_STATE_EMPTY) {
    uTrace("rsetId:%d rid:%" PRId64 " failed to remove, cleaned", rsetId, rid);
    terrno = TSDB_CODE_REF_ID_REMOVED;
    return -1;
  }
  
  hash = rid % pSet->max;
  taosLockList(pSet->lockedBy+hash);
  
  pNode = pSet->nodeList[hash];
  while (pNode) {
    if (pNode->rid == rid)
      break;

    pNode = pNode->next;  
  }

  if (pNode) {
    pNode->count--;
    if (remove) pNode->removed = 1;

    if (pNode->count <= 0) {
      if (pNode->prev) {
        pNode->prev->next = pNode->next;
      } else {
        pSet->nodeList[hash] = pNode->next;  
      }
		
      if (pNode->next) {
        pNode->next->prev = pNode->prev; 
      } 
      released = 1;
    } else {
Y
TD-2257  
yihaoDeng 已提交
428
       uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
429 430
    }
  } else {
Y
TD-2257  
yihaoDeng 已提交
431
    uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
432 433 434 435 436 437
    terrno = TSDB_CODE_REF_NOT_EXIST;
    code = -1;
  }

  taosUnlockList(pSet->lockedBy+hash);

Y
TD-2257  
yihaoDeng 已提交
438 439 440 441 442 443 444
  if (released) {
    uTrace("rsetId:%d p:%p rid:%" PRId64 " is removed, count:%d, free mem: %p", rsetId, pNode->p, rid, pSet->count, pNode);
    (*pSet->fp)(pNode->p); 
    free(pNode);

    taosDecRsetCount(pSet);
  } 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
445 446 447 448

  return code;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
449
static void taosLockList(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
450
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
451 452 453 454 455 456 457 458 459
  int     i = 0;
  while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) {
    if (++i % 100 == 0) {
      sched_yield();
    }
  }
}

static void taosUnlockList(int64_t *lockedBy) {
S
TD-2616  
Shengliang Guan 已提交
460
  int64_t tid = taosGetSelfPthreadId();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
461 462 463 464 465 466 467 468 469
  if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) {
    assert(false);
  }
}

static void taosInitRefModule(void) {
  pthread_mutex_init(&tsRefMutex, NULL);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
470
static void taosIncRsetCount(SRefSet *pSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
471
  atomic_add_fetch_32(&pSet->count, 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
472
  // uTrace("rsetId:%d inc count:%d", pSet->rsetId, count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
473 474
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
475
static void taosDecRsetCount(SRefSet *pSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
476
  int32_t count = atomic_sub_fetch_32(&pSet->count, 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
477
  // uTrace("rsetId:%d dec count:%d", pSet->rsetId, count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
478 479 480 481 482 483 484 485 486 487

  if (count > 0) return;

  pthread_mutex_lock(&tsRefMutex);

  if (pSet->state != TSDB_REF_STATE_EMPTY) {
    pSet->state = TSDB_REF_STATE_EMPTY;
    pSet->max = 0;
    pSet->fp = NULL;

S
TD-1848  
Shengliang Guan 已提交
488 489
    tfree(pSet->nodeList);
    tfree(pSet->lockedBy);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
490 491

    tsRefSetNum--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
492
    uTrace("rsetId:%d is cleaned, refSetNum:%d count:%d", pSet->rsetId, tsRefSetNum, pSet->count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
493 494 495 496 497
  }

  pthread_mutex_unlock(&tsRefMutex);
}