indexCache.c 23.5 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17 18
#include "indexCache.h"
#include "indexComm.h"
#include "indexUtil.h"
19
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
21

22
#define MAX_INDEX_KEY_LEN 256  // test only, change later

/*
 * Tuning constants for the in-memory index cache. Every expansion is
 * parenthesized so the macros keep their value when used inside a larger
 * expression (e.g. `x / MEM_THRESHOLD`).
 */
#define MEM_TERM_LIMIT     (10 * 10000)
// occupiedMem * MEM_ESTIMATE_RADIO is compared against this before a write
#define MEM_THRESHOLD      (64 * 1024)
// occupiedMem at or above this asks the merge task to signal quit
#define MEM_SIGNAL_QUIT    (MEM_THRESHOLD * 20)
// multiplier applied to occupiedMem when estimating real memory usage
#define MEM_ESTIMATE_RADIO (1.5)
dengyihao's avatar
dengyihao 已提交
28

dengyihao's avatar
dengyihao 已提交
29 30
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
31

dengyihao's avatar
dengyihao 已提交
32 33
static void    indexCacheTermDestroy(CacheTerm* ct);
static int32_t indexCacheTermCompare(const void* l, const void* r);
34
static int32_t indexCacheJsonTermCompare(const void* l, const void* r);
dengyihao's avatar
dengyihao 已提交
35
static char*   indexCacheTermGet(const void* pData);
36

37
static MemTable* indexInternalCacheCreate(int8_t type);
38

dengyihao's avatar
dengyihao 已提交
39 40 41 42 43 44 45 46 47
static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
48
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
dengyihao's avatar
dengyihao 已提交
49
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
50
                                      RangeType type);
dengyihao's avatar
dengyihao 已提交
51 52 53 54 55 56 57 58 59 60 61 62
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);

static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
                                           RangeType type);
dengyihao's avatar
dengyihao 已提交
63

dengyihao's avatar
dengyihao 已提交
64 65 66 67 68 69
/*
 * Search dispatch table used by indexQueryMem: row 0 holds the handlers for
 * plain columns, row 1 the handlers for JSON columns, and the column index is
 * the query type. The initializers are positional, so their order presumably
 * has to match the numeric order of the query-type enum -- confirm when adding
 * a new query type.
 */
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
    {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
     cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
    {cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON,
     cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON,
     cacheSearchRange_JSON}};
dengyihao's avatar
dengyihao 已提交
70

dengyihao's avatar
dengyihao 已提交
71 72 73
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

dengyihao's avatar
dengyihao 已提交
74
/*
 * Exact-match lookup of term->colVal in one in-memory table.
 *
 * cache : MemTable to search (NULL is accepted and yields no result)
 * term  : query term; only colVal is used
 * tr    : receives matching uids via INDEX_MERGE_ADD_DEL (added / deled sets)
 * s     : set to kTypeValue when at least one ADD_VALUE entry matched
 *
 * Always returns 0.
 */
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  if (cache == NULL) {
    return 0;
  }
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  // The probe only lives for this call, so keep it on the stack: no allocation
  // that can fail and nothing to free on exit.
  CacheTerm probe = {0};
  probe.colVal = term->colVal;
  probe.version = atomic_load_32(&pCache->version);

  char* key = indexCacheTermGet(&probe);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    if (0 != strcmp(c->colVal, probe.colVal)) {
      // the iterator started at the probe key, so the first different value ends the run
      break;
    }
    if (c->operaType == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
      *s = kTypeValue;
    } else if (c->operaType == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
    }
  }

  tSkipListDestroyIter(iter);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
111
/* Prefix search on the in-memory cache: not implemented yet, reports success with no results. */
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;
}

/* Suffix search on the in-memory cache: not implemented yet, reports success with no results. */
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;
}

