index_cache.c 10.3 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_util.h"
18
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
20

21
#define MAX_INDEX_KEY_LEN 256  // test only, change later
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23
#define MEM_TERM_LIMIT 10000 * 10
24
// ref index_cache.h:22
25
//#define CACHE_KEY_LEN(p) \
dengyihao's avatar
dengyihao 已提交
26 27
//  (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
//  sizeof(p->operType))
dengyihao's avatar
dengyihao 已提交
28

dengyihao's avatar
dengyihao 已提交
29 30
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
31

dengyihao's avatar
dengyihao 已提交
32 33 34
static void    cacheTermDestroy(CacheTerm* ct);
static char*   getIndexKey(const void* pData);
static int32_t compareKey(const void* l, const void* r);
35

36
static MemTable* indexInternalCacheCreate(int8_t type);
37

dengyihao's avatar
dengyihao 已提交
38 39 40
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

41
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
42 43

IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
dengyihao's avatar
dengyihao 已提交
44
  IndexCache* cache = calloc(1, sizeof(IndexCache));
dengyihao's avatar
dengyihao 已提交
45 46 47
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
dengyihao's avatar
dengyihao 已提交
48 49 50 51 52 53 54 55 56
  };
  cache->mem = indexInternalCacheCreate(type);

  cache->colName = calloc(1, strlen(colName) + 1);
  memcpy(cache->colName, colName, strlen(colName));
  cache->type = type;
  cache->index = idx;
  cache->version = 0;

dengyihao's avatar
dengyihao 已提交
57
  pthread_mutex_init(&cache->mtx, NULL);
dengyihao's avatar
dengyihao 已提交
58
  indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
59 60
  return cache;
}
61
void indexCacheDebug(IndexCache* cache) {
62 63 64 65 66 67 68
  MemTable* tbl = NULL;

  pthread_mutex_lock(&cache->mtx);
  tbl = cache->mem;
  indexMemRef(tbl);
  pthread_mutex_unlock(&cache->mtx);

dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74 75 76 77 78
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
79
    }
dengyihao's avatar
dengyihao 已提交
80 81 82
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
83
  }
84

dengyihao's avatar
dengyihao 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
  {
    pthread_mutex_lock(&cache->mtx);
    tbl = cache->imm;
    indexMemRef(tbl);
    pthread_mutex_unlock(&cache->mtx);
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
106
}
dengyihao's avatar
dengyihao 已提交
107

dengyihao's avatar
dengyihao 已提交
108 109 110 111 112
void indexCacheDestroySkiplist(SSkipList* slt) {
  SSkipListIterator* iter = tSkipListCreateIter(slt);
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
dengyihao's avatar
dengyihao 已提交
113 114 115 116
    if (ct != NULL) {
      free(ct->colVal);
      free(ct);
    }
dengyihao's avatar
dengyihao 已提交
117 118
  }
  tSkipListDestroyIter(iter);
119
  tSkipListDestroy(slt);
dengyihao's avatar
dengyihao 已提交
120
}
dengyihao's avatar
dengyihao 已提交
121
void indexCacheDestroyImm(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
122 123
  if (cache == NULL) { return; }

124
  MemTable* tbl = NULL;
dengyihao's avatar
dengyihao 已提交
125
  pthread_mutex_lock(&cache->mtx);
126
  tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
127 128
  cache->imm = NULL;  // or throw int bg thread
  pthread_mutex_unlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
129 130

  indexMemUnRef(tbl);
131
  indexMemUnRef(tbl);
dengyihao's avatar
dengyihao 已提交
132
}
dengyihao's avatar
dengyihao 已提交
133 134
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
135
  if (pCache == NULL) { return; }
136 137
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
dengyihao's avatar
dengyihao 已提交
138
  free(pCache->colName);
dengyihao's avatar
dengyihao 已提交
139

dengyihao's avatar
dengyihao 已提交
140
  free(pCache);
dengyihao's avatar
dengyihao 已提交
141 142
}

dengyihao's avatar
dengyihao 已提交
143 144
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
  Iterate* iiter = calloc(1, sizeof(Iterate));
dengyihao's avatar
dengyihao 已提交
145 146 147 148 149
  if (iiter == NULL) { return NULL; }

  pthread_mutex_lock(&cache->mtx);

  indexMemRef(cache->imm);
dengyihao's avatar
dengyihao 已提交
150

151
  MemTable* tbl = cache->imm;
