indexCache.c 20.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17 18
#include "indexCache.h"
#include "indexComm.h"
#include "indexUtil.h"
19
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
21

22
#define MAX_INDEX_KEY_LEN 256  // test only, change later
dengyihao's avatar
dengyihao 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25
#define MEM_TERM_LIMIT     10 * 10000
#define MEM_THRESHOLD      1024 * 1024
dengyihao's avatar
dengyihao 已提交
26
#define MEM_ESTIMATE_RADIO 1.5
dengyihao's avatar
dengyihao 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
30

dengyihao's avatar
dengyihao 已提交
31 32 33
static void    indexCacheTermDestroy(CacheTerm* ct);
static int32_t indexCacheTermCompare(const void* l, const void* r);
static char*   indexCacheTermGet(const void* pData);
34

35
static MemTable* indexInternalCacheCreate(int8_t type);
36

dengyihao's avatar
dengyihao 已提交
37 38 39 40 41 42 43 44 45
static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
46
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
dengyihao's avatar
dengyihao 已提交
47
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
48
                                      RangeType type);
dengyihao's avatar
dengyihao 已提交
49 50 51 52 53 54 55 56 57 58 59 60
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);

static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
                                           RangeType type);
dengyihao's avatar
dengyihao 已提交
61 62 63 64

typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);

dengyihao's avatar
dengyihao 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
  int32_t ret = func(a, b);
  switch (comType) {
    case QUERY_LESS_THAN: {
      if (ret < 0) return MATCH;
    } break;
    case QUERY_LESS_EQUAL: {
      if (ret <= 0) return MATCH;
      break;
    }
    case QUERY_GREATER_THAN: {
      if (ret > 0) return MATCH;
      break;
    }
    case QUERY_GREATER_EQUAL: {
      if (ret >= 0) return MATCH;
    }
  }
  return CONTINUE;
}
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
  __compar_fn_t func = getComparFunc(type, 0);
  return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
dengyihao's avatar
dengyihao 已提交
89 90 91 92 93 94
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; }

static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
                                                                    tCompareGreaterThan, tCompareGreaterEqual};
dengyihao's avatar
dengyihao 已提交
95

dengyihao's avatar
dengyihao 已提交
96 97 98 99 100 101
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
    {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
     cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
    {cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON,
     cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON,
     cacheSearchRange_JSON}};
dengyihao's avatar
dengyihao 已提交
102

dengyihao's avatar
dengyihao 已提交
103 104 105
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

dengyihao's avatar
dengyihao 已提交
106
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
107 108 109
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
110 111 112 113
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;
  CacheTerm   ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
  CacheTerm*  pCt = &ct;
dengyihao's avatar
dengyihao 已提交
114

dengyihao's avatar
dengyihao 已提交
115
  char* key = indexCacheTermGet(&ct);
dengyihao's avatar
dengyihao 已提交
116 117 118 119 120 121 122 123

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
124
    if (0 == strcmp(c->colVal, pCt->colVal)) {
dengyihao's avatar
dengyihao 已提交
125 126 127 128 129 130 131 132 133 134 135 136
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else {
      break;
    }
  }
  tSkipListDestroyIter(iter);
dengyihao's avatar
dengyihao 已提交
137 138
  return 0;
}
dengyihao's avatar
dengyihao 已提交
139
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
140 141 142
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
143
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
144 145 146
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
147
static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
148 149 150
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
151
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
152 153 154 155 156 157
                                      RangeType type) {
  if (cache == NULL) {
    return 0;
  }
  _cache_range_compare cmpFn = rangeCompare[type];

dengyihao's avatar
dengyihao 已提交
158 159 160 161 162 163 164 165
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_32(&pCache->version);

  char* key = indexCacheTermGet(pCt);
dengyihao's avatar
dengyihao 已提交
166 167 168 169 170 171 172 173

  SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
174
    TExeCond   cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
dengyihao's avatar
dengyihao 已提交
175 176 177 178 179 180 181 182 183
    if (cond == MATCH) {
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else if (cond == CONTINUE) {
dengyihao's avatar
dengyihao 已提交
184
      continue;
dengyihao's avatar
dengyihao 已提交
185 186 187 188
    } else if (cond == BREAK) {
      break;
    }
  }
dengyihao's avatar
dengyihao 已提交
189
  taosMemoryFree(pCt);
dengyihao's avatar
dengyihao 已提交
190 191 192
  tSkipListDestroyIter(iter);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
193 194 195 196 197
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, LT);
}
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, LE);
dengyihao's avatar
dengyihao 已提交
198
}
dengyihao's avatar
dengyihao 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, GT);
}
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, GE);
}