/* Regex search on the in-memory cache: not implemented yet, reports success with no results. */
static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
123
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
124 125 126 127
                                      RangeType type) {
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
128

129
  _cache_range_compare cmpFn = indexGetCompare(type);
dengyihao's avatar
dengyihao 已提交
130

dengyihao's avatar
dengyihao 已提交
131 132 133 134 135 136 137 138
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_32(&pCache->version);

  char* key = indexCacheTermGet(pCt);
dengyihao's avatar
dengyihao 已提交
139 140 141 142 143 144 145 146

  SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
147
    TExeCond   cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
dengyihao's avatar
dengyihao 已提交
148 149 150 151 152 153 154 155 156
    if (cond == MATCH) {
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else if (cond == CONTINUE) {
dengyihao's avatar
dengyihao 已提交
157
      continue;
dengyihao's avatar
dengyihao 已提交
158 159 160 161
    } else if (cond == BREAK) {
      break;
    }
  }
dengyihao's avatar
dengyihao 已提交
162
  taosMemoryFree(pCt);
dengyihao's avatar
dengyihao 已提交
163 164 165
  tSkipListDestroyIter(iter);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
166 167 168 169 170
/* term < value search; delegates to cacheSearchCompareFunc with LT. */
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = LT;
  return cacheSearchCompareFunc(cache, term, tr, s, op);
}

/* term <= value search; delegates to cacheSearchCompareFunc with LE. */
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = LE;
  return cacheSearchCompareFunc(cache, term, tr, s, op);
}

/* term > value search; delegates to cacheSearchCompareFunc with GT. */
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = GT;
  return cacheSearchCompareFunc(cache, term, tr, s, op);
}

/* term >= value search; delegates to cacheSearchCompareFunc with GE. */
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = GE;
  return cacheSearchCompareFunc(cache, term, tr, s, op);
}

/*
 * Exact-match lookup for JSON columns.
 *
 * When term->colType carries the JSON flag the term is first packed with
 * indexPackJsonData and the packed string is used as the lookup value;
 * otherwise term->colVal is used as is.
 *
 * cache : MemTable to search (NULL is accepted and yields no result)
 * tr    : receives matching uids via INDEX_MERGE_ADD_DEL (added / deled sets)
 * s     : set to kTypeValue when at least one ADD_VALUE entry matched
 *
 * Returns 0 on success, -1 when the packed key could not be built.
 */
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  if (cache == NULL) {
    return 0;
  }
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  // Stack probe: nothing to allocate, nothing to leak.
  CacheTerm probe = {0};
  probe.colVal = term->colVal;
  probe.version = atomic_load_32(&pCache->version);

  char* exBuf = NULL;
  if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    exBuf = indexPackJsonData(term);
    if (exBuf == NULL) {
      // a NULL key would be handed to strcmp below
      indexError("failed to pack json term for cache search");
      return -1;
    }
    probe.colVal = exBuf;
  }
  char* key = indexCacheTermGet(&probe);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    if (0 != strcmp(c->colVal, probe.colVal)) {
      // first entry with a different value ends the run of matches
      break;
    }
    if (c->operaType == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
      *s = kTypeValue;
    } else if (c->operaType == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
    }
  }

  taosMemoryFree(exBuf);
  tSkipListDestroyIter(iter);
  return 0;
}
/* Prefix search on JSON columns: placeholder, reports success with no results. */
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return TSDB_CODE_SUCCESS;
}

/* Suffix search on JSON columns: placeholder, reports success with no results. */
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return TSDB_CODE_SUCCESS;
}

/* Regex search on JSON columns: placeholder, reports success with no results. */
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
234 235
/* JSON term < value search; delegates to cacheSearchCompareFunc_JSON with LT. */
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = LT;
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, op);
}

/* JSON term <= value search; delegates to cacheSearchCompareFunc_JSON with LE. */
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = LE;
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, op);
}

/* JSON term > value search; delegates to cacheSearchCompareFunc_JSON with GT. */
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = GT;
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, op);
}

/* JSON term >= value search; delegates to cacheSearchCompareFunc_JSON with GE. */
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  const RangeType op = GE;
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, op);
}

/* Range search on JSON columns: placeholder, reports success with no results. */
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return TSDB_CODE_SUCCESS;
}

