indexCache.c 13.9 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17 18
#include "indexCache.h"
#include "indexComm.h"
#include "indexUtil.h"
19
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
21

22
#define MAX_INDEX_KEY_LEN 256  // test only, change later
dengyihao's avatar
dengyihao 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25
#define MEM_TERM_LIMIT     10 * 10000
#define MEM_THRESHOLD      1024 * 1024
dengyihao's avatar
dengyihao 已提交
26
#define MEM_ESTIMATE_RADIO 1.5
dengyihao's avatar
dengyihao 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
30

dengyihao's avatar
dengyihao 已提交
31 32 33
static void    indexCacheTermDestroy(CacheTerm* ct);
static int32_t indexCacheTermCompare(const void* l, const void* r);
static char*   indexCacheTermGet(const void* pData);
34

35
static MemTable* indexInternalCacheCreate(int8_t type);
36

dengyihao's avatar
dengyihao 已提交
37 38 39 40
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
41 42 43 44
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
45 46 47
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);

static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
dengyihao's avatar
dengyihao 已提交
48 49
    cacheSearchTerm,      cacheSearchPrefix,      cacheSearchSuffix,       cacheSearchRegex, cacheSearchLessThan,
    cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange};
dengyihao's avatar
dengyihao 已提交
50

dengyihao's avatar
dengyihao 已提交
51 52 53
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

dengyihao's avatar
dengyihao 已提交
54
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
  if (cache == NULL) {
    return 0;
  }

  MemTable* mem = cache;
  char*     key = indexCacheTermGet(ct);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    if (0 == strcmp(c->colVal, ct->colVal)) {
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else {
      break;
    }
  }
  tSkipListDestroyIter(iter);
dengyihao's avatar
dengyihao 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95
  return 0;
}
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
112 113 114 115
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  return 0;
}
116
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
117

dengyihao's avatar
dengyihao 已提交
118
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
wafwerar's avatar
wafwerar 已提交
119
  IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
dengyihao's avatar
dengyihao 已提交
120 121 122
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
dengyihao's avatar
dengyihao 已提交
123
  };
dengyihao's avatar
dengyihao 已提交
124

dengyihao's avatar
dengyihao 已提交
125
  cache->mem = indexInternalCacheCreate(type);
dengyihao's avatar
dengyihao 已提交
126
  cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
dengyihao's avatar
dengyihao 已提交
127 128 129
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
dengyihao's avatar
dengyihao 已提交
130
  cache->suid = suid;
dengyihao's avatar
dengyihao 已提交
131
  cache->occupiedMem = 0;
dengyihao's avatar
dengyihao 已提交
132

wafwerar's avatar
wafwerar 已提交
133 134
  taosThreadMutexInit(&cache->mtx, NULL);
  taosThreadCondInit(&cache->finished, NULL);
dengyihao's avatar
dengyihao 已提交
135

dengyihao's avatar
dengyihao 已提交
136
  indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
137 138
  return cache;
}
139
void indexCacheDebug(IndexCache* cache) {
140 141
  MemTable* tbl = NULL;

wafwerar's avatar
wafwerar 已提交
142
  taosThreadMutexLock(&cache->mtx);
143 144
  tbl = cache->mem;
  indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
145
  taosThreadMutexUnlock(&cache->mtx);
146

dengyihao's avatar
dengyihao 已提交
147 148 149 150 151 152 153 154 155 156
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
157
    }
dengyihao's avatar
dengyihao 已提交
158 159 160
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
161
  }
162

dengyihao's avatar
dengyihao 已提交
163
  {
wafwerar's avatar
wafwerar 已提交
164
    taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
165 166
    tbl = cache->imm;
    indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
167
    taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
184
}
dengyihao's avatar
dengyihao 已提交
185

dengyihao's avatar
dengyihao 已提交
186 187 188 189 190
void indexCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* iter = tSkipListCreateIter(slt);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
191
    if (ct != NULL) {
wafwerar's avatar
wafwerar 已提交
192 193
      taosMemoryFree(ct->colVal);
      taosMemoryFree(ct);
dengyihao's avatar
dengyihao 已提交
194
    }