static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
217
}
dengyihao's avatar
dengyihao 已提交
218 219
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT);
dengyihao's avatar
dengyihao 已提交
220
}
dengyihao's avatar
dengyihao 已提交
221 222
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, LE);
dengyihao's avatar
dengyihao 已提交
223
}
dengyihao's avatar
dengyihao 已提交
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, GT);
}
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE);
}
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
}

static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
                                           RangeType type) {
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
239 240 241
  // impl later
  return 0;
}
242
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
243

dengyihao's avatar
dengyihao 已提交
244
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
wafwerar's avatar
wafwerar 已提交
245
  IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
dengyihao's avatar
dengyihao 已提交
246 247 248
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
dengyihao's avatar
dengyihao 已提交
249
  };
dengyihao's avatar
dengyihao 已提交
250

dengyihao's avatar
dengyihao 已提交
251
  cache->mem = indexInternalCacheCreate(type);
dengyihao's avatar
dengyihao 已提交
252
  cache->mem->pCache = cache;
dengyihao's avatar
dengyihao 已提交
253
  cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
dengyihao's avatar
dengyihao 已提交
254 255 256
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
dengyihao's avatar
dengyihao 已提交
257
  cache->suid = suid;
dengyihao's avatar
dengyihao 已提交
258
  cache->occupiedMem = 0;
dengyihao's avatar
dengyihao 已提交
259

wafwerar's avatar
wafwerar 已提交
260 261
  taosThreadMutexInit(&cache->mtx, NULL);
  taosThreadCondInit(&cache->finished, NULL);
dengyihao's avatar
dengyihao 已提交
262

dengyihao's avatar
dengyihao 已提交
263
  indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
264 265
  return cache;
}
266
void indexCacheDebug(IndexCache* cache) {
267 268
  MemTable* tbl = NULL;

wafwerar's avatar
wafwerar 已提交
269
  taosThreadMutexLock(&cache->mtx);
270 271
  tbl = cache->mem;
  indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
272
  taosThreadMutexUnlock(&cache->mtx);
273

dengyihao's avatar
dengyihao 已提交
274 275 276 277 278 279 280 281 282 283
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
284
    }
dengyihao's avatar
dengyihao 已提交
285 286 287
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
288
  }
289

dengyihao's avatar
dengyihao 已提交
290
  {
wafwerar's avatar
wafwerar 已提交
291
    taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
292 293
    tbl = cache->imm;
    indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
294
    taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
311
}
dengyihao's avatar
dengyihao 已提交
312

dengyihao's avatar
dengyihao 已提交
313 314 315 316 317
void indexCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* iter = tSkipListCreateIter(slt);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
318
    if (ct != NULL) {
wafwerar's avatar
wafwerar 已提交
319 320
      taosMemoryFree(ct->colVal);
      taosMemoryFree(ct);
dengyihao's avatar
dengyihao 已提交
321
    }
dengyihao's avatar
dengyihao 已提交
322 323
  }
  tSkipListDestroyIter(iter);
324
  tSkipListDestroy(slt);
