indexCache.c 23.8 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17 18
#include "indexCache.h"
#include "indexComm.h"
#include "indexUtil.h"
19
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
21

22
#define MAX_INDEX_KEY_LEN 256  // test only, change later
dengyihao's avatar
dengyihao 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25
#define MEM_TERM_LIMIT     10 * 10000
#define MEM_THRESHOLD      1024 * 1024
dengyihao's avatar
dengyihao 已提交
26
#define MEM_ESTIMATE_RADIO 1.5
dengyihao's avatar
dengyihao 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
30

dengyihao's avatar
dengyihao 已提交
31 32
static void    indexCacheTermDestroy(CacheTerm* ct);
static int32_t indexCacheTermCompare(const void* l, const void* r);
33
static int32_t indexCacheJsonTermCompare(const void* l, const void* r);
dengyihao's avatar
dengyihao 已提交
34
static char*   indexCacheTermGet(const void* pData);
35

36
static MemTable* indexInternalCacheCreate(int8_t type);
37

dengyihao's avatar
dengyihao 已提交
38 39 40 41 42 43 44 45 46
static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
47
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
dengyihao's avatar
dengyihao 已提交
48
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
49
                                      RangeType type);
dengyihao's avatar
dengyihao 已提交
50 51 52 53 54 55 56 57 58 59 60 61
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s);

static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
                                           RangeType type);
dengyihao's avatar
dengyihao 已提交
62 63 64 65

typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);

dengyihao's avatar
dengyihao 已提交
66
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
67
  // optime later
dengyihao's avatar
dengyihao 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
  int32_t ret = func(a, b);
  switch (comType) {
    case QUERY_LESS_THAN: {
      if (ret < 0) return MATCH;
    } break;
    case QUERY_LESS_EQUAL: {
      if (ret <= 0) return MATCH;
      break;
    }
    case QUERY_GREATER_THAN: {
      if (ret > 0) return MATCH;
      break;
    }
    case QUERY_GREATER_EQUAL: {
      if (ret >= 0) return MATCH;
    }
  }
  return CONTINUE;
}
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
  __compar_fn_t func = getComparFunc(type, 0);
  return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
dengyihao's avatar
dengyihao 已提交
91 92 93 94 95 96 97 98 99 100 101 102
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
  __compar_fn_t func = getComparFunc(type, 0);
  return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
  __compar_fn_t func = getComparFunc(type, 0);
  return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
  __compar_fn_t func = getComparFunc(type, 0);
  return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
}
dengyihao's avatar
dengyihao 已提交
103 104 105

static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
                                                                    tCompareGreaterThan, tCompareGreaterEqual};
dengyihao's avatar
dengyihao 已提交
106

dengyihao's avatar
dengyihao 已提交
107 108 109 110 111 112
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
    {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
     cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
    {cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON,
     cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON,
     cacheSearchRange_JSON}};
dengyihao's avatar
dengyihao 已提交
113

dengyihao's avatar
dengyihao 已提交
114 115 116
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

dengyihao's avatar
dengyihao 已提交
117
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
118 119 120
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
121 122
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;
dengyihao's avatar
dengyihao 已提交
123

dengyihao's avatar
dengyihao 已提交
124 125 126 127 128
  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_32(&pCache->version);

  char* key = indexCacheTermGet(pCt);
dengyihao's avatar
dengyihao 已提交
129 130 131 132 133 134 135 136

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
137
    if (0 == strcmp(c->colVal, pCt->colVal)) {
dengyihao's avatar
dengyihao 已提交
138 139 140 141 142 143 144 145 146 147 148
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else {
      break;
    }
  }
dengyihao's avatar
dengyihao 已提交
149 150

  taosMemoryFree(pCt);
dengyihao's avatar
dengyihao 已提交
151
  tSkipListDestroyIter(iter);