/*
 * Shared implementation of the LT / LE / GT / GE searches on JSON columns.
 *
 * For JSON terms a packed prefix is built with indexPackJsonDataPrefix; `skip`
 * receives the prefix length. The scan starts at that prefix and stops at the
 * first entry whose first `skip` bytes differ. For every remaining entry the
 * part after the prefix is compared with term->colVal through the comparator
 * selected by `type`:
 *   MATCH    -> merge the entry's uid into tr (added or deled by operaType)
 *   CONTINUE -> skip the entry
 *   BREAK    -> stop scanning
 *
 * Returns TSDB_CODE_SUCCESS, or -1 when a working buffer could not be
 * allocated. A NULL cache yields no result and returns 0.
 */
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
                                           RangeType type) {
  if (cache == NULL) {
    return 0;
  }
  _cache_range_compare cmpFn = indexGetCompare(type);

  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  // Stack probe: lives exactly as long as the iterator created from it.
  CacheTerm probe = {0};
  probe.colVal = term->colVal;
  probe.version = atomic_load_32(&pCache->version);

  int8_t dType = INDEX_TYPE_GET_TYPE(term->colType);
  int    skip = 0;
  char*  exBuf = NULL;

  if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    exBuf = indexPackJsonDataPrefix(term, &skip);
    if (exBuf == NULL) {
      // a NULL prefix would be handed to strncmp below
      indexError("failed to pack json prefix for cache search");
      return -1;
    }
    probe.colVal = exBuf;
  }
  char* key = indexCacheTermGet(&probe);

  int32_t            code = TSDB_CODE_SUCCESS;
  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    if (0 != strncmp(c->colVal, probe.colVal, skip)) {
      // left the run of entries that share the packed prefix
      break;
    }

    // The comparator gets a private copy of the stored value.
    size_t len = strlen(c->colVal);
    char*  p = taosMemoryCalloc(1, len + 1);
    if (p == NULL) {
      code = -1;
      break;
    }
    memcpy(p, c->colVal, len);

    TExeCond cond = cmpFn(p + skip, term->colVal, dType);
    // Release the copy before branching: the CONTINUE and BREAK paths used to leak it.
    taosMemoryFree(p);

    if (cond == BREAK) {
      break;
    }
    if (cond != MATCH) {
      continue;
    }
    if (c->operaType == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
      *s = kTypeValue;
    } else if (c->operaType == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
    }
  }

  taosMemoryFree(exBuf);
  tSkipListDestroyIter(iter);

  return code;
}
/* Range search on the in-memory cache: not implemented yet, reports success with no results. */
static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  // impl later
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;
}
316
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
317

dengyihao's avatar
dengyihao 已提交
318
/*
 * Create the in-memory cache for one indexed column.
 *
 * idx     : owning index; may be NULL, otherwise a reference is acquired on idx->refId
 * suid    : super-table id stored in the cache
 * colName : column name (duplicated); JSON caches use JSON_COLUMN instead
 * type    : column type, possibly carrying the JSON flag
 *
 * Returns the new cache holding one reference (taken here through
 * indexCacheRef), or NULL when an allocation failed.
 */
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
  IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
  }

  cache->mem = indexInternalCacheCreate(type);
  if (cache->mem == NULL) {
    // without a mem table the back-pointer assignment below would dereference NULL
    indexError("failed to create index cache");
    taosMemoryFree(cache);
    return NULL;
  }
  cache->mem->pCache = cache;
  cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
  cache->suid = suid;
  cache->occupiedMem = 0;

  taosThreadMutexInit(&cache->mtx, NULL);
  taosThreadCondInit(&cache->finished, NULL);

  indexCacheRef(cache);
  if (idx != NULL) {
    indexAcquireRef(idx->refId);
  }
  return cache;
}
343
void indexCacheDebug(IndexCache* cache) {
344 345
  MemTable* tbl = NULL;

wafwerar's avatar
wafwerar 已提交
346
  taosThreadMutexLock(&cache->mtx);
347 348
  tbl = cache->mem;
  indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
349
  taosThreadMutexUnlock(&cache->mtx);
350

dengyihao's avatar
dengyihao 已提交
351 352 353 354 355 356 357 358 359 360
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
361
    }
dengyihao's avatar
dengyihao 已提交
362 363 364
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
365
  }
366