dengyihao's avatar
dengyihao 已提交
152
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
153
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
dengyihao's avatar
dengyihao 已提交
154 155 156
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

dengyihao's avatar
dengyihao 已提交
157 158
  pthread_mutex_unlock(&cache->mtx);

dengyihao's avatar
dengyihao 已提交
159 160 161
  return iiter;
}
void indexCacheIteratorDestroy(Iterate* iter) {
dengyihao's avatar
dengyihao 已提交
162
  if (iter == NULL) { return; }
dengyihao's avatar
dengyihao 已提交
163 164 165 166
  tSkipListDestroyIter(iter->iter);
  iterateValueDestroy(&iter->val, true);
  free(iter);
}
dengyihao's avatar
dengyihao 已提交
167 168 169 170 171 172 173 174 175 176

int indexCacheSchedToMerge(IndexCache* pCache) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = doMergeWork;
  schedMsg.ahandle = pCache;
  schedMsg.thandle = NULL;
  schedMsg.msg = NULL;

  taosScheduleTask(indexQhandle, &schedMsg);
}
dengyihao's avatar
dengyihao 已提交
177 178 179 180 181 182 183
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  while (true) {
    if (cache->nTerm < MEM_TERM_LIMIT) {
      cache->nTerm += 1;
      break;
    } else if (cache->imm != NULL) {
      // TODO: wake up by condition variable
dengyihao's avatar
dengyihao 已提交
184
      // pthread_mutex_unlock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
185
      taosMsleep(50);
dengyihao's avatar
dengyihao 已提交
186
      // pthread_mutex_lock(&cache->mtx);
dengyihao's avatar
dengyihao 已提交
187
    } else {
dengyihao's avatar
dengyihao 已提交
188
      indexCacheRef(cache);
dengyihao's avatar
dengyihao 已提交
189 190 191 192 193 194 195 196 197 198
      cache->imm = cache->mem;
      cache->mem = indexInternalCacheCreate(cache->type);
      cache->nTerm = 1;
      // sched to merge
      // unref cache in bgwork
      indexCacheSchedToMerge(cache);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
199
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
dengyihao's avatar
dengyihao 已提交
200
  if (cache == NULL) { return -1; }
dengyihao's avatar
dengyihao 已提交
201

dengyihao's avatar
dengyihao 已提交
202
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
203
  indexCacheRef(pCache);
dengyihao's avatar
dengyihao 已提交
204
  // encode data
205
  CacheTerm* ct = calloc(1, sizeof(CacheTerm));
dengyihao's avatar
dengyihao 已提交
206
  if (cache == NULL) { return -1; }
207 208
  // set up key
  ct->colType = term->colType;
dengyihao's avatar
dengyihao 已提交
209 210
  ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
  memcpy(ct->colVal, term->colVal, term->nColVal);
dengyihao's avatar
dengyihao 已提交
211 212
  ct->version = atomic_add_fetch_32(&pCache->version, 1);
  // set value
213 214 215
  ct->uid = uid;
  ct->operaType = term->operType;

dengyihao's avatar
dengyihao 已提交
216 217
  // ugly code, refactor later
  pthread_mutex_lock(&pCache->mtx);
218

dengyihao's avatar
dengyihao 已提交
219
  indexCacheMakeRoomForWrite(pCache);
220 221 222 223 224
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);

dengyihao's avatar
dengyihao 已提交
225
  pthread_mutex_unlock(&pCache->mtx);
dengyihao's avatar
dengyihao 已提交
226 227

  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
228
  return 0;
dengyihao's avatar
dengyihao 已提交
229
  // encode end
dengyihao's avatar
dengyihao 已提交
230
}
dengyihao's avatar
dengyihao 已提交
231
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
dengyihao's avatar
dengyihao 已提交
232
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
233
  return 0;
dengyihao's avatar
dengyihao 已提交
234
}
235

dengyihao's avatar
dengyihao 已提交
236 237
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
  if (mem == NULL) { return 0; }
238
  char* key = getIndexKey(ct);
dengyihao's avatar
dengyihao 已提交
239

240
  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
241 242 243 244 245
  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node != NULL) {
      CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
dengyihao's avatar
dengyihao 已提交
246
        if (strcmp(c->colVal, ct->colVal) == 0) {
247 248 249 250 251 252 253 254 255 256 257 258 259
          taosArrayPush(result, &c->uid);
          *s = kTypeValue;
        } else {
          break;
        }
      } else if (c->operaType == DEL_VALUE) {
        // table is del, not need
        *s = kTypeDeletion;
        break;
      }
    }
  }
  tSkipListDestroyIter(iter);