dengyihao's avatar
dengyihao 已提交
325
}
dengyihao's avatar
dengyihao 已提交
326
void indexCacheDestroyImm(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
327 328 329
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
330

331
  MemTable* tbl = NULL;
wafwerar's avatar
wafwerar 已提交
332
  taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
333

334
  tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
335
  cache->imm = NULL;  // or throw int bg thread
wafwerar's avatar
wafwerar 已提交
336
  taosThreadCondBroadcast(&cache->finished);
dengyihao's avatar
dengyihao 已提交
337

wafwerar's avatar
wafwerar 已提交
338
  taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
339 340

  indexMemUnRef(tbl);
341
  indexMemUnRef(tbl);
dengyihao's avatar
dengyihao 已提交
342
}
dengyihao's avatar
dengyihao 已提交
343 344
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
345 346 347
  if (pCache == NULL) {
    return;
  }
348 349
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
wafwerar's avatar
wafwerar 已提交
350
  taosMemoryFree(pCache->colName);
dengyihao's avatar
dengyihao 已提交
351

wafwerar's avatar
wafwerar 已提交
352 353
  taosThreadMutexDestroy(&pCache->mtx);
  taosThreadCondDestroy(&pCache->finished);
dengyihao's avatar
dengyihao 已提交
354

wafwerar's avatar
wafwerar 已提交
355
  taosMemoryFree(pCache);
dengyihao's avatar
dengyihao 已提交
356 357
}

dengyihao's avatar
dengyihao 已提交
358
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
wafwerar's avatar
wafwerar 已提交
359
  Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate));
dengyihao's avatar
dengyihao 已提交
360 361 362
  if (iiter == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
363

wafwerar's avatar
wafwerar 已提交
364
  taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
365 366

  indexMemRef(cache->imm);
dengyihao's avatar
dengyihao 已提交
367

368
  MemTable* tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
369
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
370
  iiter->val.colVal = NULL;
371
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
dengyihao's avatar
dengyihao 已提交
372 373 374
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

wafwerar's avatar
wafwerar 已提交
375
  taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
376

dengyihao's avatar
dengyihao 已提交
377 378 379
  return iiter;
}
void indexCacheIteratorDestroy(Iterate* iter) {
dengyihao's avatar
dengyihao 已提交
380 381 382
  if (iter == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
383 384
  tSkipListDestroyIter(iter->iter);
  iterateValueDestroy(&iter->val, true);
wafwerar's avatar
wafwerar 已提交
385
  taosMemoryFree(iter);
dengyihao's avatar
dengyihao 已提交
386
}
dengyihao's avatar
dengyihao 已提交
387 388 389 390 391 392 393 394 395

int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = doMergeWork;
  schedMsg.ahandle = pCache;
  schedMsg.thandle = NULL;
  schedMsg.msg = NULL;

  taosScheduleTask(indexQhandle, &schedMsg);
396 397

  return 0;
dengyihao's avatar
dengyihao 已提交
398
}
399

