index_cache.c 10.2 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_util.h"
18
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
20

21
#define MAX_INDEX_KEY_LEN 256  // test only, change later
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23
#define MEM_TERM_LIMIT 10 * 10000
dengyihao's avatar
dengyihao 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
27

dengyihao's avatar
dengyihao 已提交
28 29 30
static void    cacheTermDestroy(CacheTerm* ct);
static char*   getIndexKey(const void* pData);
static int32_t compareKey(const void* l, const void* r);
31

32
static MemTable* indexInternalCacheCreate(int8_t type);
33

dengyihao's avatar
dengyihao 已提交
34 35 36
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

37
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
38

dengyihao's avatar
dengyihao 已提交
39
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
dengyihao's avatar
dengyihao 已提交
40
  IndexCache* cache = calloc(1, sizeof(IndexCache));
dengyihao's avatar
dengyihao 已提交
41 42 43
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
dengyihao's avatar
dengyihao 已提交
44 45 46 47 48 49 50 51
  };
  cache->mem = indexInternalCacheCreate(type);

  cache->colName = calloc(1, strlen(colName) + 1);
  memcpy(cache->colName, colName, strlen(colName));
  cache->type = type;
  cache->index = idx;
  cache->version = 0;
dengyihao's avatar
dengyihao 已提交
52
  cache->suid = suid;
dengyihao's avatar
dengyihao 已提交
53
  pthread_mutex_init(&cache->mtx, NULL);
dengyihao's avatar
dengyihao 已提交
54
  indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
55 56
  return cache;
}
57
void indexCacheDebug(IndexCache* cache) {
58 59 60 61 62 63 64
  MemTable* tbl = NULL;

  pthread_mutex_lock(&cache->mtx);
  tbl = cache->mem;
  indexMemRef(tbl);
  pthread_mutex_unlock(&cache->mtx);

dengyihao's avatar
dengyihao 已提交
65 66 67 68 69 70 71 72 73 74
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
75
    }
dengyihao's avatar
dengyihao 已提交
76 77 78
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
79
  }
80

dengyihao's avatar
dengyihao 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
  {
    pthread_mutex_lock(&cache->mtx);
    tbl = cache->imm;
    indexMemRef(tbl);
    pthread_mutex_unlock(&cache->mtx);
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
102
}
dengyihao's avatar
dengyihao 已提交
103

dengyihao's avatar
dengyihao 已提交
104 105 106 107 108
void indexCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* iter = tSkipListCreateIter(slt);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
109 110 111 112
    if (ct != NULL) {
      free(ct->colVal);
      free(ct);
    }
dengyihao's avatar
dengyihao 已提交
113 114
  }
  tSkipListDestroyIter(iter);
115
  tSkipListDestroy(slt);
dengyihao's avatar
dengyihao 已提交
116
}
dengyihao's avatar
dengyihao 已提交
117
void indexCacheDestroyImm(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
118 119 120
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
121

122
  MemTable* tbl = NULL;
dengyihao's avatar
dengyihao 已提交
123
  pthread_mutex_lock(&cache->mtx);
124
  tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
125 126
  cache->imm = NULL;  // or throw int bg thread
  pthread_mutex_unlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
127 128

  indexMemUnRef(tbl);
129
  indexMemUnRef(tbl);
dengyihao's avatar
dengyihao 已提交
130
}
dengyihao's avatar
dengyihao 已提交
131 132
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
133 134 135
  if (pCache == NULL) {
    return;
  }
136 137
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
dengyihao's avatar
dengyihao 已提交
138
  free(pCache->colName);
dengyihao's avatar
dengyihao 已提交
139

dengyihao's avatar
dengyihao 已提交
140
  free(pCache);
dengyihao's avatar
dengyihao 已提交
141 142
}

dengyihao's avatar
dengyihao 已提交
143 144
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
  Iterate* iiter = calloc(1, sizeof(Iterate));