dengyihao's avatar
dengyihao 已提交
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
  return 0;
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
  if (cache == NULL) { return -1; }
  IndexCache* pCache = cache;

  MemTable *mem = NULL, *imm = NULL;
  pthread_mutex_lock(&pCache->mtx);
  mem = pCache->mem;
  imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
  pthread_mutex_unlock(&pCache->mtx);

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;
  CacheTerm       ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
dengyihao's avatar
dengyihao 已提交
277
  // indexCacheDebug(pCache);
dengyihao's avatar
dengyihao 已提交
278 279 280 281 282

  int ret = indexQueryMem(mem, &ct, qtype, result, s);
  if (ret == 0 && *s != kTypeDeletion) {
    // continue search in imm
    ret = indexQueryMem(imm, &ct, qtype, result, s);
dengyihao's avatar
dengyihao 已提交
283
  }
dengyihao's avatar
dengyihao 已提交
284 285
  // cacheTermDestroy(ct);

286 287
  indexMemUnRef(mem);
  indexMemUnRef(imm);
dengyihao's avatar
dengyihao 已提交
288 289

  return ret;
dengyihao's avatar
dengyihao 已提交
290
}
dengyihao's avatar
dengyihao 已提交
291 292

void indexCacheRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
293
  if (cache == NULL) { return; }
dengyihao's avatar
dengyihao 已提交
294 295 296 297
  int ref = T_REF_INC(cache);
  UNUSED(ref);
}
void indexCacheUnRef(IndexCache* cache) {
dengyihao's avatar
dengyihao 已提交
298
  if (cache == NULL) { return; }
dengyihao's avatar
dengyihao 已提交
299
  int ref = T_REF_DEC(cache);
dengyihao's avatar
dengyihao 已提交
300
  if (ref == 0) { indexCacheDestroy(cache); }
dengyihao's avatar
dengyihao 已提交
301
}
302 303

void indexMemRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
304
  if (tbl == NULL) { return; }
305 306 307 308
  int ref = T_REF_INC(tbl);
  UNUSED(ref);
}
void indexMemUnRef(MemTable* tbl) {
dengyihao's avatar
dengyihao 已提交
309
  if (tbl == NULL) { return; }
310 311 312 313 314 315 316 317 318
  int ref = T_REF_DEC(tbl);
  if (ref == 0) {
    SSkipList* slt = tbl->mem;
    indexCacheDestroySkiplist(slt);
    free(tbl);
  }
}

static void cacheTermDestroy(CacheTerm* ct) {
dengyihao's avatar
dengyihao 已提交
319
  if (ct == NULL) { return; }
320 321 322 323 324 325 326 327 328 329 330 331 332
  free(ct->colVal);
  free(ct);
}
static char* getIndexKey(const void* pData) {
  CacheTerm* p = (CacheTerm*)pData;
  return (char*)p;
}

static int32_t compareKey(const void* l, const void* r) {
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;

  // compare colVal
dengyihao's avatar
dengyihao 已提交
333
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
334
  if (cmp == 0) { return rt->version - lt->version; }
dengyihao's avatar
dengyihao 已提交
335
  return cmp;
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
}

static MemTable* indexInternalCacheCreate(int8_t type) {
  MemTable* tbl = calloc(1, sizeof(MemTable));
  indexMemRef(tbl);
  if (type == TSDB_DATA_TYPE_BINARY) {
    tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
  }
  return tbl;
}

static void doMergeWork(SSchedMsg* msg) {
  IndexCache* pCache = msg->ahandle;
  SIndex*     sidx = (SIndex*)pCache->index;
  indexFlushCacheTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* iter = itera->iter;
dengyihao's avatar
dengyihao 已提交
354
  if (iter == NULL) { return false; }
355 356 357 358 359 360 361 362 363
  IterateValue* iv = &itera->val;
  iterateValueDestroy(iv, false);

  bool next = tSkipListIterNext(iter);
  if (next) {
    SSkipListNode* node = tSkipListIterGet(iter);
    CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

    iv->type = ct->operaType;
dengyihao's avatar
dengyihao 已提交
364 365
    iv->colVal = calloc(1, strlen(ct->colVal) + 1);
    memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
366 367 368 369 370 371

    taosArrayPush(iv->val, &ct->uid);
  }
  return next;
}

dengyihao's avatar
dengyihao 已提交
372
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; }