dengyihao's avatar
dengyihao 已提交
400 401
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  while (true) {
dengyihao's avatar
dengyihao 已提交
402
    if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
dengyihao's avatar
dengyihao 已提交
403 404 405
      break;
    } else if (cache->imm != NULL) {
      // TODO: wake up by condition variable
wafwerar's avatar
wafwerar 已提交
406
      taosThreadCondWait(&cache->finished, &cache->mtx);
dengyihao's avatar
dengyihao 已提交
407
    } else {
dengyihao's avatar
dengyihao 已提交
408
      indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
409 410
      cache->imm = cache->mem;
      cache->mem = indexInternalCacheCreate(cache->type);
dengyihao's avatar
dengyihao 已提交
411
      cache->occupiedMem = 0;
dengyihao's avatar
dengyihao 已提交
412 413 414 415 416 417
      // sched to merge
      // unref cache in bgwork
      indexCacheSchedToMerge(cache);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
418
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
dengyihao's avatar
dengyihao 已提交
419 420 421
  if (cache == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
422
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
dengyihao's avatar
dengyihao 已提交
423

dengyihao's avatar
dengyihao 已提交
424
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
425
  indexCacheRef(pCache);
dengyihao's avatar
dengyihao 已提交
426
  // encode data
wafwerar's avatar
wafwerar 已提交
427
  CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
dengyihao's avatar
dengyihao 已提交
428 429 430
  if (cache == NULL) {
    return -1;
  }
431 432
  // set up key
  ct->colType = term->colType;
dengyihao's avatar
dengyihao 已提交
433
  if (hasJson) {
dengyihao's avatar
add UT  
dengyihao 已提交
434
    ct->colVal = indexPackJsonData(term);
dengyihao's avatar
dengyihao 已提交
435
  } else {
wafwerar's avatar
wafwerar 已提交
436
    ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
dengyihao's avatar
dengyihao 已提交
437 438
    memcpy(ct->colVal, term->colVal, term->nColVal);
  }
dengyihao's avatar
dengyihao 已提交
439 440
  ct->version = atomic_add_fetch_32(&pCache->version, 1);
  // set value
441 442
  ct->uid = uid;
  ct->operaType = term->operType;
dengyihao's avatar
dengyihao 已提交
443
  // ugly code, refactor later
dengyihao's avatar
dengyihao 已提交
444
  int64_t estimate = sizeof(ct) + strlen(ct->colVal);
dengyihao's avatar
dengyihao 已提交
445

wafwerar's avatar
wafwerar 已提交
446
  taosThreadMutexLock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
447
  pCache->occupiedMem += estimate;
dengyihao's avatar
dengyihao 已提交
448
  indexCacheMakeRoomForWrite(pCache);
449 450 451 452 453
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);

wafwerar's avatar
wafwerar 已提交
454
  taosThreadMutexUnlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
455 456

  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
457
  return 0;
dengyihao's avatar
dengyihao 已提交
458
  // encode end
dengyihao's avatar
dengyihao 已提交
459
}
dengyihao's avatar
dengyihao 已提交
460
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
dengyihao's avatar
dengyihao 已提交
461
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
462
  return 0;
dengyihao's avatar
dengyihao 已提交
463
}
464

dengyihao's avatar
dengyihao 已提交
465
static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
466 467 468
  if (mem == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
469 470 471 472 473 474 475 476 477

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    return cacheSearch[1][qtype](mem, term, tr, s);
  } else {
    return cacheSearch[0][qtype](mem, term, tr, s);
  }
dengyihao's avatar
dengyihao 已提交
478
}
dengyihao's avatar
dengyihao 已提交
479
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
dengyihao's avatar
add UT  
dengyihao 已提交
480
  int64_t st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
481 482 483
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
484 485 486
  IndexCache* pCache = cache;

  MemTable *mem = NULL, *imm = NULL;
wafwerar's avatar
wafwerar 已提交
487
  taosThreadMutexLock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
488 489 490 491
  mem = pCache->mem;
  imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
wafwerar's avatar
wafwerar 已提交
492
  taosThreadMutexUnlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
493

dengyihao's avatar
dengyihao 已提交
494 495
  // SIndexTerm*     term = query->term;
  // EIndexQueryType qtype = query->qType;
dengyihao's avatar
dengyihao 已提交
496

dengyihao's avatar
dengyihao 已提交
497 498 499 500 501 502
  // bool  isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
  // char* p = term->colVal;
  // if (isJson) {
  //  p = indexPackJsonData(term);
  //}
  // CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};
dengyihao's avatar
dengyihao 已提交
503

dengyihao's avatar
dengyihao 已提交
504
  int ret = indexQueryMem(mem, query, result, s);
dengyihao's avatar
dengyihao 已提交
505 506
  if (ret == 0 && *s != kTypeDeletion) {
    // continue search in imm
dengyihao's avatar
dengyihao 已提交
507
    ret = indexQueryMem(imm, query, result, s);
dengyihao's avatar
dengyihao 已提交
508
  }
