index_cache.c 11.4 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "index_cache.h"
dengyihao's avatar
add UT  
dengyihao 已提交
17
#include "index_comm.h"
dengyihao's avatar
dengyihao 已提交
18
#include "index_util.h"
19
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
21

22
// Maximum key length handed to the skiplist (test only, change later).
#define MAX_INDEX_KEY_LEN 256

// Term-count limit for the in-memory table.
#define MEM_TERM_LIMIT (10 * 10000)
// Estimated byte usage at which the mutable table is rotated out and flushed
// (compared against occupiedMem * MEM_ESTIMATE_RADIO in indexCacheMakeRoomForWrite).
#define MEM_THRESHOLD (1024 * 1024)
// Fudge factor applied to the raw byte estimate ("RADIO" is the historical spelling of "ratio").
#define MEM_ESTIMATE_RADIO 1.5

static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
30

dengyihao's avatar
dengyihao 已提交
31 32 33
static void    indexCacheTermDestroy(CacheTerm* ct);
static int32_t indexCacheTermCompare(const void* l, const void* r);
static char*   indexCacheTermGet(const void* pData);
34

35
static MemTable* indexInternalCacheCreate(int8_t type);
36

dengyihao's avatar
dengyihao 已提交
37 38 39
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

40
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42
/*
 * Create an index cache for one column of super table `suid`.
 *
 * The cache starts with one empty mutable MemTable (`mem`), no immutable
 * table (`imm`, zeroed by calloc), version 0, and one reference taken on
 * behalf of the caller. JSON-typed columns are stored under JSON_COLUMN
 * instead of `colName`.
 *
 * Returns NULL if the cache or its initial MemTable cannot be allocated.
 */
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
  IndexCache* cache = calloc(1, sizeof(IndexCache));
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
  }

  cache->mem = indexInternalCacheCreate(type);
  if (cache->mem == NULL) {
    // Without a mutable table every later put would dereference NULL.
    indexError("failed to create index cache");
    free(cache);
    return NULL;
  }
  cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
  cache->suid = suid;
  cache->occupiedMem = 0;

  pthread_mutex_init(&cache->mtx, NULL);
  pthread_cond_init(&cache->finished, NULL);

  indexCacheRef(cache);
  return cache;
}

void indexCacheDebug(IndexCache* cache) {
64 65 66 67 68 69 70
  MemTable* tbl = NULL;

  pthread_mutex_lock(&cache->mtx);
  tbl = cache->mem;
  indexMemRef(tbl);
  pthread_mutex_unlock(&cache->mtx);

dengyihao's avatar
dengyihao 已提交
71 72 73 74 75 76 77 78 79 80
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
81
    }
dengyihao's avatar
dengyihao 已提交
82 83 84
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
85
  }
86

dengyihao's avatar
dengyihao 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
  {
    pthread_mutex_lock(&cache->mtx);
    tbl = cache->imm;
    indexMemRef(tbl);
    pthread_mutex_unlock(&cache->mtx);
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
108
}
dengyihao's avatar
dengyihao 已提交
109

dengyihao's avatar
dengyihao 已提交
110 111 112 113 114
/*
 * Free every CacheTerm stored in `slt`, then the skiplist itself.
 *
 * A NULL skiplist is ignored: indexInternalCacheCreate only creates a
 * skiplist for BINARY/NCHAR key types and leaves MemTable.mem NULL otherwise.
 */
void indexCacheDestroySkiplist(SSkipList* slt) {
  if (slt == NULL) {
    return;
  }
  SSkipListIterator* iter = tSkipListCreateIter(slt);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    // Terms are owned by the skiplist; indexCacheTermDestroy ignores NULL.
    indexCacheTermDestroy((CacheTerm*)SL_GET_NODE_DATA(node));
  }
  tSkipListDestroyIter(iter);
  tSkipListDestroy(slt);
}

/*
 * Detach the immutable table from `cache` and drop references to it.
 *
 * Under cache->mtx: `imm` is cleared and `finished` is broadcast, waking any
 * writer waiting in indexCacheMakeRoomForWrite. Outside the lock the detached
 * table is unreferenced twice. NOTE(review): the first release presumably
 * drops the reference the table got at creation (indexInternalCacheCreate);
 * the second presumably balances the indexMemRef taken in
 * indexCacheIteratorCreate, which indexCacheIteratorDestroy never releases.
 * Confirm exactly one iterator is created per flush before changing this.
 */
