index_cache.c 11.1 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_util.h"
18
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
20

21
#define MAX_INDEX_KEY_LEN 256  // test only, change later

/* Presumably a cap on terms per memtable; not referenced in the code visible here. */
#define MEM_TERM_LIMIT (10 * 10000)

/* Estimated bytes a mutable memtable may hold before it is rotated into `imm`.
 * Parenthesized so that expressions such as `x / MEM_THRESHOLD` expand correctly. */
#define MEM_THRESHOLD (1024 * 1024)

/* Multiplier applied to occupiedMem when comparing against MEM_THRESHOLD
 * ("RADIO" means ratio; the name is kept because other code uses it). */
#define MEM_ESTIMATE_RADIO 1.5
dengyihao's avatar
dengyihao 已提交
26

dengyihao's avatar
dengyihao 已提交
27 28
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
29

dengyihao's avatar
dengyihao 已提交
30 31 32
static void    indexCacheTermDestroy(CacheTerm* ct);
static int32_t indexCacheTermCompare(const void* l, const void* r);
static char*   indexCacheTermGet(const void* pData);
33

34
static MemTable* indexInternalCacheCreate(int8_t type);
35

dengyihao's avatar
dengyihao 已提交
36 37 38
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

39
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
40

dengyihao's avatar
dengyihao 已提交
41
/*
 * Create the in-memory index cache for one column of super table `suid`.
 *
 * The cache starts with a freshly created mutable memtable (`mem`) and no
 * immutable memtable (`imm`). One reference is taken via indexCacheRef on
 * behalf of the caller.
 *
 * Returns NULL if the cache, its memtable, or its column-name copy cannot be
 * allocated. Anything already allocated is released in that case.
 */
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
  IndexCache* cache = calloc(1, sizeof(IndexCache));
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
  }

  cache->mem = indexInternalCacheCreate(type);
  cache->colName = tstrdup(colName);
  // A NULL colName is passed through unchanged; only a failed copy is an error.
  if (cache->mem == NULL || (colName != NULL && cache->colName == NULL)) {
    indexError("failed to create index cache");
    indexMemUnRef(cache->mem);
    free(cache->colName);
    free(cache);
    return NULL;
  }

  cache->type = type;
  cache->index = idx;
  cache->version = 0;
  cache->suid = suid;
  cache->occupiedMem = 0;

  pthread_mutex_init(&cache->mtx, NULL);
  pthread_cond_init(&cache->finished, NULL);

  indexCacheRef(cache);
  return cache;
}
61
void indexCacheDebug(IndexCache* cache) {
62 63 64 65 66 67 68
  MemTable* tbl = NULL;

  pthread_mutex_lock(&cache->mtx);
  tbl = cache->mem;
  indexMemRef(tbl);
  pthread_mutex_unlock(&cache->mtx);

dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74 75 76 77 78
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
79
    }
dengyihao's avatar
dengyihao 已提交
80 81 82
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
83
  }
84

dengyihao's avatar
dengyihao 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
  {
    pthread_mutex_lock(&cache->mtx);
    tbl = cache->imm;
    indexMemRef(tbl);
    pthread_mutex_unlock(&cache->mtx);
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
106
}
dengyihao's avatar
dengyihao 已提交
107

dengyihao's avatar
dengyihao 已提交
108 109 110 111 112
/*
 * Free every CacheTerm stored in `slt`, then the skiplist itself.
 *
 * `slt` may be NULL: indexInternalCacheCreate only builds a skiplist for
 * BINARY/NCHAR columns, so other memtables reach here without one.
 */
void indexCacheDestroySkiplist(SSkipList* slt) {
  if (slt == NULL) {
    return;
  }
  SSkipListIterator* iter = tSkipListCreateIter(slt);
  // If the iterator cannot be created, the terms leak but the list is still destroyed.
  while (iter != NULL && tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
    indexCacheTermDestroy(ct);  // NULL-safe
  }
  tSkipListDestroyIter(iter);
  tSkipListDestroy(slt);
}
dengyihao's avatar
dengyihao 已提交
121
/*
 * Detach and release the cache's immutable memtable (`imm`).
 *
 * `imm` is cleared under cache->mtx. Writers blocked in
 * indexCacheMakeRoomForWrite are woken via the `finished` condition.
 * The table is then unreferenced twice: once for the reference the cache held
 * through `imm`, and once, presumably, for the reference taken by
 * indexCacheIteratorCreate, which indexCacheIteratorDestroy never releases.
 * NOTE(review): if no iterator was created this over-releases — confirm callers.
 */