dengyihao's avatar
dengyihao 已提交
509 510 511
  // if (isJson) {
  //  taosMemoryFreeClear(p);
  //}
dengyihao's avatar
dengyihao 已提交
512

513 514
  indexMemUnRef(mem);
  indexMemUnRef(imm);
dengyihao's avatar
add UT  
dengyihao 已提交
515
  indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st);
dengyihao's avatar
dengyihao 已提交
516 517

  return ret;
dengyihao's avatar
dengyihao 已提交
518
}
dengyihao's avatar
dengyihao 已提交
519 520

void indexCacheRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
521 522 523
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
524 525 526 527
  int ref = T_REF_INC(cache);
  UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
528 529 530
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
531
  int ref = T_REF_DEC(cache);
dengyihao's avatar
dengyihao 已提交
532 533 534
  if (ref == 0) {
    indexCacheDestroy(cache);
  }
dengyihao's avatar
dengyihao 已提交
535
}
536 537

void indexMemRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
538 539 540
  if (tbl == NULL) {
    return;
  }
541 542 543 544
  int ref = T_REF_INC(tbl);
  UNUSED(ref);
}
void indexMemUnRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
545 546 547
  if (tbl == NULL) {
    return;
  }
548 549 550 551
  int ref = T_REF_DEC(tbl);
  if (ref == 0) {
    SSkipList* slt = tbl->mem;
    indexCacheDestroySkiplist(slt);
wafwerar's avatar
wafwerar 已提交
552
    taosMemoryFree(tbl);
553 554 555
  }
}

dengyihao's avatar
dengyihao 已提交
556
static void indexCacheTermDestroy(CacheTerm* ct) {
dengyihao's avatar
dengyihao 已提交
557 558 559
  if (ct == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
560 561
  taosMemoryFree(ct->colVal);
  taosMemoryFree(ct);
562
}
dengyihao's avatar
dengyihao 已提交
563
static char* indexCacheTermGet(const void* pData) {
564 565 566
  CacheTerm* p = (CacheTerm*)pData;
  return (char*)p;
}
dengyihao's avatar
dengyihao 已提交
567
static int32_t indexCacheTermCompare(const void* l, const void* r) {
568 569 570
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
dengyihao's avatar
dengyihao 已提交
571
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
572 573 574
  if (cmp == 0) {
    return rt->version - lt->version;
  }
dengyihao's avatar
dengyihao 已提交
575
  return cmp;
576 577 578
}

static MemTable* indexInternalCacheCreate(int8_t type) {
dengyihao's avatar
dengyihao 已提交
579 580
  type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;

wafwerar's avatar
wafwerar 已提交
581
  MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
582
  indexMemRef(tbl);
dengyihao's avatar
dengyihao 已提交
583
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
dengyihao's avatar
dengyihao 已提交
584 585
    tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY,
                               indexCacheTermGet);
586 587 588 589 590 591 592
  }
  return tbl;
}

static void doMergeWork(SSchedMsg* msg) {
  IndexCache* pCache = msg->ahandle;
  SIndex*     sidx = (SIndex*)pCache->index;
dengyihao's avatar
dengyihao 已提交
593
  indexFlushCacheToTFile(sidx, pCache);
594 595 596
}
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* iter = itera->iter;
dengyihao's avatar
dengyihao 已提交
597 598 599
  if (iter == NULL) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
600
  IterateValue* iv = &itera->val;
601 602 603 604 605 606 607 608
  iterateValueDestroy(iv, false);

  bool next = tSkipListIterNext(iter);
  if (next) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

    iv->type = ct->operaType;
dengyihao's avatar
dengyihao 已提交
609
    iv->ver = ct->version;
dengyihao's avatar
dengyihao 已提交
610
    iv->colVal = tstrdup(ct->colVal);
611 612 613 614 615 616

    taosArrayPush(iv->val, &ct->uid);
  }
  return next;
}

dengyihao's avatar
dengyihao 已提交
617 618 619 620
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  // opt later
  return &iter->val;
}