index_cache.c 10.5 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
17
#include "index_util.h"
18
#include "tcompare.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tsched.h"
dengyihao's avatar
dengyihao 已提交
20

21
#define MAX_INDEX_KEY_LEN 256  // test only, change later
dengyihao's avatar
dengyihao 已提交
22

dengyihao's avatar
dengyihao 已提交
23
// Number of terms the mutable mem table may hold before it is rotated into `imm`.
// Parenthesized so the expansion stays a single operand inside any larger expression.
#define MEM_TERM_LIMIT (5 * 10000)
24
// ref index_cache.h:22
25
//#define CACHE_KEY_LEN(p) \
dengyihao's avatar
dengyihao 已提交
26 27
//  (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
//  sizeof(p->operType))
dengyihao's avatar
dengyihao 已提交
28

dengyihao's avatar
dengyihao 已提交
29 30
static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl);
31

dengyihao's avatar
dengyihao 已提交
32 33 34
static void    cacheTermDestroy(CacheTerm* ct);
static char*   getIndexKey(const void* pData);
static int32_t compareKey(const void* l, const void* r);
35

36
static MemTable* indexInternalCacheCreate(int8_t type);
37

dengyihao's avatar
dengyihao 已提交
38 39 40
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);

41
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
dengyihao's avatar
dengyihao 已提交
42

dengyihao's avatar
dengyihao 已提交
43
// Allocate and initialize the in-memory index cache for one column.
//
// idx     - owning index, stored as-is in cache->index
// suid    - stored as-is in cache->suid
// colName - NUL-terminated column name; copied into a private buffer
// type    - column data type, also used to create the initial mem table
//
// Returns a cache holding one reference (taken via indexCacheRef), or NULL when
// colName is NULL or any allocation fails. On failure nothing is leaked.
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
  if (colName == NULL) {
    indexError("failed to create index cache");
    return NULL;
  }

  IndexCache* cache = calloc(1, sizeof(IndexCache));
  if (cache == NULL) {
    indexError("failed to create index cache");
    return NULL;
  }

  cache->mem = indexInternalCacheCreate(type);
  if (cache->mem == NULL) {
    indexError("failed to create index cache");
    free(cache);
    return NULL;
  }

  // calloc zero-fills, so copying nameLen bytes leaves the terminator in place.
  size_t nameLen = strlen(colName);
  cache->colName = calloc(1, nameLen + 1);
  if (cache->colName == NULL) {
    indexError("failed to create index cache");
    indexMemUnRef(cache->mem);
    free(cache);
    return NULL;
  }
  memcpy(cache->colName, colName, nameLen);

  cache->type = type;
  cache->index = idx;
  cache->version = 0;
  cache->suid = suid;
  pthread_mutex_init(&cache->mtx, NULL);

  indexCacheRef(cache);
  return cache;
}
61
void indexCacheDebug(IndexCache* cache) {
62 63 64 65 66 67 68
  MemTable* tbl = NULL;

  pthread_mutex_lock(&cache->mtx);
  tbl = cache->mem;
  indexMemRef(tbl);
  pthread_mutex_unlock(&cache->mtx);

dengyihao's avatar
dengyihao 已提交
69 70 71 72 73 74 75 76 77 78
  {
    SSkipList*         slt = tbl->mem;
    SSkipListIterator* iter = tSkipListCreateIter(slt);
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
      if (ct != NULL) {
        // TODO, add more debug info
        indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
      }
79
    }
dengyihao's avatar
dengyihao 已提交
80 81 82
    tSkipListDestroyIter(iter);

    indexMemUnRef(tbl);
83
  }
84