dengyihao's avatar
dengyihao 已提交
367
  {
wafwerar's avatar
wafwerar 已提交
368
    taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
369 370
    tbl = cache->imm;
    indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
371
    taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
388
}
dengyihao's avatar
dengyihao 已提交
389

dengyihao's avatar
dengyihao 已提交
390 391
/*
 * Free every CacheTerm stored in `slt` (value string and term itself) and
 * then destroy the skip list.
 */
void indexCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* it = tSkipListCreateIter(slt);
  if (it != NULL) {
    while (tSkipListIterNext(it)) {
      SSkipListNode* node = tSkipListIterGet(it);
      // indexCacheTermDestroy ignores NULL and frees colVal followed by the term
      indexCacheTermDestroy((CacheTerm*)SL_GET_NODE_DATA(node));
    }
  }
  tSkipListDestroyIter(it);
  tSkipListDestroy(slt);
}
dengyihao's avatar
dengyihao 已提交
403 404 405 406 407 408 409 410
void indexCacheBroadcast(void* cache) {
  IndexCache* pCache = cache;
  taosThreadCondBroadcast(&pCache->finished);
}
void indexCacheWait(void* cache) {
  IndexCache* pCache = cache;
  taosThreadCondWait(&pCache->finished, &pCache->mtx);
}
dengyihao's avatar
dengyihao 已提交
411
/*
 * Detach the immutable table from `cache` and drop it.
 *
 * Under cache->mtx the imm pointer is cleared and waiters on the `finished`
 * condition are woken. Afterwards two references on the detached table are
 * released.
 * NOTE(review): the second release presumably balances the reference taken in
 * indexCacheIteratorCreate, which indexCacheIteratorDestroy never drops --
 * confirm before changing either side.
 */
void indexCacheDestroyImm(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }

  taosThreadMutexLock(&cache->mtx);
  MemTable* detached = cache->imm;
  cache->imm = NULL;  // or throw into bg thread
  indexCacheBroadcast(cache);
  taosThreadMutexUnlock(&cache->mtx);

  for (int i = 0; i < 2; ++i) {
    indexMemUnRef(detached);
  }
}
dengyihao's avatar
dengyihao 已提交
427 428
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
429 430 431
  if (pCache == NULL) {
    return;
  }
432

433 434
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
wafwerar's avatar
wafwerar 已提交
435
  taosMemoryFree(pCache->colName);
dengyihao's avatar
dengyihao 已提交
436

wafwerar's avatar
wafwerar 已提交
437 438
  taosThreadMutexDestroy(&pCache->mtx);
  taosThreadCondDestroy(&pCache->finished);
439 440 441
  if (pCache->index != NULL) {
    indexReleaseRef(((SIndex*)pCache->index)->refId);
  }
wafwerar's avatar
wafwerar 已提交
442
  taosMemoryFree(pCache);
dengyihao's avatar
dengyihao 已提交
443 444
}

dengyihao's avatar
dengyihao 已提交
445
/*
 * Create an iterator over the immutable table (imm) of `cache`.
 *
 * Returns NULL when there is no immutable table or when an allocation fails.
 * On success a reference is taken on the immutable table under cache->mtx;
 * indexCacheIteratorDestroy does not release it.
 * Release the iterator with indexCacheIteratorDestroy.
 */
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
  if (cache->imm == NULL) {
    return NULL;
  }
  Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate));
  if (iiter == NULL) {
    return NULL;
  }

  // indexCacheIteratorNext pushes uids into this array, so it must exist.
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
  if (iiter->val.val == NULL) {
    taosMemoryFree(iiter);
    return NULL;
  }
  iiter->val.colVal = NULL;
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

  taosThreadMutexLock(&cache->mtx);
  MemTable* tbl = cache->imm;
  indexMemRef(tbl);
  // imm may have been detached since the unlocked check above; then the
  // iterator simply yields nothing
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
  taosThreadMutexUnlock(&cache->mtx);

  return iiter;
}
/* Release an iterator made by indexCacheIteratorCreate; NULL is ignored. */
void indexCacheIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    tSkipListDestroyIter(iter->iter);
    iterateValueDestroy(&iter->val, true);
    taosMemoryFree(iter);
  }
}
dengyihao's avatar
dengyihao 已提交
476