void indexCacheDestroyImm(IndexCache* cache) {
  if (cache != NULL) {
    pthread_mutex_lock(&cache->mtx);
    MemTable* detached = cache->imm;
    cache->imm = NULL;  // or throw it in the bg thread
    pthread_cond_broadcast(&cache->finished);
    pthread_mutex_unlock(&cache->mtx);

    indexMemUnRef(detached);
    indexMemUnRef(detached);
  }
}
dengyihao's avatar
dengyihao 已提交
138 139
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
140 141 142
  if (pCache == NULL) {
    return;
  }
143 144
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
dengyihao's avatar
dengyihao 已提交
145
  free(pCache->colName);
dengyihao's avatar
dengyihao 已提交
146

dengyihao's avatar
dengyihao 已提交
147 148 149
  pthread_mutex_destroy(&pCache->mtx);
  pthread_cond_destroy(&pCache->finished);

dengyihao's avatar
dengyihao 已提交
150
  free(pCache);
dengyihao's avatar
dengyihao 已提交
151 152
}

dengyihao's avatar
dengyihao 已提交
153 154
/*
 * Create an iterator over the cache's immutable memtable (`imm`).
 *
 * The iterator's value array is allocated before any reference is taken.
 * A failed allocation therefore returns NULL without touching the cache.
 * When `imm` is absent (or has no skiplist) the iterator is still returned,
 * but its `next` callback reports no entries.
 *
 * NOTE(review): the reference taken on `imm` here is not released by
 * indexCacheIteratorDestroy. indexCacheDestroyImm performs an extra unref,
 * presumably for it — keep the two in sync.
 *
 * Returns NULL if `cache` is NULL or allocation fails.
 */
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
  if (cache == NULL) {
    return NULL;
  }
  Iterate* iiter = calloc(1, sizeof(Iterate));
  if (iiter == NULL) {
    return NULL;
  }
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
  if (iiter->val.val == NULL) {
    free(iiter);
    return NULL;
  }
  iiter->val.colVal = NULL;
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

  pthread_mutex_lock(&cache->mtx);
  MemTable* tbl = cache->imm;
  indexMemRef(tbl);
  iiter->iter = (tbl != NULL && tbl->mem != NULL) ? tSkipListCreateIter(tbl->mem) : NULL;
  pthread_mutex_unlock(&cache->mtx);

  return iiter;
}
/*
 * Release an iterator created by indexCacheIteratorCreate: its skiplist
 * iterator, its current value, and the Iterate itself. NULL is ignored.
 */
void indexCacheIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    tSkipListDestroyIter(iter->iter);
    iterateValueDestroy(&iter->val, true);
    free(iter);
  }
}
dengyihao's avatar
dengyihao 已提交
182 183 184 185 186 187 188 189 190 191

/*
 * Queue a background task (doMergeWork) that flushes `pCache` to a TFile.
 *
 * Always returns 0. The original fell off the end of this non-void function,
 * which is undefined behavior if a caller reads the result. The outcome of
 * taosScheduleTask is not inspected here.
 */
int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = doMergeWork;
  schedMsg.ahandle = pCache;
  schedMsg.thandle = NULL;
  schedMsg.msg = NULL;

  taosScheduleTask(indexQhandle, &schedMsg);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
192 193
/*
 * Ensure the mutable memtable has room for a write. Must be called with
 * cache->mtx held (indexCachePut locks it first, and pthread_cond_wait
 * requires it).
 *
 * While the estimated usage (occupiedMem * MEM_ESTIMATE_RADIO) is at or above
 * MEM_THRESHOLD:
 *   - if an immutable table is still pending, wait on `finished`, which
 *     indexCacheDestroyImm broadcasts;
 *   - otherwise rotate `mem` into `imm`, start a new `mem`, reset the usage
 *     counter, and schedule a background merge.
 * NOTE(review): indexInternalCacheCreate can return NULL, leaving cache->mem NULL.
 */
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  while (cache->occupiedMem * MEM_ESTIMATE_RADIO >= MEM_THRESHOLD) {
    if (cache->imm != NULL) {
      // A previous merge is still running; sleep until it releases imm.
      pthread_cond_wait(&cache->finished, &cache->mtx);
      continue;
    }
    // Reference held for the merge task (the original notes it is unref'd in bgwork).
    indexCacheRef(cache);
    cache->imm = cache->mem;
    cache->mem = indexInternalCacheCreate(cache->type);
    cache->occupiedMem = 0;
    indexCacheSchedToMerge(cache);
  }
}