dengyihao's avatar
dengyihao 已提交
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
  {
    pthread_mutex_lock(&cache->mtx);
    tbl = cache->imm;
    indexMemRef(tbl);
    pthread_mutex_unlock(&cache->mtx);
    if (tbl != NULL) {
      SSkipList*         slt = tbl->mem;
      SSkipListIterator* iter = tSkipListCreateIter(slt);
      while (tSkipListIterNext(iter)) {
        SSkipListNode* node = tSkipListIterGet(iter);
        CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);
        if (ct != NULL) {
          // TODO, add more debug info
          indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
        }
      }
      tSkipListDestroyIter(iter);
    }

    indexMemUnRef(tbl);
  }
106
}
dengyihao's avatar
dengyihao 已提交
107

dengyihao's avatar
dengyihao 已提交
108 109 110 111 112
// Free every CacheTerm stored in the skip list, then destroy the list itself.
//
// slt may be NULL: indexInternalCacheCreate only builds a skip list for
// TSDB_DATA_TYPE_BINARY, so other column types reach here with no list.
void indexCacheDestroySkiplist(SSkipList* slt) {
  if (slt == NULL) { return; }

  SSkipListIterator* iter = tSkipListCreateIter(slt);
  if (iter != NULL) {
    while (tSkipListIterNext(iter)) {
      SSkipListNode* node = tSkipListIterGet(iter);
      // cacheTermDestroy tolerates NULL and frees both colVal and the term.
      cacheTermDestroy((CacheTerm*)SL_GET_NODE_DATA(node));
    }
    tSkipListDestroyIter(iter);
  }
  tSkipListDestroy(slt);
}
dengyihao's avatar
dengyihao 已提交
121
// Detach the immutable table from the cache and drop references to it.
// A NULL cache is a no-op.
void indexCacheDestroyImm(IndexCache* cache) {
  if (cache != NULL) {
    pthread_mutex_lock(&cache->mtx);
    MemTable* detached = cache->imm;
    cache->imm = NULL;  // or throw int bg thread
    pthread_mutex_unlock(&cache->mtx);

    // NOTE(review): two references are released. Presumably one is the creation
    // reference and the other is the one taken in indexCacheIteratorCreate, which
    // indexCacheIteratorDestroy does not release - confirm before changing.
    indexMemUnRef(detached);
    indexMemUnRef(detached);
  }
}
dengyihao's avatar
dengyihao 已提交
133 134
void indexCacheDestroy(void* cache) {
  IndexCache* pCache = cache;
dengyihao's avatar
dengyihao 已提交
135
  if (pCache == NULL) { return; }
136 137
  indexMemUnRef(pCache->mem);
  indexMemUnRef(pCache->imm);
dengyihao's avatar
dengyihao 已提交
138
  free(pCache->colName);
dengyihao's avatar
dengyihao 已提交
139

dengyihao's avatar
dengyihao 已提交
140
  free(pCache);
dengyihao's avatar
dengyihao 已提交
141 142
}

dengyihao's avatar
dengyihao 已提交
143 144
// Build an iterator over the cache's immutable table (cache->imm).
//
// The returned iterator owns a result array with room for uint64_t uids and,
// when imm exists, a skip-list iterator over it. When imm is NULL the skip-list
// iterator is NULL and the `next` callback reports no elements.
//
// A reference is taken on imm here. NOTE(review): indexCacheIteratorDestroy does
// not release it; it appears to be released by indexCacheDestroyImm - confirm.
//
// Returns NULL when cache is NULL or an allocation fails.
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
  if (cache == NULL) { return NULL; }

  Iterate* iiter = calloc(1, sizeof(Iterate));
  if (iiter == NULL) { return NULL; }

  // Allocate before taking the lock so a failure needs no unlock or unref.
  iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
  if (iiter->val.val == NULL) {
    free(iiter);
    return NULL;
  }
  iiter->val.colVal = NULL;
  iiter->next = indexCacheIteratorNext;
  iiter->getValue = indexCacheIteratorGetValue;

  pthread_mutex_lock(&cache->mtx);
  MemTable* tbl = cache->imm;
  indexMemRef(tbl);
  iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
  pthread_mutex_unlock(&cache->mtx);

  return iiter;
}
// Tear down an iterator made by indexCacheIteratorCreate: the skip-list
// iterator, the held value, and the iterator struct. NULL is a no-op.
void indexCacheIteratorDestroy(Iterate* iter) {
  if (iter != NULL) {
    tSkipListDestroyIter(iter->iter);
    iterateValueDestroy(&iter->val, true);
    free(iter);
  }
}
dengyihao's avatar
dengyihao 已提交
168 169 170 171 172 173 174 175 176 177

