indexCache.c 24.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17 18
#include "indexCache.h"
#include "indexComm.h"
#include "indexUtil.h"
19
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
21

22
#define MAX_INDEX_KEY_LEN 256  // test only, change later

/*
 * Memory tuning knobs for the in-memory index cache.
 * Every replacement list is parenthesized so the macros evaluate as a single
 * value inside larger expressions (e.g. `x / MEM_TERM_LIMIT`).
 */
#define MEM_TERM_LIMIT     (10 * 10000)
#define MEM_THRESHOLD      (128 * 1024 * 1024)  // 128 MiB
#define MEM_SIGNAL_QUIT    (MEM_THRESHOLD * 5)
#define MEM_ESTIMATE_RADIO (1.5)
dengyihao's avatar
dengyihao 已提交
28

dengyihao's avatar
dengyihao 已提交
29 30
static void idxMemRef(MemTable* tbl);
static void idxMemUnRef(MemTable* tbl);
31

dengyihao's avatar
dengyihao 已提交
32 33 34 35
static void    idxCacheTermDestroy(CacheTerm* ct);
static int32_t idxCacheTermCompare(const void* l, const void* r);
static int32_t idxCacheJsonTermCompare(const void* l, const void* r);
static char*   idxCacheTermGet(const void* pData);
36

dengyihao's avatar
dengyihao 已提交
37
static MemTable* idxInternalCacheCreate(int8_t type);
38

dengyihao's avatar
dengyihao 已提交
39 40 41 42 43 44 45 46 47
static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
48
/*comm compare func, used in (LE/LT/GE/GT compare)*/
dengyihao's avatar
dengyihao 已提交
49 50
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s, RangeType type);
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
51
static int32_t cacheSearchEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
dengyihao's avatar
dengyihao 已提交
52 53 54 55 56 57 58 59 60 61
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);

static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
62
                                           RangeType type);
dengyihao's avatar
dengyihao 已提交
63

dengyihao's avatar
dengyihao 已提交
64
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s) = {
dengyihao's avatar
dengyihao 已提交
65 66
    {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
     cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
dengyihao's avatar
dengyihao 已提交
67
    {cacheSearchEqual_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON,
dengyihao's avatar
dengyihao 已提交
68 69
     cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON,
     cacheSearchRange_JSON}};
dengyihao's avatar
dengyihao 已提交
70

dengyihao's avatar
dengyihao 已提交
71
static void idxDoMergeWork(SSchedMsg* msg);
dengyihao's avatar
dengyihao 已提交
72
static bool idxCacheIteratorNext(Iterate* itera);
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74
/*
 * Exact-match search of one in-memory table for term->colVal.
 *
 * A temporary probe CacheTerm carrying the searched value and the cache's
 * current version is used to position a skip-list iterator; entries are then
 * visited in ascending order until the first one whose colVal differs.
 * ADD_VALUE entries are merged through INDEX_MERGE_ADD_DEL(tr->del, tr->add, uid)
 * and set *s to kTypeValue; DEL_VALUE entries are merged with add/del swapped.
 *
 * @param cache  MemTable to search; NULL is treated as "nothing to search".
 * @param term   term whose colVal is looked up.
 * @param tr     result accumulator (add / del uid sets).
 * @param s      set to kTypeValue when at least one ADD_VALUE entry matched.
 * @return 0 on success, -1 if the probe term cannot be allocated.
 */
static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  if (cache == NULL) {
    return 0;
  }
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  if (pCt == NULL) {
    return -1;
  }
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_64(&pCache->version);

  char* key = idxCacheTermGet(pCt);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  // Guard against a failed iterator creation, as idxCacheDestroySkiplist does.
  while (iter != NULL && tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    // strcmp() == 0 already implies equal length; no separate strlen check needed.
    if (0 != strcmp(c->colVal, pCt->colVal)) {
      break;
    }
    if (c->operaType == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
      *s = kTypeValue;
    } else if (c->operaType == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
    }
  }

  taosMemoryFree(pCt);
  tSkipListDestroyIter(iter);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
110
/* Prefix search on an ordinary column: not implemented yet, always reports success (0). */
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;  // impl later
}
dengyihao's avatar
dengyihao 已提交
114
/* Suffix search on an ordinary column: not implemented yet, always reports success (0). */
static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;  // impl later
}
dengyihao's avatar
dengyihao 已提交
118
/* Regex search on an ordinary column: not implemented yet, always reports success (0). */
static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;  // impl later
}
dengyihao's avatar
dengyihao 已提交
122
/*
 * Shared implementation of the LT/LE/GT/GE searches on an ordinary column.
 *
 * Walks the whole skip list of the given MemTable and classifies each entry
 * with the comparator returned by idxGetCompare(type):
 *   MATCH    - merge the entry's uid (ADD_VALUE sets *s to kTypeValue),
 *   CONTINUE - skip the entry,
 *   BREAK    - stop the scan.
 *
 * @param cache  MemTable to search; NULL is treated as "nothing to search".
 * @param term   term providing the comparison value and column type.
 * @param tr     result accumulator (add / del uid sets).
 * @param s      set to kTypeValue when an ADD_VALUE entry matched.
 * @param type   range operator selecting the comparator.
 * @return TSDB_CODE_SUCCESS on success (0 for a NULL cache), -1 if the probe
 *         term cannot be allocated.
 */
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s, RangeType type) {
  if (cache == NULL) {
    return 0;
  }
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  _cache_range_compare cmpFn = idxGetCompare(type);

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  if (pCt == NULL) {
    return -1;
  }
  pCt->colVal = term->colVal;
  pCt->colType = term->colType;
  pCt->version = atomic_load_64(&pCache->version);

  // Full scan: the iterator starts at the head of the list, so no key is needed.
  SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
  while (iter != NULL && tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    TExeCond   cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
    if (cond == MATCH) {
      if (c->operaType == ADD_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
        INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
      }
    } else if (cond == CONTINUE) {
      continue;
    } else if (cond == BREAK) {
      break;
    }
  }
  taosMemoryFree(pCt);
  tSkipListDestroyIter(iter);
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
164
/* "<" query on an ordinary column; delegates to cacheSearchCompareFunc with LT. */
static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc(cache, term, tr, s, LT);
  return code;
}
dengyihao's avatar
dengyihao 已提交
167
/* "<=" query on an ordinary column; delegates to cacheSearchCompareFunc with LE. */
static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc(cache, term, tr, s, LE);
  return code;
}
dengyihao's avatar
dengyihao 已提交
170
/* ">" query on an ordinary column; delegates to cacheSearchCompareFunc with GT. */
static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc(cache, term, tr, s, GT);
  return code;
}
dengyihao's avatar
dengyihao 已提交
173
/* ">=" query on an ordinary column; delegates to cacheSearchCompareFunc with GE. */
static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc(cache, term, tr, s, GE);
  return code;
}