void indexCacheDestroyImm(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }

  pthread_mutex_lock(&cache->mtx);
  MemTable* retired = cache->imm;
  cache->imm = NULL;
  pthread_cond_broadcast(&cache->finished);
  pthread_mutex_unlock(&cache->mtx);

  // reference held since the table was created
  indexMemUnRef(retired);
  // reference taken by the flush iterator
  indexMemUnRef(retired);
}

void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
142 143 144
  if (pCache == NULL) {
    return;
  }
145 146
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
dengyihao's avatar
dengyihao 已提交
147
  free(pCache->colName);
dengyihao's avatar
dengyihao 已提交
148

dengyihao's avatar
dengyihao 已提交
149 150 151
  pthread_mutex_destroy(&pCache->mtx);
  pthread_cond_destroy(&pCache->finished);

dengyihao's avatar
dengyihao 已提交
152
  free(pCache);
dengyihao's avatar
dengyihao 已提交
153 154
}

dengyihao's avatar
dengyihao 已提交
155 156
/*
 * Create an iterator over the cache's immutable table (`imm`).
 *
 * A reference on `imm` is taken under the cache lock. indexCacheIteratorDestroy
 * does not release it (NOTE(review): indexCacheDestroyImm appears to drop it).
 * If there is no immutable table, the iterator is valid but yields nothing,
 * because indexCacheIteratorNext returns false for a NULL skiplist iterator.
 *
 * Returns NULL if `cache` is NULL or an allocation fails; no reference is
 * taken in that case.
 */
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
  if (cache == NULL) {
    return NULL;
  }
  Iterate* iiter = calloc(1, sizeof(Iterate));
  if (iiter == NULL) {
    return NULL;
  }
  // Allocate the value buffer before pinning imm so the failure path has nothing to undo.
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
  if (iiter->val.val == NULL) {
    free(iiter);
    return NULL;
  }
  iiter->val.colVal = NULL;
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

  pthread_mutex_lock(&cache->mtx);
  MemTable* tbl = cache->imm;
  indexMemRef(tbl);
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
  pthread_mutex_unlock(&cache->mtx);

  return iiter;
}
/*
 * Free an iterator produced by indexCacheIteratorCreate: the skiplist
 * iterator, its IterateValue (via iterateValueDestroy(..., true)), and the
 * Iterate struct. The MemTable reference taken at creation is NOT released
 * here. NULL is ignored.
 */
void indexCacheIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    tSkipListDestroyIter(iter->iter);
    iterateValueDestroy(&iter->val, true);
    free(iter);
  }
}

/*
 * Queue doMergeWork on indexQhandle with `pCache` as its handle
 * (doMergeWork calls indexFlushCacheToTFile). Always returns 0; the result of
 * taosScheduleTask is not inspected.
 */
int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg task = {.fp = doMergeWork, .ahandle = pCache, .thandle = NULL, .msg = NULL};
  taosScheduleTask(indexQhandle, &task);
  return 0;
}

/*
 * Make sure the mutable table may accept another write.
 * The caller must hold cache->mtx; pthread_cond_wait releases and re-acquires it.
 *
 * While the estimated usage (occupiedMem * MEM_ESTIMATE_RADIO) has reached
 * MEM_THRESHOLD:
 *  - if an immutable table is still pending, wait on `finished`
 *    (broadcast by indexCacheDestroyImm);
 *  - otherwise rotate: `mem` becomes `imm`, a fresh `mem` is installed,
 *    occupiedMem is reset and a background merge is scheduled. The extra cache
 *    reference taken for the merge is released by the background work.
 *
 * If the fresh table cannot be allocated, nothing is rotated and the write
 * goes into the current table; the rotation is retried on the next write.
 */
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  while (true) {
    if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
      break;
    }
    if (cache->imm != NULL) {
      pthread_cond_wait(&cache->finished, &cache->mtx);
      continue;
    }
    // Allocate first so a failure leaves mem/imm untouched.
    MemTable* fresh = indexInternalCacheCreate(cache->type);
    if (fresh == NULL) {
      indexError("failed to create new mem table, keep writing to current one");
      break;
    }
    indexCacheRef(cache);
    cache->imm = cache->mem;
    cache->mem = fresh;
    cache->occupiedMem = 0;
    // sched to merge; unref cache in bgwork
    indexCacheSchedToMerge(cache);
  }
}