// Queue a background merge of pCache on indexQhandle; the task runs doMergeWork
// with the cache as its ahandle.
//
// Returns 0 once the task has been handed to the scheduler, -1 if pCache is NULL.
// The original fell off the end of a non-void function, leaving the return value
// undefined for any caller that read it.
int indexCacheSchedToMerge(IndexCache* pCache) {
  if (pCache == NULL) { return -1; }

  SSchedMsg schedMsg = {0};
  schedMsg.fp = doMergeWork;
  schedMsg.ahandle = pCache;
  schedMsg.thandle = NULL;
  schedMsg.msg = NULL;

  taosScheduleTask(indexQhandle, &schedMsg);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
178 179 180 181 182 183 184
// Reserve a slot for one more term in cache->mem, rotating tables when full.
//
// The function unlocks and re-locks cache->mtx while waiting, so it must be
// entered with cache->mtx held (indexCachePut does this).
//
// Loop outcomes per pass:
//   - room left in mem: count the term and return;
//   - mem full and imm still present: drop the lock, sleep 50 ms, retry;
//   - mem full and no imm: take a cache reference, move mem to imm, create a
//     new mem, schedule a merge, then loop once more to count the term.
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
  for (;;) {
    if (cache->nTerm < MEM_TERM_LIMIT) {
      cache->nTerm += 1;
      return;
    }

    if (cache->imm != NULL) {
      // TODO: wake up by condition variable
      pthread_mutex_unlock(&cache->mtx);
      taosMsleep(50);
      pthread_mutex_lock(&cache->mtx);
      continue;
    }

    // unref cache in bgwork
    indexCacheRef(cache);
    cache->imm = cache->mem;
    cache->mem = indexInternalCacheCreate(cache->type);
    cache->nTerm = 1;
    // sched to merge
    indexCacheSchedToMerge(cache);
  }
}

dengyihao's avatar
dengyihao 已提交
200
// Insert one (term, uid) entry into the cache's mutable mem table.
//
// cache - IndexCache* passed as void*
// term  - source term; colType, colVal/nColVal and operType are copied
// uid   - value stored alongside the term
//
// Returns 0 on success, -1 if cache or term is NULL or an allocation fails.
// A cache reference is held for the duration of the call and released on every
// exit path.
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
  if (cache == NULL || term == NULL) { return -1; }

  IndexCache* pCache = cache;
  indexCacheRef(pCache);

  // encode data
  CacheTerm* ct = calloc(1, sizeof(CacheTerm));
  if (ct == NULL) {
    // The original re-tested `cache` here, so a failed allocation was dereferenced.
    indexCacheUnRef(pCache);
    return -1;
  }

  // set up key
  ct->colType = term->colType;
  ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
  if (ct->colVal == NULL) {
    cacheTermDestroy(ct);
    indexCacheUnRef(pCache);
    return -1;
  }
  memcpy(ct->colVal, term->colVal, term->nColVal);
  ct->version = atomic_add_fetch_32(&pCache->version, 1);

  // set value
  ct->uid = uid;
  ct->operaType = term->operType;

  pthread_mutex_lock(&pCache->mtx);

  // May rotate pCache->mem, so read the table pointer only afterwards.
  indexCacheMakeRoomForWrite(pCache);
  MemTable* tbl = pCache->mem;
  indexMemRef(tbl);
  tSkipListPut(tbl->mem, (char*)ct);
  indexMemUnRef(tbl);

  pthread_mutex_unlock(&pCache->mtx);

  indexCacheUnRef(pCache);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