dengyihao's avatar
dengyihao 已提交
195 196
  }
  tSkipListDestroyIter(iter);
197
  tSkipListDestroy(slt);
dengyihao's avatar
dengyihao 已提交
198
}
dengyihao's avatar
dengyihao 已提交
199
void indexCacheDestroyImm(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
200 201 202
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
203

204
  MemTable* tbl = NULL;
wafwerar's avatar
wafwerar 已提交
205
  taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
206

207
  tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
208
  cache->imm = NULL;  // or throw int bg thread
wafwerar's avatar
wafwerar 已提交
209
  taosThreadCondBroadcast(&cache->finished);
dengyihao's avatar
dengyihao 已提交
210

wafwerar's avatar
wafwerar 已提交
211
  taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
212 213

  indexMemUnRef(tbl);
214
  indexMemUnRef(tbl);
dengyihao's avatar
dengyihao 已提交
215
}
dengyihao's avatar
dengyihao 已提交
216 217
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
218 219 220
  if (pCache == NULL) {
    return;
  }
221 222
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
wafwerar's avatar
wafwerar 已提交
223
  taosMemoryFree(pCache->colName);
dengyihao's avatar
dengyihao 已提交
224

wafwerar's avatar
wafwerar 已提交
225 226
  taosThreadMutexDestroy(&pCache->mtx);
  taosThreadCondDestroy(&pCache->finished);
dengyihao's avatar
dengyihao 已提交
227

wafwerar's avatar
wafwerar 已提交
228
  taosMemoryFree(pCache);
dengyihao's avatar
dengyihao 已提交
229 230
}

dengyihao's avatar
dengyihao 已提交
231
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
wafwerar's avatar
wafwerar 已提交
232
  Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate));
dengyihao's avatar
dengyihao 已提交
233 234 235
  if (iiter == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
236

wafwerar's avatar
wafwerar 已提交
237
  taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
238 239

  indexMemRef(cache->imm);
dengyihao's avatar
dengyihao 已提交
240

241
  MemTable* tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
242
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
243
  iiter->val.colVal = NULL;
244
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
dengyihao's avatar
dengyihao 已提交
245 246 247
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

wafwerar's avatar
wafwerar 已提交
248
  taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
249

dengyihao's avatar
dengyihao 已提交
250 251 252
  return iiter;
}
void indexCacheIteratorDestroy(Iterate* iter) {
dengyihao's avatar
dengyihao 已提交
253 254 255
  if (iter == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
256 257
  tSkipListDestroyIter(iter->iter);
  iterateValueDestroy(&iter->val, true);
wafwerar's avatar
wafwerar 已提交
258
  taosMemoryFree(iter);
dengyihao's avatar
dengyihao 已提交
259
}
dengyihao's avatar
dengyihao 已提交
260 261 262 263 264 265 266 267 268

int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = doMergeWork;
  schedMsg.ahandle = pCache;
  schedMsg.thandle = NULL;
  schedMsg.msg = NULL;

  taosScheduleTask(indexQhandle, &schedMsg);
269 270

  return 0;
dengyihao's avatar
dengyihao 已提交
271
}
272