/*
 * Insert one (term, uid) record into the cache's mutable table.
 *
 * The term value is copied into a new CacheTerm (JSON terms are packed with
 * indexPackJsonData). The record is stamped with the next cache version, and
 * its approximate size is added to occupiedMem before indexCacheMakeRoomForWrite
 * may rotate the tables. The skiplist owns the CacheTerm afterwards; it is freed
 * in indexCacheDestroySkiplist.
 *
 * Returns 0 on success, -1 if `cache` is NULL or an allocation fails.
 */
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
  if (cache == NULL) {
    return -1;
  }
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);

  IndexCache* pCache = cache;
  indexCacheRef(pCache);

  // encode data
  CacheTerm* ct = calloc(1, sizeof(CacheTerm));
  if (ct == NULL) {
    indexCacheUnRef(pCache);
    return -1;
  }
  // set up key
  ct->colType = term->colType;
  if (hasJson) {
    ct->colVal = indexPackJsonData(term);
  } else {
    ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
    if (ct->colVal != NULL) {
      memcpy(ct->colVal, term->colVal, term->nColVal);
    }
  }
  if (ct->colVal == NULL) {
    free(ct);
    indexCacheUnRef(pCache);
    return -1;
  }
  ct->version = atomic_add_fetch_32(&pCache->version, 1);
  // set value
  ct->uid = uid;
  ct->operaType = term->operType;

  // Rough footprint: the CacheTerm struct plus its NUL-less key bytes.
  int64_t estimate = sizeof(*ct) + strlen(ct->colVal);

  pthread_mutex_lock(&pCache->mtx);
  pCache->occupiedMem += estimate;
  indexCacheMakeRoomForWrite(pCache);
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);
  pthread_mutex_unlock(&pCache->mtx);

  indexCacheUnRef(pCache);
  return 0;
}
/*
 * Delete entry point for the cache. Not implemented yet: all arguments are
 * ignored and 0 is always returned.
 */
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
  // TODO: implement deletion.
  (void)cache;
  (void)fieldValue;
  (void)fvlen;
  (void)uid;
  (void)operType;
  return 0;
}

/*
 * Look up `ct->colVal` in one MemTable and merge matching records into `tr`.
 *
 * Only QUERY_TERM is implemented. The scan starts from `ct` as the skiplist
 * key (presumably the first entry not ordered before it). Entries with equal
 * colVal are ordered newest-version first by indexCacheTermCompare. Each exact
 * match is applied in order:
 *  - ADD_VALUE: INDEX_MERGE_ADD_DEL(tr->deled, tr->added, uid), and *s = kTypeValue;
 *  - DEL_VALUE: INDEX_MERGE_ADD_DEL(tr->added, tr->deled, uid).
 * The scan stops at the first entry whose colVal differs.
 *
 * Other query types are not supported yet and leave `tr` and `*s` untouched.
 * A NULL table, or one without a skiplist, matches nothing. Always returns 0.
 */
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) {
  if (mem == NULL || mem->mem == NULL) {
    return 0;
  }
  if (qtype != QUERY_TERM) {
    // QUERY_PREFIX / QUERY_SUFFIX / QUERY_RANGE are not implemented; a scan would change nothing.
    return 0;
  }
  char* key = indexCacheTermGet(ct);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      continue;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    if (0 != strcmp(c->colVal, ct->colVal)) {
      break;
    }
    if (c->operaType == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
      *s = kTypeValue;
    } else if (c->operaType == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
    }
  }
  tSkipListDestroyIter(iter);
  return 0;
}
/*
 * Search the cache for `query`, merging results into `result` and reporting
 * the value state through `*s`.
 *
 * Both tables are pinned under the cache lock. `mem` is searched first; `imm`
 * is searched only if that succeeded and `*s` is not kTypeDeletion. JSON terms
 * are packed with indexPackJsonData for the lookup and freed afterwards.
 *
 * Returns 0 when `cache` is NULL (nothing to search) and the indexQueryMem
 * result otherwise. Returns -1 if the lookup key is NULL (e.g. JSON packing failed).
 */
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
  int64_t st = taosGetTimestampUs();
  if (cache == NULL) {
    return 0;
  }
  IndexCache* pCache = cache;

  MemTable *mem = NULL, *imm = NULL;
  pthread_mutex_lock(&pCache->mtx);
  mem = pCache->mem;
  imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
  pthread_mutex_unlock(&pCache->mtx);

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  bool  hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
  char* p = term->colVal;
  if (hasJson) {
    p = indexPackJsonData(term);
  }

  int ret = -1;
  if (p != NULL) {
    // A NULL key would crash strcmp inside indexQueryMem.
    CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};

    ret = indexQueryMem(mem, &ct, qtype, result, s);
    if (ret == 0 && *s != kTypeDeletion) {
      // continue search in imm
      ret = indexQueryMem(imm, &ct, qtype, result, s);
    }
  }

  if (hasJson) {
    tfree(p);
  }

  indexMemUnRef(mem);
  indexMemUnRef(imm);
  // The elapsed time is a signed int64_t, so print it with PRId64.
  indexInfo("cache search, time cost %" PRId64 "us", taosGetTimestampUs() - st);

  return ret;
}