232
// Placeholder for deleting an entry from the cache: currently performs no work
// and always returns 0. Parameters are explicitly marked unused so the stub
// compiles cleanly under -Wunused.
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
  (void)cache;
  (void)fieldValue;
  (void)fvlen;
  (void)uid;
  (void)operType;
  return 0;
}
236

dengyihao's avatar
dengyihao 已提交
237 238
// Scan one mem table for entries matching ct->colVal, starting at ct's key.
//
// For each node, in iteration order:
//   - if the entry is an ADD_VALUE, or the query type is QUERY_TERM: a matching
//     colVal appends its uid to `result` and sets *s to kTypeValue; a
//     non-matching colVal ends the scan;
//   - otherwise, a DEL_VALUE entry sets *s to kTypeDeletion and ends the scan.
//
// A NULL table is treated as empty. Always returns 0.
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
  if (mem == NULL) { return 0; }

  char*              key = getIndexKey(ct);
  SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);

  while (tSkipListIterNext(iter)) {
    SSkipListNode* node = tSkipListIterGet(iter);
    if (node == NULL) { continue; }

    CacheTerm* entry = (CacheTerm*)SL_GET_NODE_DATA(node);

    if (entry->operaType == ADD_VALUE || qtype == QUERY_TERM) {
      if (strcmp(entry->colVal, ct->colVal) != 0) { break; }
      taosArrayPush(result, &entry->uid);
      *s = kTypeValue;
      continue;
    }

    if (entry->operaType == DEL_VALUE) {
      // table is del, not need
      *s = kTypeDeletion;
      break;
    }
  }

  tSkipListDestroyIter(iter);
  return 0;
}
// Look up query->term in the cache: first in the mutable table, then - unless
// the first pass reported a deletion - in the immutable table.
//
// Both table pointers are captured and referenced under cache->mtx, searched
// without the lock, and un-referenced before returning.
//
// Matching uids are appended to `result`; *s receives the value/deletion state
// written by indexQueryMem. Returns -1 for a NULL cache, otherwise the result
// of the last indexQueryMem call.
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
  IndexCache* pCache = cache;
  if (pCache == NULL) { return -1; }

  pthread_mutex_lock(&pCache->mtx);
  MemTable* mem = pCache->mem;
  MemTable* imm = pCache->imm;
  indexMemRef(mem);
  indexMemRef(imm);
  pthread_mutex_unlock(&pCache->mtx);

  SIndexTerm*     term = query->term;
  EIndexQueryType qtype = query->qType;

  // Stack-allocated probe key; colVal is borrowed from the query term.
  CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};

  int ret = indexQueryMem(mem, &ct, qtype, result, s);
  if (ret == 0) {
    if (*s != kTypeDeletion) {
      // continue search in imm
      ret = indexQueryMem(imm, &ct, qtype, result, s);
    }
  }

  indexMemUnRef(mem);
  indexMemUnRef(imm);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
292 293

// Take one reference on the cache. NULL is ignored.
void indexCacheRef(IndexCache* cache) {
  if (cache != NULL) {
    int count = T_REF_INC(cache);
    UNUSED(count);
  }
}
// Drop one reference on the cache and destroy it when the count reaches zero.
// NULL is ignored.
void indexCacheUnRef(IndexCache* cache) {
  if (cache != NULL) {
    int remaining = T_REF_DEC(cache);
    if (remaining == 0) {
      indexCacheDestroy(cache);
    }
  }
}
303 304

// Take one reference on a mem table. NULL is ignored.
void indexMemRef(MemTable* tbl) {
  if (tbl != NULL) {
    int count = T_REF_INC(tbl);
    UNUSED(count);
  }
}
// Drop one reference on a mem table. When the count reaches zero the table's
// skip list (with all its terms) and the table struct are freed. NULL is ignored.
void indexMemUnRef(MemTable* tbl) {
  if (tbl != NULL) {
    int remaining = T_REF_DEC(tbl);
    if (remaining == 0) {
      indexCacheDestroySkiplist(tbl->mem);
      free(tbl);
    }
  }
}