dengyihao's avatar
dengyihao 已提交
152 153
  return 0;
}
dengyihao's avatar
dengyihao 已提交
154
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
155 156 157
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
158
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
159 160 161
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
162
static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
163 164 165
  // impl later
  return 0;
}
dengyihao's avatar
dengyihao 已提交
166
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
167 168 169 170
                                      RangeType type) {
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
171

dengyihao's avatar
dengyihao 已提交
172 173
  _cache_range_compare cmpFn = rangeCompare[type];

dengyihao's avatar
dengyihao 已提交
174 175 176 177 178 179 180 181
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_32(&pCache->version);

  char* key = indexCacheTermGet(pCt);
dengyihao's avatar
dengyihao 已提交
182 183 184 185 186 187 188 189

  SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
190
    TExeCond   cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
dengyihao's avatar
dengyihao 已提交
191 192 193 194 195 196 197 198 199
    if (cond == MATCH) {
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else if (cond == CONTINUE) {
dengyihao's avatar
dengyihao 已提交
200
      continue;
dengyihao's avatar
dengyihao 已提交
201 202 203 204
    } else if (cond == BREAK) {
      break;
    }
  }
dengyihao's avatar
dengyihao 已提交
205
  taosMemoryFree(pCt);
dengyihao's avatar
dengyihao 已提交
206 207 208
  tSkipListDestroyIter(iter);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
209 210 211 212 213
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, LT);
}
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, LE);
dengyihao's avatar
dengyihao 已提交
214
}
dengyihao's avatar
dengyihao 已提交
215 216 217 218 219 220 221 222
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, GT);
}
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc(cache, term, tr, s, GE);
}

static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
  if (cache == NULL) {
    return 0;
  }
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_32(&pCache->version);

  char* exBuf = NULL;
  if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    exBuf = indexPackJsonData(term);
    pCt->colVal = exBuf;
  }
  char* key = indexCacheTermGet(pCt);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
247

dengyihao's avatar
dengyihao 已提交
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
    if (0 == strcmp(c->colVal, pCt->colVal)) {
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else {
      break;
    }
  }

  taosMemoryFree(pCt);
  taosMemoryFree(exBuf);
  tSkipListDestroyIter(iter);
  return 0;

dengyihao's avatar
dengyihao 已提交
266 267 268 269 270 271 272 273 274 275
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
276
}
dengyihao's avatar
dengyihao 已提交
277 278
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT);
dengyihao's avatar
dengyihao 已提交
279
}
dengyihao's avatar
dengyihao 已提交
280 281
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, LE);
dengyihao's avatar
dengyihao 已提交
282
}
dengyihao's avatar
dengyihao 已提交
283 284 285 286 287 288 289 290 291 292 293 294
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, GT);
}
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE);
}
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
  return TSDB_CODE_SUCCESS;
}

static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
                                           RangeType type) {
dengyihao's avatar
dengyihao 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
  if (cache == NULL) {
    return 0;
  }
  _cache_range_compare cmpFn = rangeCompare[type];

  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_32(&pCache->version);

  int8_t dType = INDEX_TYPE_GET_TYPE(term->colType);
  int    skip = 0;
  char*  exBuf = NULL;

  if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    exBuf = indexPackJsonDataPrefix(term, &skip);
    pCt->colVal = exBuf;
  }
  char* key = indexCacheTermGet(pCt);

317
  // SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
dengyihao's avatar
dengyihao 已提交
318 319 320 321 322 323 324
  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
325 326 327 328
    printf("json val: %s\n", c->colVal);
    if (0 != strncmp(c->colVal, term->colName, term->nColName)) {
      continue;
    }
dengyihao's avatar
dengyihao 已提交
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349

    TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
    if (cond == MATCH) {
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
        // taosArrayPush(result, &c->uid);
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
      }
    } else if (cond == CONTINUE) {
      continue;
    } else if (cond == BREAK) {
      break;
    }
  }

  taosMemoryFree(pCt);
  taosMemoryFree(exBuf);
  tSkipListDestroyIter(iter);

dengyihao's avatar
dengyihao 已提交
350 351 352
  return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
353 354 355
  // impl later
  return 0;
}
356
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
357

dengyihao's avatar
dengyihao 已提交
358
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
wafwerar's avatar
wafwerar 已提交
359
  IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
dengyihao's avatar
dengyihao 已提交
360 361 362
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
dengyihao's avatar
dengyihao 已提交
363
  };
dengyihao's avatar
dengyihao 已提交
364

dengyihao's avatar
dengyihao 已提交
365
  cache->mem = indexInternalCacheCreate(type);
dengyihao's avatar
dengyihao 已提交
366
  cache->mem->pCache = cache;
dengyihao's avatar
dengyihao 已提交
367
  cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
dengyihao's avatar
dengyihao 已提交
368 369 370
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
dengyihao's avatar
dengyihao 已提交
371
  cache->suid = suid;
dengyihao's avatar
dengyihao 已提交
372
  cache->occupiedMem = 0;
dengyihao's avatar
dengyihao 已提交
373

wafwerar's avatar
wafwerar 已提交
374 375
  taosThreadMutexInit(&cache->mtx, NULL);
  taosThreadCondInit(&cache->finished, NULL);
dengyihao's avatar
dengyihao 已提交
376

dengyihao's avatar
dengyihao 已提交
377
  indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