/* Take one reference on `cache` (T_REF_INC). NULL is ignored. */
void indexCacheRef(IndexCache* cache) {
  if (cache != NULL) {
    (void)T_REF_INC(cache);
  }
}
/* Drop one reference on `cache`; the last release destroys it via indexCacheDestroy. NULL is ignored. */
void indexCacheUnRef(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }
  int remaining = T_REF_DEC(cache);
  if (remaining == 0) {
    indexCacheDestroy(cache);
  }
}

/*
 * Take one reference on a MemTable (T_REF_INC). NULL is ignored.
 * Declared `static` to match the forward declaration at the top of the file.
 */
static void indexMemRef(MemTable* tbl) {
  if (tbl == NULL) {
    return;
  }
  int ref = T_REF_INC(tbl);
  UNUSED(ref);
}
/*
 * Drop one reference on a MemTable. The last release frees every CacheTerm in
 * its skiplist, the skiplist, and the table itself. NULL is ignored.
 * Declared `static` to match the forward declaration at the top of the file.
 */
static void indexMemUnRef(MemTable* tbl) {
  if (tbl == NULL) {
    return;
  }
  int ref = T_REF_DEC(tbl);
  if (ref == 0) {
    // tbl->mem is NULL for key types that never got a skiplist (see indexInternalCacheCreate).
    if (tbl->mem != NULL) {
      indexCacheDestroySkiplist(tbl->mem);
    }
    free(tbl);
  }
}

/* Free a CacheTerm together with the colVal string it owns. NULL is ignored. */
static void indexCacheTermDestroy(CacheTerm* ct) {
  if (ct != NULL) {
    free(ct->colVal);
    free(ct);
  }
}

/*
 * Skiplist key extractor: the key of each node is the CacheTerm itself, so the
 * data pointer is returned unchanged (as char*). indexCacheTermCompare casts
 * it back to CacheTerm*.
 */
static char* indexCacheTermGet(const void* pData) {
  return (char*)pData;
}
static int32_t indexCacheTermCompare(const void* l, const void* r) {
384 385 386
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
dengyihao's avatar
dengyihao 已提交
387
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
388 389 390
  if (cmp == 0) {
    return rt->version - lt->version;
  }
dengyihao's avatar
dengyihao 已提交
391
  return cmp;
392 393 394
}

static MemTable* indexInternalCacheCreate(int8_t type) {
dengyihao's avatar
dengyihao 已提交
395 396
  type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;

397 398
  MemTable* tbl = calloc(1, sizeof(MemTable));
  indexMemRef(tbl);
dengyihao's avatar
dengyihao 已提交
399
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
dengyihao's avatar
dengyihao 已提交
400 401
    tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY,
                               indexCacheTermGet);
402 403 404 405 406 407 408
  }
  return tbl;
}

/*
 * Scheduler callback queued by indexCacheSchedToMerge: `msg->ahandle` is the
 * IndexCache, which is handed to indexFlushCacheToTFile together with its SIndex.
 */
static void doMergeWork(SSchedMsg* msg) {
  IndexCache* target = (IndexCache*)msg->ahandle;
  indexFlushCacheToTFile((SIndex*)target->index, target);
}
/*
 * Advance a cache iterator.
 *
 * The previously loaded value is cleared with iterateValueDestroy(..., false).
 * If another skiplist entry exists, `itera->val` receives its operation type,
 * its version, a copy of its colVal, and its uid appended to `val`.
 * Returns false when the entries are exhausted or there is no skiplist iterator.
 */
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* cursor = itera->iter;
  if (cursor == NULL) {
    return false;
  }

  IterateValue* out = &itera->val;
  iterateValueDestroy(out, false);

  if (!tSkipListIterNext(cursor)) {
    return false;
  }

  SSkipListNode* node = tSkipListIterGet(cursor);
  CacheTerm*     term = (CacheTerm*)SL_GET_NODE_DATA(node);

  out->type = term->operaType;
  out->ver = term->version;
  out->colVal = tstrdup(term->colVal);
  taosArrayPush(out->val, &term->uid);
  return true;
}

/* Return the value loaded by the latest indexCacheIteratorNext call; it is embedded in (and owned by) the iterator. */
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;
  return current;
}