dengyihao's avatar
dengyihao 已提交
177
/*
 * Exact-match search of one in-memory table for a JSON term.
 *
 * When term->colType contains the JSON extern type, the searched value is the
 * buffer produced by idxPackJsonData(term); otherwise term->colVal is used
 * directly. Entries are visited in ascending order from the probe key until
 * the first one whose colVal differs from the searched value.
 *
 * @param cache  MemTable to search; NULL is treated as "nothing to search".
 * @param term   term to look up.
 * @param tr     result accumulator (add / del uid sets).
 * @param s      set to kTypeValue when an ADD_VALUE entry matched.
 * @return 0 on success, -1 if the probe term or the packed buffer cannot be
 *         obtained.
 */
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  if (cache == NULL) {
    return 0;
  }
  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  if (pCt == NULL) {
    return -1;
  }
  pCt->colVal = term->colVal;
  pCt->version = atomic_load_64(&pCache->version);

  char* exBuf = NULL;
  if (IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
    exBuf = idxPackJsonData(term);
    if (exBuf == NULL) {
      // Without a packed key the strcmp below would dereference NULL.
      taosMemoryFree(pCt);
      return -1;
    }
    pCt->colVal = exBuf;
  }
  char* key = idxCacheTermGet(pCt);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (iter != NULL && tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);

    if (0 != strcmp(c->colVal, pCt->colVal)) {
      break;
    }
    if (c->operaType == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
      *s = kTypeValue;
    } else if (c->operaType == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
    }
  }

  taosMemoryFree(pCt);
  taosMemoryFree(exBuf);
  tSkipListDestroyIter(iter);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