dengyihao's avatar
dengyihao 已提交
477
/*
 * Queue a background merge (doMergeWork) for `pCache` on indexQhandle.
 *
 * notify : when true a 1-byte allocation is attached as thandle; doMergeWork
 *          only tests it for non-NULL (to set the index quit flag) and frees it.
 *
 * A reference on the owning index is acquired before scheduling.
 * Always returns 0.
 */
int indexCacheSchedToMerge(IndexCache* pCache, bool notify) {
  SSchedMsg task = {0};
  task.fp = doMergeWork;
  task.ahandle = pCache;
  if (notify) {
    // used purely as a flag by doMergeWork
    task.thandle = taosMemoryMalloc(1);
  }
  task.msg = NULL;

  indexAcquireRef(pCache->index->refId);
  taosScheduleTask(indexQhandle, &task);
  return 0;
}
489

dengyihao's avatar
dengyihao 已提交
490 491
/*
 * Ensure the active table may accept another write.
 *
 * Loops until the estimated usage (occupiedMem * MEM_ESTIMATE_RADIO) is below
 * MEM_THRESHOLD:
 *   - if an immutable table is still pending, wait on the cache condition;
 *   - otherwise rotate: the active table becomes the immutable one, a fresh
 *     active table is created, the usage counter is reset and a background
 *     merge is scheduled.
 *
 * The in-file callers invoke this with cache->mtx locked.
 */
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  for (;;) {
    if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
      return;
    }
    if (cache->imm != NULL) {
      // TODO: wake up by condition variable
      indexCacheWait(cache);
      continue;
    }

    bool notifyQuit = (cache->occupiedMem >= MEM_SIGNAL_QUIT);

    // extra cache reference for the background job; unref happens in bgwork
    indexCacheRef(cache);
    cache->imm = cache->mem;
    cache->mem = indexInternalCacheCreate(cache->type);
    cache->mem->pCache = cache;
    cache->occupiedMem = 0;
    // sched to merge
    indexCacheSchedToMerge(cache, notifyQuit);
  }
}
dengyihao's avatar
dengyihao 已提交
511
/*
 * Insert one (term, uid) entry into the active table of the cache.
 *
 * cache : IndexCache to write to
 * term  : term to store; JSON terms are packed with indexPackJsonData, other
 *         terms get a NUL-terminated copy of colVal (nColVal bytes)
 * uid   : id associated with the term
 *
 * Returns 0 on success, -1 when cache is NULL or an allocation failed.
 *
 * Fixes over the previous version:
 *   - the allocation check tested `cache` instead of the freshly allocated term;
 *   - the error paths now give back the cache reference taken on entry;
 *   - a failed value allocation is detected before it is passed to strlen;
 *   - the size estimate uses the term struct size, not the size of a pointer,
 *     so the table rotates somewhat earlier than before for the same data.
 */
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
  if (cache == NULL) {
    return -1;
  }
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);

  IndexCache* pCache = cache;
  indexCacheRef(pCache);

  // encode data
  CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
  if (ct == NULL) {
    indexCacheUnRef(pCache);
    return -1;
  }

  // set up key
  ct->colType = term->colType;
  if (hasJson) {
    ct->colVal = indexPackJsonData(term);
  } else {
    ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
    if (ct->colVal != NULL) {
      memcpy(ct->colVal, term->colVal, term->nColVal);
    }
  }
  if (ct->colVal == NULL) {
    taosMemoryFree(ct);
    indexCacheUnRef(pCache);
    return -1;
  }
  ct->version = atomic_add_fetch_32(&pCache->version, 1);

  // set value
  ct->uid = uid;
  ct->operaType = term->operType;

  int64_t estimate = sizeof(*ct) + strlen(ct->colVal);

  taosThreadMutexLock(&pCache->mtx);
  pCache->occupiedMem += estimate;
  indexCacheMakeRoomForWrite(pCache);
  // read pCache->mem only after making room: it may have been rotated
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);
  taosThreadMutexUnlock(&pCache->mtx);

  indexCacheUnRef(pCache);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