378 379
  return cache;
}
380
void indexCacheDebug(IndexCache* cache) {
381 382
  MemTable* tbl = NULL;

wafwerar's avatar
wafwerar 已提交
383
  taosThreadMutexLock(&cache->mtx);
384 385
  tbl = cache->mem;
  indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
386
  taosThreadMutexUnlock(&cache->mtx);
387

dengyihao's avatar
dengyihao 已提交
388 389 390 391 392 393 394 395 396 397
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
398
    }
dengyihao's avatar
dengyihao 已提交
399 400 401
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
402
  }
403

dengyihao's avatar
dengyihao 已提交
404
  {
wafwerar's avatar
wafwerar 已提交
405
    taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
406 407
    tbl = cache->imm;
    indexMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
408
    taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
425
}
dengyihao's avatar
dengyihao 已提交
426

dengyihao's avatar
dengyihao 已提交
427 428 429 430 431
void indexCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* iter = tSkipListCreateIter(slt);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
432
    if (ct != NULL) {
wafwerar's avatar
wafwerar 已提交
433 434
      taosMemoryFree(ct->colVal);
      taosMemoryFree(ct);
dengyihao's avatar
dengyihao 已提交
435
    }
dengyihao's avatar
dengyihao 已提交
436 437
  }
  tSkipListDestroyIter(iter);
438
  tSkipListDestroy(slt);
dengyihao's avatar
dengyihao 已提交
439
}
dengyihao's avatar
dengyihao 已提交
440
void indexCacheDestroyImm(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
441 442 443
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
444

445
  MemTable* tbl = NULL;
wafwerar's avatar
wafwerar 已提交
446
  taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
447

448
  tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
449
  cache->imm = NULL;  // or throw int bg thread
wafwerar's avatar
wafwerar 已提交
450
  taosThreadCondBroadcast(&cache->finished);
dengyihao's avatar
dengyihao 已提交
451

wafwerar's avatar
wafwerar 已提交
452
  taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
453 454

  indexMemUnRef(tbl);
455
  indexMemUnRef(tbl);
dengyihao's avatar
dengyihao 已提交
456
}
dengyihao's avatar
dengyihao 已提交
457 458
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
459 460 461
  if (pCache == NULL) {
    return;
  }
462 463
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
wafwerar's avatar
wafwerar 已提交
464
  taosMemoryFree(pCache->colName);
dengyihao's avatar
dengyihao 已提交
465

wafwerar's avatar
wafwerar 已提交
466 467
  taosThreadMutexDestroy(&pCache->mtx);
  taosThreadCondDestroy(&pCache->finished);
dengyihao's avatar
dengyihao 已提交
468

wafwerar's avatar
wafwerar 已提交
469
  taosMemoryFree(pCache);
dengyihao's avatar
dengyihao 已提交
470 471
}

dengyihao's avatar
dengyihao 已提交
472
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
wafwerar's avatar
wafwerar 已提交
473
  Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate));
dengyihao's avatar
dengyihao 已提交
474 475 476
  if (iiter == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
477

wafwerar's avatar
wafwerar 已提交
478
  taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
479 480

  indexMemRef(cache->imm);
dengyihao's avatar
dengyihao 已提交
481

482
  MemTable* tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
483
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
484
  iiter->val.colVal = NULL;
485
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
dengyihao's avatar
dengyihao 已提交
486 487 488
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

wafwerar's avatar
wafwerar 已提交
489
  taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
490

dengyihao's avatar
dengyihao 已提交
491 492 493
  return iiter;
}
void indexCacheIteratorDestroy(Iterate* iter) {
dengyihao's avatar
dengyihao 已提交
494 495 496
  if (iter == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
497 498
  tSkipListDestroyIter(iter->iter);
  iterateValueDestroy(&iter->val, true);
wafwerar's avatar
wafwerar 已提交
499
  taosMemoryFree(iter);
dengyihao's avatar
dengyihao 已提交
500
}
dengyihao's avatar
dengyihao 已提交
501 502 503 504 505 506 507 508 509

int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = doMergeWork;
  schedMsg.ahandle = pCache;
  schedMsg.thandle = NULL;
  schedMsg.msg = NULL;

  taosScheduleTask(indexQhandle, &schedMsg);
510 511

  return 0;
dengyihao's avatar
dengyihao 已提交
512
}
513