222
/* Suffix search on a JSON column: no-op placeholder that reports success. */
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
225
/* Regex search on a JSON column: no-op placeholder that reports success. */
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
228 229 230 231 232 233
/* Equality query on a JSON column; delegates to cacheSearchCompareFunc_JSON with EQ. */
static int32_t cacheSearchEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc_JSON(cache, term, tr, s, EQ);
  return code;
}
/* Prefix query on a JSON column; delegates to cacheSearchCompareFunc_JSON with CONTAINS. */
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS);
  return code;
}
dengyihao's avatar
dengyihao 已提交
234
/* "<" query on a JSON column; delegates to cacheSearchCompareFunc_JSON with LT. */
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc_JSON(cache, term, tr, s, LT);
  return code;
}
dengyihao's avatar
dengyihao 已提交
237
/* "<=" query on a JSON column; delegates to cacheSearchCompareFunc_JSON with LE. */
static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc_JSON(cache, term, tr, s, LE);
  return code;
}
dengyihao's avatar
dengyihao 已提交
240
/* ">" query on a JSON column; delegates to cacheSearchCompareFunc_JSON with GT. */
static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc_JSON(cache, term, tr, s, GT);
  return code;
}
dengyihao's avatar
dengyihao 已提交
243
/* ">=" query on a JSON column; delegates to cacheSearchCompareFunc_JSON with GE. */
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc_JSON(cache, term, tr, s, GE);
  return code;
}
dengyihao's avatar
dengyihao 已提交
246 247 248
/*
 * CONTAINS query on a JSON column; delegates to cacheSearchCompareFunc_JSON.
 * NOTE(review): same body as cacheSearchPrefix_JSON and not referenced by the
 * cacheSearch table in the visible part of this file - confirm it is still needed.
 */
static int32_t cacheSearchContain_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  int32_t code = cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS);
  return code;
}
dengyihao's avatar
dengyihao 已提交
249
/* Range search on a JSON column: no-op placeholder that reports success. */
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
253
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s,
dengyihao's avatar
dengyihao 已提交
254
                                           RangeType type) {
dengyihao's avatar
dengyihao 已提交
255 256 257
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
258
  _cache_range_compare cmpFn = idxGetCompare(type);
dengyihao's avatar
dengyihao 已提交
259 260 261 262 263 264

  MemTable*   mem = cache;
  IndexCache* pCache = mem->pCache;

  CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
  pCt->colVal = term->colVal;
dengyihao's avatar
dengyihao 已提交
265
  pCt->version = atomic_load_64(&pCache->version);
dengyihao's avatar
dengyihao 已提交
266

dengyihao's avatar
dengyihao 已提交
267
  int8_t dType = IDX_TYPE_GET_TYPE(term->colType);
dengyihao's avatar
dengyihao 已提交
268 269
  int    skip = 0;
  char*  exBuf = NULL;
dengyihao's avatar
dengyihao 已提交
270 271 272 273 274 275
  if (type == CONTAINS) {
    SIndexTerm tm = {.suid = term->suid,
                     .operType = term->operType,
                     .colType = term->colType,
                     .colName = term->colVal,
                     .nColName = term->nColVal};
dengyihao's avatar
dengyihao 已提交
276
    exBuf = idxPackJsonDataPrefixNoType(&tm, &skip);
dengyihao's avatar
dengyihao 已提交
277 278
    pCt->colVal = exBuf;
  } else {
dengyihao's avatar
dengyihao 已提交
279
    exBuf = idxPackJsonDataPrefix(term, &skip);
dengyihao's avatar
dengyihao 已提交
280 281
    pCt->colVal = exBuf;
  }
dengyihao's avatar
dengyihao 已提交
282
  char* key = idxCacheTermGet(pCt);
dengyihao's avatar
dengyihao 已提交
283 284 285 286 287 288 289 290

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      break;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
291 292 293 294 295 296
    TExeCond   cond = CONTINUE;
    if (type == CONTAINS) {
      if (0 == strncmp(c->colVal, pCt->colVal, skip)) {
        cond = MATCH;
      }
    } else {
dengyihao's avatar
dengyihao 已提交
297
      if (0 != strncmp(c->colVal, pCt->colVal, skip - 1)) {
dengyihao's avatar
dengyihao 已提交
298
        break;
dengyihao's avatar
dengyihao 已提交
299 300 301 302 303 304
      } else if (0 != strncmp(c->colVal, pCt->colVal, skip)) {
        continue;
      } else {
        char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1);
        memcpy(p, c->colVal, strlen(c->colVal));
        cond = cmpFn(p + skip, term->colVal, dType);
dengyihao's avatar
dengyihao 已提交
305
        taosMemoryFree(p);
dengyihao's avatar
dengyihao 已提交
306
      }
307
    }
dengyihao's avatar
dengyihao 已提交
308 309
    if (cond == MATCH) {
      if (c->operaType == ADD_VALUE) {
dengyihao's avatar
dengyihao 已提交
310
        INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
dengyihao's avatar
dengyihao 已提交
311 312
        *s = kTypeValue;
      } else if (c->operaType == DEL_VALUE) {
dengyihao's avatar
dengyihao 已提交
313
        INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
dengyihao's avatar
dengyihao 已提交
314 315 316 317 318 319 320 321 322 323 324 325
      }
    } else if (cond == CONTINUE) {
      continue;
    } else if (cond == BREAK) {
      break;
    }
  }

  taosMemoryFree(pCt);
  taosMemoryFree(exBuf);
  tSkipListDestroyIter(iter);