553 554 555 556 557 558
void indexCacheForceToMerge(void* cache) {
  IndexCache* pCache = cache;
  indexCacheRef(pCache);
  taosThreadMutexLock(&pCache->mtx);

  indexInfo("%p is forced to merge into tfile", pCache);
dengyihao's avatar
dengyihao 已提交
559
  pCache->occupiedMem += MEM_SIGNAL_QUIT;
dengyihao's avatar
dengyihao 已提交
560 561 562 563 564 565
  indexCacheMakeRoomForWrite(pCache);

  taosThreadMutexUnlock(&pCache->mtx);
  indexCacheUnRef(pCache);
  return;
}
dengyihao's avatar
dengyihao 已提交
566
/*
 * Delete hook for the cache. Currently a no-op: no argument is used and the
 * function always returns 0.
 */
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
  (void)cache;
  (void)fieldValue;
  (void)fvlen;
  (void)uid;
  (void)operType;
  return 0;
}
570

dengyihao's avatar
dengyihao 已提交
571
/*
 * Run `query` against one MemTable by dispatching through the cacheSearch
 * table: row 1 for JSON terms, row 0 otherwise, column = query type.
 * A NULL table yields no result and returns 0.
 */
static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s) {
  if (mem == NULL) {
    return 0;
  }

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  int row = 0;
  if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    row = 1;
  }
  return cacheSearch[row][qtype](mem, term, tr, s);
}
dengyihao's avatar
dengyihao 已提交
585
/*
 * Search the cache for `query`.
 *
 * Both tables are captured and referenced under the cache mutex and searched
 * outside it: first the active table, then -- unless the first search failed
 * or *s is kTypeDeletion -- the immutable table. The elapsed time is logged.
 *
 * Returns the result of the last table search performed (0 when cache is NULL).
 */
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
  int64_t st = taosGetTimestampUs();
  if (cache == NULL) {
    return 0;
  }
  IndexCache* pCache = cache;

  taosThreadMutexLock(&pCache->mtx);
  MemTable* mem = pCache->mem;
  MemTable* imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
  taosThreadMutexUnlock(&pCache->mtx);

  int  ret = indexQueryMem(mem, query, result, s);
  bool searchImm = (ret == 0) && (*s != kTypeDeletion);
  if (searchImm) {
    // continue search in imm
    ret = indexQueryMem(imm, query, result, s);
  }

  indexMemUnRef(mem);
  indexMemUnRef(imm);
  indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
612 613

/* Take one reference on the cache; NULL is ignored. */
void indexCacheRef(IndexCache* cache) {
  if (cache != NULL) {
    int ref = T_REF_INC(cache);
    UNUSED(ref);
  }
}
/* Drop one reference on the cache and destroy it when the count reaches zero; NULL is ignored. */
void indexCacheUnRef(IndexCache* cache) {
  if (cache != NULL) {
    int remaining = T_REF_DEC(cache);
    if (remaining == 0) {
      indexCacheDestroy(cache);
    }
  }
}
629 630

/* Take one reference on a MemTable; NULL is ignored. */
void indexMemRef(MemTable* tbl) {
  if (tbl != NULL) {
    int ref = T_REF_INC(tbl);
    UNUSED(ref);
  }
}
/*
 * Drop one reference on a MemTable. When the count reaches zero the skip list
 * and all of its terms are destroyed and the table is freed. NULL is ignored.
 */