dengyihao's avatar
dengyihao 已提交
145 146 147
  if (iiter == NULL) {
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
148 149 150 151

  pthread_mutex_lock(&cache->mtx);

  indexMemRef(cache->imm);
dengyihao's avatar
dengyihao 已提交
152

153
  MemTable* tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
154
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
155
  iiter->val.colVal = NULL;
156
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
dengyihao's avatar
dengyihao 已提交
157 158 159
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

dengyihao's avatar
dengyihao 已提交
160 161
  pthread_mutex_unlock(&cache->mtx);

dengyihao's avatar
dengyihao 已提交
162 163 164
  return iiter;
}
void indexCacheIteratorDestroy(Iterate* iter) {
dengyihao's avatar
dengyihao 已提交
165 166 167
  if (iter == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
168 169 170 171
  tSkipListDestroyIter(iter->iter);
  iterateValueDestroy(&iter->val, true);
  free(iter);
}
dengyihao's avatar
dengyihao 已提交
172 173 174 175 176 177 178 179 180 181

int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = doMergeWork;
  schedMsg.ahandle = pCache;
  schedMsg.thandle = NULL;
  schedMsg.msg = NULL;

  taosScheduleTask(indexQhandle, &schedMsg);
}
dengyihao's avatar
dengyihao 已提交
182 183 184 185 186 187 188
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  while (true) {
    if (cache->nTerm < MEM_TERM_LIMIT) {
      cache->nTerm += 1;
      break;
    } else if (cache->imm != NULL) {
      // TODO: wake up by condition variable
dengyihao's avatar
dengyihao 已提交
189
      pthread_mutex_unlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
190
      taosMsleep(50);
dengyihao's avatar
dengyihao 已提交
191
      pthread_mutex_lock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
192
    } else {
dengyihao's avatar
dengyihao 已提交
193
      indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
194 195 196 197 198 199 200 201 202 203
      cache->imm = cache->mem;
      cache->mem = indexInternalCacheCreate(cache->type);
      cache->nTerm = 1;
      // sched to merge
      // unref cache in bgwork
      indexCacheSchedToMerge(cache);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
204
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
dengyihao's avatar
dengyihao 已提交
205 206 207
  if (cache == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
208

dengyihao's avatar
dengyihao 已提交
209
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
210
  indexCacheRef(pCache);
dengyihao's avatar
dengyihao 已提交
211
  // encode data
212
  CacheTerm* ct = calloc(1, sizeof(CacheTerm));
dengyihao's avatar
dengyihao 已提交
213 214 215
  if (cache == NULL) {
    return -1;
  }
216 217
  // set up key
  ct->colType = term->colType;
dengyihao's avatar
dengyihao 已提交
218 219
  ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
  memcpy(ct->colVal, term->colVal, term->nColVal);
dengyihao's avatar
dengyihao 已提交
220 221
  ct->version = atomic_add_fetch_32(&pCache->version, 1);
  // set value
222 223 224
  ct->uid = uid;
  ct->operaType = term->operType;

dengyihao's avatar
dengyihao 已提交
225 226
  // ugly code, refactor later
  pthread_mutex_lock(&pCache->mtx);
227

dengyihao's avatar
dengyihao 已提交
228
  indexCacheMakeRoomForWrite(pCache);
229 230 231 232 233
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);

dengyihao's avatar
dengyihao 已提交
234
  pthread_mutex_unlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
235 236

  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
237
  return 0;
dengyihao's avatar
dengyihao 已提交
238
  // encode end
dengyihao's avatar
dengyihao 已提交
239
}
dengyihao's avatar
dengyihao 已提交
240
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
dengyihao's avatar
dengyihao 已提交
241
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
242
  return 0;
dengyihao's avatar
dengyihao 已提交
243
}
244

dengyihao's avatar
dengyihao 已提交
245
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
246 247 248
  if (mem == NULL) {
    return 0;
  }
249
  char* key = getIndexKey(ct);
dengyihao's avatar
dengyihao 已提交
250

251
  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
252 253 254 255 256
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node != NULL) {
      CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
dengyihao's avatar
dengyihao 已提交
257
        if (strcmp(c->colVal, ct->colVal) == 0) {
258 259 260 261 262 263 264 265 266 267 268 269 270
          taosArrayPush(result, &c->uid);
          *s = kTypeValue;
        } else {
          break;
        }
      } else if (c->operaType == DEL_VALUE) {
        // table is del, not need
        *s = kTypeDeletion;
        break;
      }
    }
  }
  tSkipListDestroyIter(iter);