dengyihao's avatar
dengyihao 已提交
326 327
  return TSDB_CODE_SUCCESS;
}
dengyihao's avatar
dengyihao 已提交
328
/* Range search on an ordinary column: not implemented yet, always reports success (0). */
static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
  (void)cache;
  (void)term;
  (void)tr;
  (void)s;
  return 0;  // impl later
}
dengyihao's avatar
dengyihao 已提交
332
static IterateValue* idxCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
333

dengyihao's avatar
dengyihao 已提交
334
/*
 * Allocate and initialise an index cache for one column.
 *
 * The cache starts with a fresh mutable table (mem) and no immutable table.
 * For JSON columns the stored column name is JSON_COLUMN rather than colName.
 * One reference is taken on the new cache, and - when idx is not NULL - one on
 * the owning index via idxAcquireRef.
 *
 * @param idx      owning index, may be NULL.
 * @param suid     super-table uid stored in the cache.
 * @param colName  column name (copied).
 * @param type     column type; also selects the kind of internal table.
 * @return the new cache, or NULL if the cache or its internal table cannot be
 *         created.
 */
IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
  IndexCache* cache = taosMemoryCalloc(1, sizeof(IndexCache));
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
  }

  cache->mem = idxInternalCacheCreate(type);
  if (cache->mem == NULL) {
    // Nothing else has been initialised yet, so releasing the struct is enough.
    indexError("failed to create index cache");
    taosMemoryFree(cache);
    return NULL;
  }
  cache->mem->pCache = cache;
  cache->colName = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
  cache->suid = suid;
  cache->occupiedMem = 0;

  taosThreadMutexInit(&cache->mtx, NULL);
  taosThreadCondInit(&cache->finished, NULL);

  idxCacheRef(cache);
  if (idx != NULL) {
    idxAcquireRef(idx->refId);
  }
  return cache;
}
dengyihao's avatar
dengyihao 已提交
359
void idxCacheDebug(IndexCache* cache) {
360 361
  MemTable* tbl = NULL;

wafwerar's avatar
wafwerar 已提交
362
  taosThreadMutexLock(&cache->mtx);
363
  tbl = cache->mem;
dengyihao's avatar
dengyihao 已提交
364
  idxMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
365
  taosThreadMutexUnlock(&cache->mtx);
366

dengyihao's avatar
dengyihao 已提交
367 368 369 370 371 372 373 374
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
dengyihao's avatar
dengyihao 已提交
375
        indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version);
dengyihao's avatar
dengyihao 已提交
376
      }
377
    }
dengyihao's avatar
dengyihao 已提交
378 379
    tSkipListDestroyIter(iter);

dengyihao's avatar
dengyihao 已提交
380
    idxMemUnRef(tbl);
381
  }
382

dengyihao's avatar
dengyihao 已提交
383
  {
wafwerar's avatar
wafwerar 已提交
384
    taosThreadMutexLock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
385
    tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
386
    idxMemRef(tbl);
wafwerar's avatar
wafwerar 已提交
387
    taosThreadMutexUnlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
388 389 390 391 392 393 394 395
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
dengyihao's avatar
dengyihao 已提交
396
          indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version);
dengyihao's avatar
dengyihao 已提交
397 398 399 400 401
        }
      }
      tSkipListDestroyIter(iter);
    }

dengyihao's avatar
dengyihao 已提交
402
    idxMemUnRef(tbl);
dengyihao's avatar
dengyihao 已提交
403
  }
404
}
dengyihao's avatar
dengyihao 已提交
405

dengyihao's avatar
dengyihao 已提交
406
/*
 * Free every CacheTerm stored in the skip list (its colVal buffer and the term
 * itself), then destroy the iterator and the skip list.
 */
void idxCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* it = tSkipListCreateIter(slt);
  if (it != NULL) {
    while (tSkipListIterNext(it)) {
      SSkipListNode* cur = tSkipListIterGet(it);
      CacheTerm*     entry = (CacheTerm*)SL_GET_NODE_DATA(cur);
      if (entry == NULL) {
        continue;
      }
      taosMemoryFree(entry->colVal);
      taosMemoryFree(entry);
    }
  }
  tSkipListDestroyIter(it);
  tSkipListDestroy(slt);
}
dengyihao's avatar
dengyihao 已提交
419
void idxCacheBroadcast(void* cache) {
dengyihao's avatar
dengyihao 已提交
420 421 422
  IndexCache* pCache = cache;
  taosThreadCondBroadcast(&pCache->finished);
}
dengyihao's avatar
dengyihao 已提交
423
void idxCacheWait(void* cache) {
dengyihao's avatar
dengyihao 已提交
424 425 426
  IndexCache* pCache = cache;
  taosThreadCondWait(&pCache->finished, &pCache->mtx);
}
dengyihao's avatar
dengyihao 已提交
427
/*
 * Detach the immutable table from the cache and drop it.
 *
 * Under cache->mtx the imm pointer is cleared and waiters on `finished` are
 * woken; the detached table is then un-referenced twice outside the lock.
 * NOTE(review): presumably one unref balances the reference taken in
 * idxCacheIteratorCreate and the other the cache's own reference - confirm.
 */
void idxCacheDestroyImm(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }

  taosThreadMutexLock(&cache->mtx);
  MemTable* detached = cache->imm;
  cache->imm = NULL;  // or throw int bg thread
  idxCacheBroadcast(cache);
  taosThreadMutexUnlock(&cache->mtx);

  idxMemUnRef(detached);
  idxMemUnRef(detached);
}
dengyihao's avatar
dengyihao 已提交
443
void idxCacheDestroy(void* cache) {
dengyihao's avatar
dengyihao 已提交
444
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
445 446 447
  if (pCache == NULL) {
    return;
  }
448

dengyihao's avatar
dengyihao 已提交
449 450
  idxMemUnRef(pCache->mem);
  idxMemUnRef(pCache->imm);
wafwerar's avatar
wafwerar 已提交
451
  taosMemoryFree(pCache->colName);
dengyihao's avatar
dengyihao 已提交
452

wafwerar's avatar
wafwerar 已提交
453 454
  taosThreadMutexDestroy(&pCache->mtx);
  taosThreadCondDestroy(&pCache->finished);
455
  if (pCache->index != NULL) {
dengyihao's avatar
dengyihao 已提交
456
    idxReleaseRef(((SIndex*)pCache->index)->refId);
457
  }
wafwerar's avatar
wafwerar 已提交
458
  taosMemoryFree(pCache);
dengyihao's avatar
dengyihao 已提交
459 460
}

dengyihao's avatar
dengyihao 已提交
461
/*
 * Create an iterator over the cache's immutable table (imm).
 *
 * The imm pointer is read once while holding cache->mtx, so the NULL test, the
 * reference taken with idxMemRef and the skip-list iterator all refer to the
 * same table even if another thread clears cache->imm concurrently.
 *
 * @param cache  cache whose immutable table is iterated.
 * @return a new iterator (release with idxCacheIteratorDestroy), or NULL when
 *         there is no immutable table or the iterator cannot be allocated.
 */
Iterate* idxCacheIteratorCreate(IndexCache* cache) {
  taosThreadMutexLock(&cache->mtx);

  MemTable* tbl = cache->imm;
  if (tbl == NULL) {
    taosThreadMutexUnlock(&cache->mtx);
    return NULL;
  }

  Iterate* iter = taosMemoryCalloc(1, sizeof(Iterate));
  if (iter == NULL) {
    taosThreadMutexUnlock(&cache->mtx);
    return NULL;
  }

  idxMemRef(tbl);

  iter->val.val = taosArrayInit(1, sizeof(uint64_t));
  iter->val.colVal = NULL;
  iter->iter = tSkipListCreateIter(tbl->mem);
  iter->next = idxCacheIteratorNext;
  iter->getValue = idxCacheIteratorGetValue;

  taosThreadMutexUnlock(&cache->mtx);

  return iter;
}
dengyihao's avatar
dengyihao 已提交
484
/*
 * Release an iterator created by idxCacheIteratorCreate: its skip-list
 * iterator, its current value and the iterator object. NULL is ignored.
 */
void idxCacheIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    tSkipListDestroyIter(iter->iter);
    iterateValueDestroy(&iter->val, true);
    taosMemoryFree(iter);
  }
}
dengyihao's avatar
dengyihao 已提交
492

dengyihao's avatar
dengyihao 已提交
493
/*
 * Queue a background merge of the cache on indexQhandle.
 *
 * The scheduled message runs idxDoMergeWork with the cache as `ahandle`.
 * When `notify` is true, `thandle` is set to a 1-byte allocation; it is left
 * NULL otherwise.
 * NOTE(review): thandle presumably acts only as a flag for the worker - confirm
 * who frees it.
 * A reference on the owning index is acquired before scheduling; the index is
 * treated as optional, matching idxCacheCreate and idxCacheDestroy.
 *
 * @param pCache  cache to merge.
 * @param notify  whether to attach the notification marker to the message.
 * @return always 0.
 */