dengyihao's avatar
dengyihao 已提交
211
/*
 * Insert one (term, uid) entry into the cache's mutable memtable.
 *
 * The term's column value is copied into a NUL-terminated buffer and stamped
 * with a freshly incremented cache version. If the estimated memory use
 * crosses the threshold, indexCacheMakeRoomForWrite rotates the memtable
 * first.
 *
 * Returns 0 on success, or when the column type has no skiplist (nothing to
 * index; the term is freed). Returns -1 if `cache` or `term` is NULL,
 * allocation fails, or no memtable is available.
 */
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
  if (cache == NULL || term == NULL) {
    return -1;
  }
  IndexCache* pCache = cache;

  // Encode the entry before touching the cache so that failures leave no state behind.
  CacheTerm* ct = calloc(1, sizeof(CacheTerm));
  if (ct == NULL) {
    return -1;
  }
  ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
  if (ct->colVal == NULL) {
    free(ct);
    return -1;
  }
  memcpy(ct->colVal, term->colVal, term->nColVal);
  ct->colType = term->colType;
  ct->uid = uid;
  ct->operaType = term->operType;

  indexCacheRef(pCache);
  ct->version = atomic_add_fetch_32(&pCache->version, 1);

  // Rough footprint: the CacheTerm struct (not a pointer to it) plus the value copy.
  int64_t estimate = (int64_t)(sizeof(*ct) + term->nColVal);

  pthread_mutex_lock(&pCache->mtx);
  // NOTE(review): if this rotates the memtable, occupiedMem is reset and this estimate is dropped.
  pCache->occupiedMem += estimate;
  indexCacheMakeRoomForWrite(pCache);

  MemTable* tbl = pCache->mem;
  if (tbl == NULL) {
    // Memtable allocation failed during rotation.
    pthread_mutex_unlock(&pCache->mtx);
    indexCacheTermDestroy(ct);
    indexCacheUnRef(pCache);
    return -1;
  }
  if (tbl->mem == NULL) {
    // Non-string column types have no skiplist; avoid leaking the unused term.
    pthread_mutex_unlock(&pCache->mtx);
    indexCacheTermDestroy(ct);
    indexCacheUnRef(pCache);
    return 0;
  }
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);
  pthread_mutex_unlock(&pCache->mtx);

  indexCacheUnRef(pCache);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
248
/*
 * Remove a (fieldValue, uid) entry from the cache.
 *
 * TODO: not implemented. This is currently a no-op that always reports
 * success (returns 0). The parameters are deliberately unused.
 */
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
  (void)cache;
  (void)fieldValue;
  (void)fvlen;
  (void)uid;
  (void)operType;
  return 0;
}
252

dengyihao's avatar
dengyihao 已提交
253
/*
 * Look up ct->colVal in one memtable, appending matching uids to `result`.
 *
 * The scan starts at the probe `ct`. indexCacheTermCompare orders entries by
 * colVal, then by version with higher versions first. It stops at the first
 * entry whose colVal differs from the probe. For matching entries:
 *   - ADD_VALUE entries (or any entry when qtype == QUERY_TERM) push their uid
 *     and set *s = kTypeValue;
 *   - otherwise a DEL_VALUE entry sets *s = kTypeDeletion and ends the scan.
 * The colVal check now precedes the DEL branch. Previously a DEL entry for a
 * *different* value could wrongly mark the probe as deleted.
 *
 * Returns 0 (also for a NULL table, or one without a skiplist). Returns -1 if
 * the skiplist iterator cannot be created.
 */
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
  if (mem == NULL || mem->mem == NULL) {
    return 0;
  }
  char* key = indexCacheTermGet(ct);

  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
  if (iter == NULL) {
    return -1;
  }
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) {
      continue;
    }
    CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
    if (strcmp(c->colVal, ct->colVal) != 0) {
      // Entries are sorted by colVal: everything from here on is a different term.
      break;
    }
    if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
      taosArrayPush(result, &c->uid);
      *s = kTypeValue;
    } else if (c->operaType == DEL_VALUE) {
      // table is del, not need
      *s = kTypeDeletion;
      break;
    }
  }
  tSkipListDestroyIter(iter);
  return 0;
}
/*
 * Search the cache for query->term, appending matching uids to `result`.
 *
 * Both memtables are pinned under the cache mutex and then queried without it.
 * The mutable table (`mem`) is queried first. The immutable table (`imm`) is
 * consulted only if that succeeded and *s was not set to kTypeDeletion.
 * Returns 0 for a NULL cache, otherwise the result of the last indexQueryMem call.
 */
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
  IndexCache* ic = cache;
  if (ic == NULL) {
    return 0;
  }

  pthread_mutex_lock(&ic->mtx);
  MemTable* active = ic->mem;
  MemTable* frozen = ic->imm;
  indexMemRef(active);
  indexMemRef(frozen);
  pthread_mutex_unlock(&ic->mtx);

  SIndexTerm*     wanted = query->term;
  EIndexQueryType kind = query->qType;
  // Probe key: the current version puts it at or before every stored entry with this colVal.
  CacheTerm probe = {.colVal = wanted->colVal, .version = atomic_load_32(&ic->version)};

  int code = indexQueryMem(active, &probe, kind, result, s);
  bool consultFrozen = (code == 0) && (*s != kTypeDeletion);
  if (consultFrozen) {
    code = indexQueryMem(frozen, &probe, kind, result, s);
  }

  indexMemUnRef(active);
  indexMemUnRef(frozen);
  return code;
}
dengyihao's avatar
dengyihao 已提交
310 311