dengyihao's avatar
dengyihao 已提交
514 515
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  while (true) {
dengyihao's avatar
dengyihao 已提交
516
    if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
dengyihao's avatar
dengyihao 已提交
517 518 519
      break;
    } else if (cache->imm != NULL) {
      // TODO: wake up by condition variable
wafwerar's avatar
wafwerar 已提交
520
      taosThreadCondWait(&cache->finished, &cache->mtx);
dengyihao's avatar
dengyihao 已提交
521
    } else {
dengyihao's avatar
dengyihao 已提交
522
      indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
523 524
      cache->imm = cache->mem;
      cache->mem = indexInternalCacheCreate(cache->type);
dengyihao's avatar
dengyihao 已提交
525
      cache->mem->pCache = cache;
dengyihao's avatar
dengyihao 已提交
526
      cache->occupiedMem = 0;
dengyihao's avatar
dengyihao 已提交
527 528 529 530 531 532
      // sched to merge
      // unref cache in bgwork
      indexCacheSchedToMerge(cache);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
533
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
dengyihao's avatar
dengyihao 已提交
534 535 536
  if (cache == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
537
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
dengyihao's avatar
dengyihao 已提交
538

dengyihao's avatar
dengyihao 已提交
539
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
540
  indexCacheRef(pCache);
dengyihao's avatar
dengyihao 已提交
541
  // encode data
wafwerar's avatar
wafwerar 已提交
542
  CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
dengyihao's avatar
dengyihao 已提交
543 544 545
  if (cache == NULL) {
    return -1;
  }
546 547
  // set up key
  ct->colType = term->colType;
dengyihao's avatar
dengyihao 已提交
548
  if (hasJson) {
dengyihao's avatar
add UT  
dengyihao 已提交
549
    ct->colVal = indexPackJsonData(term);
dengyihao's avatar
dengyihao 已提交
550
  } else {
wafwerar's avatar
wafwerar 已提交
551
    ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
dengyihao's avatar
dengyihao 已提交
552 553
    memcpy(ct->colVal, term->colVal, term->nColVal);
  }
dengyihao's avatar
dengyihao 已提交
554 555
  ct->version = atomic_add_fetch_32(&pCache->version, 1);
  // set value
556 557
  ct->uid = uid;
  ct->operaType = term->operType;
dengyihao's avatar
dengyihao 已提交
558
  // ugly code, refactor later
dengyihao's avatar
dengyihao 已提交
559
  int64_t estimate = sizeof(ct) + strlen(ct->colVal);
dengyihao's avatar
dengyihao 已提交
560

wafwerar's avatar
wafwerar 已提交
561
  taosThreadMutexLock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
562
  pCache->occupiedMem += estimate;
dengyihao's avatar
dengyihao 已提交
563
  indexCacheMakeRoomForWrite(pCache);
564 565 566 567 568
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);

wafwerar's avatar
wafwerar 已提交
569
  taosThreadMutexUnlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
570 571

  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
572
  return 0;
dengyihao's avatar
dengyihao 已提交
573
  // encode end
dengyihao's avatar
dengyihao 已提交
574
}
dengyihao's avatar
dengyihao 已提交
575
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
dengyihao's avatar
dengyihao 已提交
576
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
577
  return 0;
dengyihao's avatar
dengyihao 已提交
578
}
579

dengyihao's avatar
dengyihao 已提交
580
static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
581 582 583
  if (mem == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
584 585 586 587 588 589 590 591 592

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    return cacheSearch[1][qtype](mem, term, tr, s);
  } else {
    return cacheSearch[0][qtype](mem, term, tr, s);
  }
dengyihao's avatar
dengyihao 已提交
593
}
dengyihao's avatar
dengyihao 已提交
594
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
dengyihao's avatar
add UT  
dengyihao 已提交
595
  int64_t st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
596 597 598
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
599 600 601
  IndexCache* pCache = cache;

  MemTable *mem = NULL, *imm = NULL;
wafwerar's avatar
wafwerar 已提交
602
  taosThreadMutexLock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
603 604 605 606
  mem = pCache->mem;
  imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
wafwerar's avatar
wafwerar 已提交
607
  taosThreadMutexUnlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
608

dengyihao's avatar
dengyihao 已提交
609
  int ret = indexQueryMem(mem, query, result, s);
dengyihao's avatar
dengyihao 已提交
610 611
  if (ret == 0 && *s != kTypeDeletion) {
    // continue search in imm
dengyihao's avatar
dengyihao 已提交
612
    ret = indexQueryMem(imm, query, result, s);
dengyihao's avatar
dengyihao 已提交
613
  }
dengyihao's avatar
dengyihao 已提交
614

615 616
  indexMemUnRef(mem);
  indexMemUnRef(imm);
dengyihao's avatar
add UT  
dengyihao 已提交
617
  indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st);
dengyihao's avatar
dengyihao 已提交
618 619

  return ret;
dengyihao's avatar
dengyihao 已提交
620
}
dengyihao's avatar
dengyihao 已提交
621 622

void indexCacheRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
623 624 625
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
626 627 628 629
  int ref = T_REF_INC(cache);
  UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
630 631 632
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
633
  int ref = T_REF_DEC(cache);
dengyihao's avatar
dengyihao 已提交
634 635 636
  if (ref == 0) {
    indexCacheDestroy(cache);
  }
dengyihao's avatar
dengyihao 已提交
637
}
638 639

void indexMemRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
640 641 642
  if (tbl == NULL) {
    return;
  }
643 644 645 646
  int ref = T_REF_INC(tbl);
  UNUSED(ref);
}
void indexMemUnRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
647 648 649
  if (tbl == NULL) {
    return;
  }
650 651 652 653
  int ref = T_REF_DEC(tbl);
  if (ref == 0) {
    SSkipList* slt = tbl->mem;
    indexCacheDestroySkiplist(slt);
wafwerar's avatar
wafwerar 已提交
654
    taosMemoryFree(tbl);
655 656 657
  }
}

dengyihao's avatar
dengyihao 已提交
658
static void indexCacheTermDestroy(CacheTerm* ct) {
dengyihao's avatar
dengyihao 已提交
659 660 661
  if (ct == NULL) {
    return;
  }
wafwerar's avatar
wafwerar 已提交
662 663
  taosMemoryFree(ct->colVal);
  taosMemoryFree(ct);
664
}
dengyihao's avatar
dengyihao 已提交
665
static char* indexCacheTermGet(const void* pData) {
666 667 668
  CacheTerm* p = (CacheTerm*)pData;
  return (char*)p;
}
dengyihao's avatar
dengyihao 已提交
669
static int32_t indexCacheTermCompare(const void* l, const void* r) {
670 671 672
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
dengyihao's avatar
dengyihao 已提交
673
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
674 675 676
  if (cmp == 0) {
    return rt->version - lt->version;
  }
dengyihao's avatar
dengyihao 已提交
677
  return cmp;
678 679
}

680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715
static int indexFindCh(char* a, char c) {
  char* p = a;
  while (*p != 0 && *p++ != c) {
  }
  return p - a;
}
static int indexCacheJsonTermCompareImpl(char* a, char* b) {
  int alen = indexFindCh(a, '&');
  int blen = indexFindCh(b, '&');

  int cmp = strncmp(a, b, MIN(alen, blen));
  if (cmp == 0) {
    cmp = alen - blen;
    if (cmp != 0) {
      return cmp;
    }
    cmp = *(a + alen) - *(b + blen);
    if (cmp != 0) {
      return cmp;
    }
    alen += 2;
    blen += 2;
    cmp = strcmp(a + alen, b + blen);
  }
  return cmp;
}
static int32_t indexCacheJsonTermCompare(const void* l, const void* r) {
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
  int cmp = indexCacheJsonTermCompareImpl(lt->colVal, rt->colVal);
  if (cmp == 0) {
    return rt->version - lt->version;
  }
  return cmp;
}
716
static MemTable* indexInternalCacheCreate(int8_t type) {
717 718 719
  int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
  int32_t (*cmpFn)(const void* l, const void* r) =
      INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare;
dengyihao's avatar
dengyihao 已提交
720

wafwerar's avatar
wafwerar 已提交
721
  MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
722
  indexMemRef(tbl);
723 724 725
  if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
    tbl->mem =
        tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, indexCacheTermGet);
726 727 728 729 730 731 732
  }
  return tbl;
}

static void doMergeWork(SSchedMsg* msg) {
  IndexCache* pCache = msg->ahandle;
  SIndex*     sidx = (SIndex*)pCache->index;
dengyihao's avatar
dengyihao 已提交
733
  indexFlushCacheToTFile(sidx, pCache);
734 735 736
}
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* iter = itera->iter;
dengyihao's avatar
dengyihao 已提交
737 738 739
  if (iter == NULL) {
    return false;
  }
dengyihao's avatar
dengyihao 已提交
740
  IterateValue* iv = &itera->val;
741 742 743 744 745 746 747 748
  iterateValueDestroy(iv, false);

  bool next = tSkipListIterNext(iter);
  if (next) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

    iv->type = ct->operaType;
dengyihao's avatar
dengyihao 已提交
749
    iv->ver = ct->version;
dengyihao's avatar
dengyihao 已提交
750
    iv->colVal = tstrdup(ct->colVal);
751 752 753 754 755 756

    taosArrayPush(iv->val, &ct->uid);
  }
  return next;
}

dengyihao's avatar
dengyihao 已提交
757 758 759 760
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  // opt later
  return &iter->val;
}