int idxCacheSchedToMerge(IndexCache* pCache, bool notify) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = idxDoMergeWork;
  schedMsg.ahandle = pCache;
  if (notify) {
    schedMsg.thandle = taosMemoryMalloc(1);
  }
  schedMsg.msg = NULL;
  if (pCache->index != NULL) {
    idxAcquireRef(pCache->index->refId);
  }
  taosScheduleTask(indexQhandle, &schedMsg);
  return 0;
}
505

dengyihao's avatar
dengyihao 已提交
506
/*
 * Ensure the mutable table has room for another write.
 *
 * Loops until occupiedMem * MEM_ESTIMATE_RADIO drops below MEM_THRESHOLD:
 *   - while an immutable table still exists, wait on the cache condition;
 *   - otherwise rotate: the current mem becomes imm, a fresh mem is created,
 *     occupiedMem is reset and a background merge is scheduled.
 * `quit` is true when occupiedMem had reached MEM_SIGNAL_QUIT; in that case
 * the `merging` flag is left untouched and the merge is scheduled with
 * notification enabled.
 * In this file the callers hold cache->mtx while calling.
 */
static void idxCacheMakeRoomForWrite(IndexCache* cache) {
  for (;;) {
    if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
      return;
    }

    if (cache->imm != NULL) {
      // TODO: wake up by condition variable
      idxCacheWait(cache);
      continue;
    }

    bool quit = (cache->occupiedMem >= MEM_SIGNAL_QUIT);

    idxCacheRef(cache);
    cache->imm = cache->mem;
    cache->mem = idxInternalCacheCreate(cache->type);

    cache->mem->pCache = cache;
    cache->occupiedMem = 0;
    if (!quit) {
      atomic_store_32(&cache->merging, 1);
    }
    // 1. sched to merge
    // 2. unref cache in bgwork
    idxCacheSchedToMerge(cache, quit);
  }
}
dengyihao's avatar
dengyihao 已提交
531
/*
 * Insert one (term, uid) pair into the cache's mutable table.
 *
 * A CacheTerm is built from the term: for JSON columns the value is the buffer
 * returned by idxPackJsonData, otherwise a NUL-terminated copy of
 * term->colVal. The term gets the next cache version, and its estimated size
 * is added to occupiedMem before idxCacheMakeRoomForWrite is given the chance
 * to rotate tables. The cache reference taken at entry is released on every
 * path.
 *
 * @param cache  IndexCache to write to.
 * @param term   term describing column type, value and operation.
 * @param uid    uid associated with the term.
 * @return 0 on success, -1 if cache is NULL or an allocation fails.
 */
int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
  if (cache == NULL) {
    return -1;
  }
  bool hasJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);

  IndexCache* pCache = cache;
  idxCacheRef(pCache);
  // encode data
  CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm));
  if (ct == NULL) {
    idxCacheUnRef(pCache);
    return -1;
  }
  // set up key
  ct->colType = term->colType;
  if (hasJson) {
    ct->colVal = idxPackJsonData(term);
  } else {
    ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
    if (ct->colVal != NULL) {
      memcpy(ct->colVal, term->colVal, term->nColVal);
    }
  }
  if (ct->colVal == NULL) {
    taosMemoryFree(ct);
    idxCacheUnRef(pCache);
    return -1;
  }
  ct->version = atomic_add_fetch_64(&pCache->version, 1);
  // set value
  ct->uid = uid;
  ct->operaType = term->operType;
  // Account for the whole CacheTerm, not just the size of a pointer to it.
  int64_t estimate = sizeof(*ct) + strlen(ct->colVal);

  taosThreadMutexLock(&pCache->mtx);
  pCache->occupiedMem += estimate;
  idxCacheMakeRoomForWrite(pCache);
  MemTable* tbl = pCache->mem;
  idxMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  idxMemUnRef(tbl);

  taosThreadMutexUnlock(&pCache->mtx);
  idxCacheUnRef(pCache);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