// Free a heap-allocated CacheTerm together with its colVal buffer.
// NULL is ignored.
static void cacheTermDestroy(CacheTerm* ct) {
  if (ct != NULL) {
    free(ct->colVal);
    free(ct);
  }
}
// Key-extraction callback for the skip list: the key of a stored element is
// the element itself, so the pointer is returned unchanged as char*.
static char* getIndexKey(const void* pData) {
  return (char*)pData;
}

static int32_t compareKey(const void* l, const void* r) {
  CacheTerm* lt = (CacheTerm*)l;
  CacheTerm* rt = (CacheTerm*)r;

  // compare colVal
dengyihao's avatar
dengyihao 已提交
334
  int32_t cmp = strcmp(lt->colVal, rt->colVal);
dengyihao's avatar
dengyihao 已提交
335
  if (cmp == 0) { return rt->version - lt->version; }
dengyihao's avatar
dengyihao 已提交
336
  return cmp;
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
}

// Allocate a MemTable holding one reference.
//
// For TSDB_DATA_TYPE_BINARY a skip list is created that permits duplicate keys
// and uses compareKey / getIndexKey as callbacks. For any other type the table
// is returned with tbl->mem left NULL.
//
// Returns NULL if the table, or the skip list for a binary column, cannot be
// allocated.
static MemTable* indexInternalCacheCreate(int8_t type) {
  MemTable* tbl = calloc(1, sizeof(MemTable));
  if (tbl == NULL) {
    return NULL;
  }
  indexMemRef(tbl);

  if (type == TSDB_DATA_TYPE_BINARY) {
    tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
    if (tbl->mem == NULL) {
      // Nothing else is owned yet, so the struct can be freed directly.
      free(tbl);
      return NULL;
    }
  }
  return tbl;
}

// Scheduler callback: flush the cache carried in msg->ahandle through
// indexFlushCacheTFile, using the index stored on that cache.
static void doMergeWork(SSchedMsg* msg) {
  IndexCache* cache = msg->ahandle;
  SIndex*     owner = (SIndex*)cache->index;

  indexFlushCacheTFile(owner, cache);
}
// Advance the cache iterator by one term.
//
// The previously exposed value is cleared first (iterateValueDestroy with
// destroy=false). If another node exists, its operation type, a private copy
// of its colVal, and its uid are loaded into itera->val.
//
// Returns true when a new value is available; false when the iterator has no
// underlying skip-list iterator, the list is exhausted, or the colVal copy
// cannot be allocated.
static bool indexCacheIteratorNext(Iterate* itera) {
  SSkipListIterator* iter = itera->iter;
  if (iter == NULL) { return false; }

  IterateValue* iv = &itera->val;
  iterateValueDestroy(iv, false);

  if (!tSkipListIterNext(iter)) { return false; }

  SSkipListNode* node = tSkipListIterGet(iter);
  CacheTerm*     ct = (CacheTerm*)SL_GET_NODE_DATA(node);

  // calloc zero-fills, so copying len bytes leaves the terminator in place.
  size_t len = strlen(ct->colVal);
  char*  copy = calloc(1, len + 1);
  if (copy == NULL) {
    indexError("failed to copy colVal while iterating index cache");
    return false;
  }
  memcpy(copy, ct->colVal, len);

  iv->type = ct->operaType;
  iv->colVal = copy;
  taosArrayPush(iv->val, &ct->uid);

  return true;
}

dengyihao's avatar
dengyihao 已提交
376
// Accessor for the iterator's current value; returns a pointer into the
// iterator itself, not a copy.
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
  IterateValue* current = &iter->val;
  return current;
}