/* Increment the cache's reference count; NULL is ignored. */
void indexCacheRef(IndexCache* cache) {
  if (cache != NULL) {
    int newCount = T_REF_INC(cache);
    UNUSED(newCount);
  }
}
/* Decrement the cache's reference count and destroy it when the count hits zero; NULL is ignored. */
void indexCacheUnRef(IndexCache* cache) {
  if (cache == NULL) {
    return;
  }
  int remaining = T_REF_DEC(cache);
  if (remaining != 0) {
    return;
  }
  indexCacheDestroy(cache);
}
327 328

/* Increment a memtable's reference count; NULL is ignored. */
void indexMemRef(MemTable* tbl) {
  if (tbl != NULL) {
    int newCount = T_REF_INC(tbl);
    UNUSED(newCount);
  }
}
/*
 * Decrement a memtable's reference count. When it reaches zero, free the
 * table's skiplist (including its CacheTerms) and the table itself.
 * NULL is ignored.
 */
void indexMemUnRef(MemTable* tbl) {
  if (tbl == NULL) {
    return;
  }
  int remaining = T_REF_DEC(tbl);
  if (remaining != 0) {
    return;
  }
  indexCacheDestroySkiplist(tbl->mem);
  free(tbl);
}

dengyihao's avatar
dengyihao 已提交
347
/* Free a CacheTerm and its owned column-value buffer; NULL is ignored. */
static void indexCacheTermDestroy(CacheTerm* ct) {
  if (ct != NULL) {
    char* value = ct->colVal;
    free(value);
    free(ct);
  }
}
dengyihao's avatar
dengyihao 已提交
354
static char* indexCacheTermGet(const void* pData) {
355 356 357
  CacheTerm* p = (CacheTerm*)pData;
  return (char*)p;
}
dengyihao's avatar
dengyihao 已提交
358
static int32_t indexCacheTermCompare(const void* l, const void* r) {
359 360 361
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;
  // compare colVal
dengyihao's avatar
dengyihao 已提交
362
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
363 364 365
  if (cmp == 0) {
    return rt->version - lt->version;
  }
dengyihao's avatar
dengyihao 已提交
366
  return cmp;
367 368 369 370 371
}

static MemTable* indexInternalCacheCreate(int8_t type) {
  MemTable* tbl = calloc(1, sizeof(MemTable));
  indexMemRef(tbl);
dengyihao's avatar
dengyihao 已提交
372
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
dengyihao's avatar
dengyihao 已提交
373 374
    tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY,
                               indexCacheTermGet);
375 376 377 378 379 380 381
  }
  return tbl;
}

/* Scheduler callback: flush the cache passed in msg->ahandle to its index's TFile. */
static void doMergeWork(SSchedMsg* msg) {
  IndexCache* target = (IndexCache*)msg->ahandle;
  indexFlushCacheToTFile((SIndex*)target->index, target);
}
/*
 * Advance a cache iterator by one skiplist entry.
 *
 * The previous value is cleared first via iterateValueDestroy(iv, false).
 * Presumably `false` keeps the value array for reuse — confirm.
 * If another entry exists, its operation type, a private copy of its column
 * value, and its uid are stored in itera->val.
 *
 * Returns true if an entry was produced. Returns false at the end, or when
 * there is no underlying skiplist iterator (e.g. the cache had no imm table).
 */
static bool indexCacheIteratorNext(Iterate* itera) {
  if (itera == NULL || itera->iter == NULL) {
    return false;
  }
  SSkipListIterator* iter = itera->iter;
  IterateValue*      iv = &itera->val;
  iterateValueDestroy(iv, false);

  if (!tSkipListIterNext(iter)) {
    return false;
  }
  SSkipListNode* node = tSkipListIterGet(iter);
  CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

  iv->type = ct->operaType;
  iv->colVal = tstrdup(ct->colVal);
  taosArrayPush(iv->val, &ct->uid);
  return true;
}

dengyihao's avatar
dengyihao 已提交
421 422 423 424
/* Return the iterator's current value, as filled in by the last indexCacheIteratorNext call. */
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;
  return current;
}