571
void idxCacheForceToMerge(void* cache) {
dengyihao's avatar
dengyihao 已提交
572
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
573

dengyihao's avatar
dengyihao 已提交
574
  idxCacheRef(pCache);
dengyihao's avatar
dengyihao 已提交
575 576 577
  taosThreadMutexLock(&pCache->mtx);

  indexInfo("%p is forced to merge into tfile", pCache);
dengyihao's avatar
dengyihao 已提交
578
  pCache->occupiedMem += MEM_SIGNAL_QUIT;
dengyihao's avatar
dengyihao 已提交
579
  idxCacheMakeRoomForWrite(pCache);
dengyihao's avatar
dengyihao 已提交
580 581

  taosThreadMutexUnlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
582
  idxCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
583 584
  return;
}
dengyihao's avatar
dengyihao 已提交
585
// Delete entry point for the cache. Not implemented yet: every argument is
// ignored and 0 is returned unconditionally.
//
// cache       cache handle (unused)
// fieldValue  value of the field to delete (unused)
// fvlen       length of fieldValue (unused)
// uid         uid associated with the value (unused)
// operType    operation type (unused)
//
// Returns 0 always.
int idxCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
  // Explicitly mark the parameters as unused until the stub is implemented.
  (void)cache;
  (void)fieldValue;
  (void)fvlen;
  (void)uid;
  (void)operType;
  return 0;
}
589

dengyihao's avatar
dengyihao 已提交
590
// Run one term query against a single mem table.
// The handler is picked from the cacheSearch table: row 1 when the term's
// column type contains the JSON type, row 0 otherwise; the column is the
// query type. A NULL table yields 0 without touching the query.
static int32_t idxQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s) {
  if (mem == NULL) {
    return 0;
  }

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  const int row = IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON) ? 1 : 0;
  return cacheSearch[row][qtype](mem, term, tr, s);
}
dengyihao's avatar
dengyihao 已提交
604
// Search the cache for a term: first the active table (mem), then the
// immutable one (imm).
//
// Both tables are captured and referenced under the cache mutex, queried
// without the mutex held, and unreferenced afterwards. imm is only consulted
// when the mem lookup returned 0 and *s is not kTypeDeletion.
// NOTE(review): presumably a deletion marker found in mem is meant to shadow
// older entries in imm - confirm.
//
// Returns 0 for a NULL cache, otherwise the result of the last query run
// (0 if no table was queried).
int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) {
  if (cache == NULL) {
    return 0;
  }
  IndexCache* pCache = cache;

  taosThreadMutexLock(&pCache->mtx);
  MemTable* mem = pCache->mem;
  MemTable* imm = pCache->imm;
  idxMemRef(mem);
  idxMemRef(imm);
  taosThreadMutexUnlock(&pCache->mtx);

  int64_t st = taosGetTimestampUs();

  int ret = 0;
  if (mem != NULL && mem->mem != NULL) {
    ret = idxQueryMem(mem, query, result, s);
  }
  // continue search in imm
  if (ret == 0 && *s != kTypeDeletion && imm != NULL && imm->mem != NULL) {
    ret = idxQueryMem(imm, query, result, s);
  }

  idxMemUnRef(mem);
  idxMemUnRef(imm);
  indexInfo("cache search, time cost %" PRIu64 "us", taosGetTimestampUs() - st);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
633

dengyihao's avatar
dengyihao 已提交
634
// Take one reference on the cache. A NULL cache is ignored.
void idxCacheRef(IndexCache* cache) {
  if (cache != NULL) {
    // The new count is not needed here.
    (void)(T_REF_INC(cache));
  }
}
dengyihao's avatar
dengyihao 已提交
641
// Drop one reference on the cache and destroy it via idxCacheDestroy() once
// the count reaches zero. A NULL cache is ignored.
void idxCacheUnRef(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }
  if (T_REF_DEC(cache) == 0) {
    idxCacheDestroy(cache);
  }
}
650

dengyihao's avatar
dengyihao 已提交
651
// Take one reference on a mem table. A NULL table is ignored.
void idxMemRef(MemTable* tbl) {
  if (tbl != NULL) {
    // The new count is not needed here.
    (void)(T_REF_INC(tbl));
  }
}
dengyihao's avatar
dengyihao 已提交
658
// Drop one reference on a mem table. When the count reaches zero the
// underlying skip list is destroyed and the table itself is freed.
// A NULL table is ignored.
void idxMemUnRef(MemTable* tbl) {
  if (tbl == NULL) {
    return;
  }
  if (T_REF_DEC(tbl) != 0) {
    return;
  }
  idxCacheDestroySkiplist(tbl->mem);
  taosMemoryFree(tbl);
}