dengyihao's avatar
dengyihao 已提交
273 274
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  while (true) {
dengyihao's avatar
dengyihao 已提交
275
    if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
dengyihao's avatar
dengyihao 已提交
276 277 278
      break;
    } else if (cache->imm != NULL) {
      // TODO: wake up by condition variable
wafwerar's avatar
wafwerar 已提交
279
      taosThreadCondWait(&cache->finished, &cache->mtx);
dengyihao's avatar
dengyihao 已提交
280
    } else {
dengyihao's avatar
dengyihao 已提交
281
      indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
282 283
      cache->imm = cache->mem;
      cache->mem = indexInternalCacheCreate(cache->type);
dengyihao's avatar
dengyihao 已提交
284
      cache->occupiedMem = 0;
dengyihao's avatar
dengyihao 已提交
285 286 287 288 289 290
      // sched to merge
      // unref cache in bgwork
      indexCacheSchedToMerge(cache);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
291
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
dengyihao's avatar
dengyihao 已提交
292 293 294
  if (cache == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
295
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
dengyihao's avatar
dengyihao 已提交
296

dengyihao's avatar
dengyihao 已提交
297
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
298
  indexCacheRef(pCache);
dengyihao's avatar
dengyihao 已提交
299
  // encode data
wafwerar's avatar
wafwerar 已提交
300
  CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
dengyihao's avatar
dengyihao 已提交
301 302 303
  if (cache == NULL) {
    return -1;
  }
304 305
  // set up key
  ct->colType = term->colType;
dengyihao's avatar
dengyihao 已提交
306
  if (hasJson) {
dengyihao's avatar
add UT  
dengyihao 已提交
307
    ct->colVal = indexPackJsonData(term);
dengyihao's avatar
dengyihao 已提交
308
  } else {
wafwerar's avatar
wafwerar 已提交
309
    ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
dengyihao's avatar
dengyihao 已提交
310 311
    memcpy(ct->colVal, term->colVal, term->nColVal);
  }
dengyihao's avatar
dengyihao 已提交
312 313
  ct->version = atomic_add_fetch_32(&pCache->version, 1);
  // set value
314 315
  ct->uid = uid;
  ct->operaType = term->operType;
dengyihao's avatar
dengyihao 已提交
316
  // ugly code, refactor later
dengyihao's avatar
dengyihao 已提交
317
  int64_t estimate = sizeof(ct) + strlen(ct->colVal);
dengyihao's avatar
dengyihao 已提交
318

wafwerar's avatar
wafwerar 已提交
319
  taosThreadMutexLock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
320
  pCache->occupiedMem += estimate;
dengyihao's avatar
dengyihao 已提交
321
  indexCacheMakeRoomForWrite(pCache);
322 323 324 325 326
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);

wafwerar's avatar
wafwerar 已提交
327
  taosThreadMutexUnlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
328 329

  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
330
  return 0;
dengyihao's avatar
dengyihao 已提交
331
  // encode end
dengyihao's avatar
dengyihao 已提交
332
}
dengyihao's avatar
dengyihao 已提交
333
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
dengyihao's avatar
dengyihao 已提交
334
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
335
  return 0;
dengyihao's avatar
dengyihao 已提交
336
}
337

dengyihao's avatar
dengyihao 已提交
338
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
339 340 341
  if (mem == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
342
  return cacheSearch[qtype](mem, ct, tr, s);
dengyihao's avatar
dengyihao 已提交
343
}
dengyihao's avatar
dengyihao 已提交
344
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
dengyihao's avatar
add UT  
dengyihao 已提交
345
  int64_t st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
346 347 348
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
349 350 351
  IndexCache* pCache = cache;

  MemTable *mem = NULL, *imm = NULL;
wafwerar's avatar
wafwerar 已提交
352
  taosThreadMutexLock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
353 354 355 356
  mem = pCache->mem;
  imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
wafwerar's avatar
wafwerar 已提交
357
  taosThreadMutexUnlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
358 359 360

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;
dengyihao's avatar
dengyihao 已提交
361 362 363 364

  bool  hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
  char* p = term->colVal;
  if (hasJson) {
dengyihao's avatar
add UT  
dengyihao 已提交
365
    p = indexPackJsonData(term);
dengyihao's avatar
dengyihao 已提交
366 367
  }
  CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};
dengyihao's avatar
dengyihao 已提交
368 369 370 371 372

  int ret = indexQueryMem(mem, &ct, qtype, result, s);
  if (ret == 0 && *s != kTypeDeletion) {
    // continue search in imm
    ret = indexQueryMem(imm, &ct, qtype, result, s);
dengyihao's avatar
dengyihao 已提交
373
  }
dengyihao's avatar
add UT  
dengyihao 已提交
374