dengyihao's avatar
dengyihao 已提交
271 272 273
  return 0;
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
dengyihao's avatar
dengyihao 已提交
274 275 276
  if (cache == NULL) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
  IndexCache* pCache = cache;

  MemTable *mem = NULL, *imm = NULL;
  pthread_mutex_lock(&pCache->mtx);
  mem = pCache->mem;
  imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
  pthread_mutex_unlock(&pCache->mtx);

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;
  CacheTerm       ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};

  int ret = indexQueryMem(mem, &ct, qtype, result, s);
  if (ret == 0 && *s != kTypeDeletion) {
    // continue search in imm
    ret = indexQueryMem(imm, &ct, qtype, result, s);
dengyihao's avatar
dengyihao 已提交
295
  }
dengyihao's avatar
dengyihao 已提交
296

297 298
  indexMemUnRef(mem);
  indexMemUnRef(imm);
dengyihao's avatar
dengyihao 已提交
299 300

  return ret;
dengyihao's avatar
dengyihao 已提交
301
}
dengyihao's avatar
dengyihao 已提交
302 303

void indexCacheRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
304 305 306
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
307 308 309 310
  int ref = T_REF_INC(cache);
  UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
311 312 313
  if (cache == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
314
  int ref = T_REF_DEC(cache);
dengyihao's avatar
dengyihao 已提交
315 316 317
  if (ref == 0) {
    indexCacheDestroy(cache);
  }
dengyihao's avatar
dengyihao 已提交
318
}
319 320

void indexMemRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
321 322 323
  if (tbl == NULL) {
    return;
  }
324 325 326 327
  int ref = T_REF_INC(tbl);
  UNUSED(ref);
}
void indexMemUnRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
328 329 330
  if (tbl == NULL) {
    return;
  }
331 332 333 334 335 336 337 338 339
  int ref = T_REF_DEC(tbl);
  if (ref == 0) {
    SSkipList* slt = tbl->mem;
    indexCacheDestroySkiplist(slt);
    free(tbl);
  }
}

static void cacheTermDestroy(CacheTerm* ct) {
dengyihao's avatar
dengyihao 已提交
340 341 342
  if (ct == NULL) {
    return;
  }
343 344 345 346 347 348 349 350 351 352 353 354 355
  free(ct->colVal);
  free(ct);
}
static char* getIndexKey(const void* pData) {
  CacheTerm* p = (CacheTerm*)pData;
  return (char*)p;
}

static int32_t compareKey(const void* l, const void* r) {
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;

  // compare colVal
dengyihao's avatar
dengyihao 已提交
356
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
357 358 359
  if (cmp == 0) {
    return rt->version - lt->version;
  }
dengyihao's avatar
dengyihao 已提交
360
  return cmp;
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
}

static MemTable* indexInternalCacheCreate(int8_t type) {
  MemTable* tbl = calloc(1, sizeof(MemTable));
  indexMemRef(tbl);
  if (type == TSDB_DATA_TYPE_BINARY) {
    tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
  }
  return tbl;
}

static void doMergeWork(SSchedMsg* msg) {
  IndexCache* pCache = msg->ahandle;
  SIndex*     sidx = (SIndex*)pCache->index;
  indexFlushCacheTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* iter = itera->iter;
dengyihao's avatar
dengyihao 已提交
379 380
  if (iter == NULL) {
    return false;
dengyihao's avatar
dengyihao 已提交
381
  }
dengyihao's avatar
dengyihao 已提交
382
  IterateValue* iv = &itera->val;
383 384 385 386 387 388 389 390
  iterateValueDestroy(iv, false);

  bool next = tSkipListIterNext(iter);
  if (next) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

    iv->type = ct->operaType;
dengyihao's avatar
dengyihao 已提交
391 392
    iv->colVal = calloc(1, strlen(ct->colVal) + 1);
    memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
393 394 395 396 397 398

    taosArrayPush(iv->val, &ct->uid);
  }
  return next;
}

dengyihao's avatar
dengyihao 已提交
399
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; }