dengyihao's avatar
dengyihao 已提交
670
// Free a cache term together with the colVal buffer it holds.
// A NULL term is ignored.
static void idxCacheTermDestroy(CacheTerm* ct) {
  if (ct != NULL) {
    taosMemoryFree(ct->colVal);
    taosMemoryFree(ct);
  }
}
dengyihao's avatar
dengyihao 已提交
677
// Key accessor handed to the skip list: the key of an entry is the entry
// itself, so the incoming pointer is returned unchanged as a char*.
static char* idxCacheTermGet(const void* pData) {
  return (char*)pData;
}
dengyihao's avatar
dengyihao 已提交
681
static int32_t idxCacheTermCompare(const void* l, const void* r) {
682 683 684
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
dengyihao's avatar
dengyihao 已提交
685
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
686
  if (cmp == 0) {
dengyihao's avatar
dengyihao 已提交
687 688 689 690 691
    if (rt->version == lt->version) {
      cmp = 0;
    } else {
      cmp = rt->version < lt->version ? -1 : 1;
    }
dengyihao's avatar
dengyihao 已提交
692
  }
dengyihao's avatar
dengyihao 已提交
693
  return cmp;
694 695
}

dengyihao's avatar
dengyihao 已提交
696
// Scan the string a for the character c.
// Returns the number of characters consumed: the offset just past the first
// occurrence of c when it is found, or the string length when it is not.
static int idxFindCh(char* a, char c) {
  char* cur = a;
  while (*cur != 0) {
    char ch = *cur++;
    if (ch == c) {
      break;
    }
  }
  return cur - a;
}
dengyihao's avatar
dengyihao 已提交
702
// Placeholder for a structured JSON term comparison. Currently disabled:
// both arguments are ignored and every pair compares equal (0).
// NOTE(review): the earlier draft split each string at '&' (see idxFindCh),
// compared the prefixes, then the byte after the separator, then the rest -
// confirm the intended key layout before re-enabling anything like it.
static int idxCacheJsonTermCompareImpl(char* a, char* b) {
  (void)a;
  (void)b;
  return 0;
}
dengyihao's avatar
dengyihao 已提交
722
static int32_t idxCacheJsonTermCompare(const void* l, const void* r) {
723 724 725
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
726
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
727 728 729 730 731
  if (cmp == 0) {
    return rt->version - lt->version;
  }
  return cmp;
}
dengyihao's avatar
dengyihao 已提交
732
static MemTable* idxInternalCacheCreate(int8_t type) {
dengyihao's avatar
dengyihao 已提交
733 734 735
  // int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY :
  // TSDB_DATA_TYPE_BINARY;
  int ttype = TSDB_DATA_TYPE_BINARY;
736
  int32_t (*cmpFn)(const void* l, const void* r) =
dengyihao's avatar
dengyihao 已提交
737
      IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
dengyihao's avatar
dengyihao 已提交
738

wafwerar's avatar
wafwerar 已提交
739
  MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
dengyihao's avatar
dengyihao 已提交
740
  idxMemRef(tbl);
dengyihao's avatar
dengyihao 已提交
741 742 743
  // if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
  tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet);
  //}
744 745 746
  return tbl;
}

dengyihao's avatar
dengyihao 已提交
747
// Scheduler callback that flushes a cache into a tfile.
// msg->ahandle carries the cache; a non-NULL msg->thandle signals "quit" and
// is freed here before the flush is started.
static void idxDoMergeWork(SSchedMsg* msg) {
  IndexCache* pCache = msg->ahandle;
  SIndex*     sidx = (SIndex*)pCache->index;

  int quit = (msg->thandle != NULL);
  taosMemoryFree(msg->thandle);

  idxFlushCacheToTFile(sidx, pCache, quit);
}
dengyihao's avatar
dengyihao 已提交
755
// Advance the cache iterator by one skip-list entry.
// The previous value held in itera->val is released first; on success the
// value is refilled from the current term (operation type, version, a copy
// of colVal, and the uid appended to the value array).
// Returns false when there is no underlying iterator or no further entry.
static bool idxCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* iter = itera->iter;
  if (iter == NULL) {
    return false;
  }

  IterateValue* iv = &itera->val;
  iterateValueDestroy(iv, false);

  if (!tSkipListIterNext(iter)) {
    return false;
  }

  SSkipListNode* node = tSkipListIterGet(iter);
  CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

  iv->type = ct->operaType;
  iv->ver = ct->version;
  iv->colVal = tstrdup(ct->colVal);
  taosArrayPush(iv->val, &ct->uid);
  return true;
}

dengyihao's avatar
dengyihao 已提交
776
// Expose the iterator's current value. The returned pointer refers to
// storage inside the iterator itself; nothing is copied.
static IterateValue* idxCacheIteratorGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;
  return current;
}