dengyihao's avatar
dengyihao 已提交
375
  if (hasJson) {
wafwerar's avatar
wafwerar 已提交
376
    taosMemoryFreeClear(p);
dengyihao's avatar
dengyihao 已提交
377
  }
dengyihao's avatar
dengyihao 已提交
378

379 380
  indexMemUnRef(mem);
  indexMemUnRef(imm);
dengyihao's avatar
add UT  
dengyihao 已提交
381
  indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st);
dengyihao's avatar
dengyihao 已提交
382 383

  return ret;
dengyihao's avatar
dengyihao 已提交
384
}
dengyihao's avatar
dengyihao 已提交
385 386

void indexCacheRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
387 388 389
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
390 391 392 393
  int ref = T_REF_INC(cache);
  UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
394 395 396
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
397
  int ref = T_REF_DEC(cache);
dengyihao's avatar
dengyihao 已提交
398 399 400
  if (ref == 0) {
    indexCacheDestroy(cache);
  }
dengyihao's avatar
dengyihao 已提交
401
}
402 403

void indexMemRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
404 405 406
  if (tbl == NULL) {
    return;
  }
407 408 409 410
  int ref = T_REF_INC(tbl);
  UNUSED(ref);
}
void indexMemUnRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
411 412 413
  if (tbl == NULL) {
    return;
  }
414 415 416 417
  int ref = T_REF_DEC(tbl);
  if (ref == 0) {
    SSkipList* slt = tbl->mem;
    indexCacheDestroySkiplist(slt);
wafwerar's avatar
wafwerar 已提交
418
    taosMemoryFree(tbl);
419 420 421
  }
}

dengyihao's avatar
dengyihao 已提交
422
static void indexCacheTermDestroy(CacheTerm* ct) {
dengyihao's avatar
dengyihao 已提交
423 424 425
  if (ct == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
426 427
  taosMemoryFree(ct->colVal);
  taosMemoryFree(ct);
428
}
dengyihao's avatar
dengyihao 已提交
429
static char* indexCacheTermGet(const void* pData) {
430 431 432
  CacheTerm* p = (CacheTerm*)pData;
  return (char*)p;
}
dengyihao's avatar
dengyihao 已提交
433
static int32_t indexCacheTermCompare(const void* l, const void* r) {
434 435 436
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
dengyihao's avatar
dengyihao 已提交
437
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
438 439 440
  if (cmp == 0) {
    return rt->version - lt->version;
  }
dengyihao's avatar
dengyihao 已提交
441
  return cmp;
442 443 444
}

static MemTable* indexInternalCacheCreate(int8_t type) {
dengyihao's avatar
dengyihao 已提交
445 446
  type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;

wafwerar's avatar
wafwerar 已提交
447
  MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
448
  indexMemRef(tbl);
dengyihao's avatar
dengyihao 已提交
449
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
dengyihao's avatar
dengyihao 已提交
450 451
    tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY,
                               indexCacheTermGet);
452 453 454 455 456 457 458
  }
  return tbl;
}

static void doMergeWork(SSchedMsg* msg) {
  IndexCache* pCache = msg->ahandle;
  SIndex*     sidx = (SIndex*)pCache->index;
dengyihao's avatar
dengyihao 已提交
459
  indexFlushCacheToTFile(sidx, pCache);
460 461 462
}
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* iter = itera->iter;
dengyihao's avatar
dengyihao 已提交
463 464 465
  if (iter == NULL) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
466
  IterateValue* iv = &itera->val;
467 468 469 470 471 472 473 474
  iterateValueDestroy(iv, false);

  bool next = tSkipListIterNext(iter);
  if (next) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

    iv->type = ct->operaType;
dengyihao's avatar
dengyihao 已提交
475
    iv->ver = ct->version;
dengyihao's avatar
dengyihao 已提交
476
    iv->colVal = tstrdup(ct->colVal);
477 478 479 480 481 482

    taosArrayPush(iv->val, &ct->uid);
  }
  return next;
}

dengyihao's avatar
dengyihao 已提交
483 484 485 486
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  // opt later
  return &iter->val;
}