void indexMemUnRef(MemTable* tbl) {
  if (tbl != NULL) {
    int remaining = T_REF_DEC(tbl);
    if (remaining == 0) {
      indexCacheDestroySkiplist(tbl->mem);
      taosMemoryFree(tbl);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
649
/* Free a cache term together with its value string; NULL is ignored. */
static void indexCacheTermDestroy(CacheTerm* ct) {
  if (ct != NULL) {
    taosMemoryFree(ct->colVal);
    taosMemoryFree(ct);
  }
}
dengyihao's avatar
dengyihao 已提交
656
static char* indexCacheTermGet(const void* pData) {
657 658 659
  CacheTerm* p = (CacheTerm*)pData;
  return (char*)p;
}
dengyihao's avatar
dengyihao 已提交
660
static int32_t indexCacheTermCompare(const void* l, const void* r) {
661 662 663
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
dengyihao's avatar
dengyihao 已提交
664
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
665 666 667
  if (cmp == 0) {
    return rt->version - lt->version;
  }
dengyihao's avatar
dengyihao 已提交
668
  return cmp;
669 670
}

671 672 673 674 675 676 677
/*
 * Scan `a` for the first occurrence of `c`.
 *
 * Returns the offset just past the match (index + 1) when `c` is found, or
 * the string length when it is not.
 */
static int indexFindCh(char* a, char c) {
  int pos = 0;
  while (a[pos] != 0) {
    if (a[pos++] == c) {
      break;
    }
  }
  return pos;
}
/*
 * Placeholder for a JSON-aware term comparison; currently every pair of
 * strings compares equal (returns 0).
 *
 * The disabled draft this replaces compared the parts of `a` and `b` up to the
 * '&' separator (located with indexFindCh), then the byte at that offset, and
 * finally the remainder after skipping two more characters.
 * NOTE(review): that draft was never enabled; recover it from version control
 * if the JSON-specific ordering is needed.
 */
static int indexCacheJsonTermCompareImpl(char* a, char* b) {
  (void)a;
  (void)b;
  return 0;
}
static int32_t indexCacheJsonTermCompare(const void* l, const void* r) {
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
701
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
702 703 704 705 706
  if (cmp == 0) {
    return rt->version - lt->version;
  }
  return cmp;
}
707
/*
 * Allocate a MemTable holding one reference.
 *
 * JSON columns are stored as binary keys and ordered with
 * indexCacheJsonTermCompare; other columns use indexCacheTermCompare. A skip
 * list is only created for binary / nchar key types; for any other type
 * tbl->mem stays NULL.
 *
 * Returns NULL when the table itself cannot be allocated.
 */
static MemTable* indexInternalCacheCreate(int8_t type) {
  bool isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON);
  int  ttype = isJson ? TSDB_DATA_TYPE_BINARY : type;
  int32_t (*cmpFn)(const void* l, const void* r) = isJson ? indexCacheJsonTermCompare : indexCacheTermCompare;

  MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
  if (tbl == NULL) {
    // the assignment to tbl->mem below would dereference NULL
    indexError("failed to create index mem table");
    return NULL;
  }
  indexMemRef(tbl);
  if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
    tbl->mem =
        tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, indexCacheTermGet);
  }
  return tbl;
}

/*
 * Background task body scheduled by indexCacheSchedToMerge.
 *
 * msg->ahandle is the cache to flush; a non-NULL msg->thandle (freed here)
 * sets the quit flag on the owning index. The cache is then flushed with
 * indexFlushCacheToTFile.
 */
static void doMergeWork(SSchedMsg* msg) {
  IndexCache* cache = msg->ahandle;
  SIndex*     owner = (SIndex*)cache->index;

  if (msg->thandle) {
    owner->quit = true;
  } else {
    owner->quit = false;
  }
  taosMemoryFree(msg->thandle);
  indexFlushCacheToTFile(owner, cache);
}
/*
 * Advance the cache iterator.
 *
 * The previous value is reset first; when another entry exists its operation
 * type, version, a copy of its value string and its uid are loaded into
 * itera->val. Returns false when the iterator has no skip-list cursor or is
 * exhausted.
 */
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* cursor = itera->iter;
  if (cursor == NULL) {
    return false;
  }

  IterateValue* out = &itera->val;
  iterateValueDestroy(out, false);

  if (!tSkipListIterNext(cursor)) {
    return false;
  }

  SSkipListNode* node = tSkipListIterGet(cursor);
  CacheTerm*     entry = (CacheTerm*)SL_GET_NODE_DATA(node);

  out->type = entry->operaType;
  out->ver = entry->version;
  out->colVal = tstrdup(entry->colVal);
  taosArrayPush(out->val, &entry->uid);
  return true;
}

dengyihao's avatar
dengyihao 已提交
750 751 752 753
/* Return the value slot embedded in the iterator; it is overwritten by the next call to next(). */
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  // opt later
  IterateValue* current = &iter->val;
  return current;
}