index.c 20.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
dengyihao's avatar
dengyihao 已提交
5 6
 * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
 * Software Foundation.
H
refact  
Hongze Cheng 已提交
7 8 9 10 11 12 13 14 15
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "tcoding.h"
#include "tdataformat.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
30 31
#endif

dengyihao's avatar
dengyihao 已提交
32
#define INDEX_NUM_OF_THREADS 5
dengyihao's avatar
dengyihao 已提交
33
#define INDEX_QUEUE_SIZE     200
dengyihao's avatar
dengyihao 已提交
34

dengyihao's avatar
dengyihao 已提交
35 36 37
#define INDEX_DATA_BOOL_NULL      0x02
#define INDEX_DATA_TINYINT_NULL   0x80
#define INDEX_DATA_SMALLINT_NULL  0x8000
wafwerar's avatar
wafwerar 已提交
38 39
#define INDEX_DATA_INT_NULL       0x80000000LL
#define INDEX_DATA_BIGINT_NULL    0x8000000000000000LL
dengyihao's avatar
dengyihao 已提交
40 41
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL

dengyihao's avatar
dengyihao 已提交
42
#define INDEX_DATA_FLOAT_NULL    0x7FF00000            // it is an NAN
wafwerar's avatar
wafwerar 已提交
43
#define INDEX_DATA_DOUBLE_NULL   0x7FFFFF0000000000LL  // an NAN
dengyihao's avatar
dengyihao 已提交
44 45 46 47
#define INDEX_DATA_NCHAR_NULL    0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL   0xFF
#define INDEX_DATA_JSON_NULL     0xFFFFFFFF
#define INDEX_DATA_JSON_null     0xFFFFFFFE
dengyihao's avatar
dengyihao 已提交
48 49
#define INDEX_DATA_JSON_NOT_NULL 0x01

dengyihao's avatar
dengyihao 已提交
50
#define INDEX_DATA_UTINYINT_NULL  0xFF
dengyihao's avatar
dengyihao 已提交
51
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
dengyihao's avatar
dengyihao 已提交
52 53
#define INDEX_DATA_UINT_NULL      0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL   0xFFFFFFFFFFFFFFFFL
dengyihao's avatar
dengyihao 已提交
54

dengyihao's avatar
dengyihao 已提交
55
#define INDEX_DATA_NULL_STR   "NULL"
dengyihao's avatar
dengyihao 已提交
56 57
#define INDEX_DATA_NULL_STR_L "null"

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
void*   indexQhandle = NULL;
int32_t indexRefMgt;

static void indexDestroy(void* sIdx);

dengyihao's avatar
dengyihao 已提交
63 64
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
65
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
dengyihao's avatar
dengyihao 已提交
66
  indexRefMgt = taosOpenRef(1000, indexDestroy);
dengyihao's avatar
dengyihao 已提交
67
}
dengyihao's avatar
dengyihao 已提交
68
void indexCleanup() {
dengyihao's avatar
dengyihao 已提交
69 70
  // refacto later
  taosCleanUpScheduler(indexQhandle);
dengyihao's avatar
dengyihao 已提交
71
  taosCloseRef(indexRefMgt);
dengyihao's avatar
dengyihao 已提交
72
}
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74
/* Per-column record: an internally generated column id plus a version. */
typedef struct SIdxColInfo {
  int colId;     // generated by index internal
  int cVersion;  // version value (meaning not visible in this file)
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
78

wafwerar's avatar
wafwerar 已提交
79
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
80
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
81
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83 84
static void idxInterRsltDestroy(SArray* results);
static int  idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
dengyihao's avatar
dengyihao 已提交
85

dengyihao's avatar
dengyihao 已提交
86
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
dengyihao's avatar
dengyihao 已提交
87

dengyihao's avatar
dengyihao 已提交
88
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
89
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91 92
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94
static void idxPost(void* idx) {
dengyihao's avatar
dengyihao 已提交
95 96 97 98 99 100 101 102
  SIndex* pIdx = idx;
  tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
  SIndex* pIdx = idx;
  tsem_wait(&pIdx->sem);
}

dengyihao's avatar
dengyihao 已提交
103
/*
 * Opens an index stored under `path` and hands it back through `*index`.
 *
 * @param opts   options; opts->cacheSize sizes the LRU cache. On success the
 *               pointer is stored in the index (indexDestroy() later passes
 *               it to indexOptsDestroy()); on failure it is left untouched.
 * @param path   directory of the index; a private copy is kept.
 * @param index  out: the new handle on success, NULL on failure.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any of the
 *         internal objects cannot be created.
 *
 * Everything that can fail is created before the mutex, semaphore and
 * reference are set up, so the failure path only has to undo plain
 * allocations. It must not go through indexClose(): the handle has no
 * registered reference yet, so the ref-counted destructor would never run
 * and the partially built index would leak.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  taosThreadOnce(&isInit, indexInit);

  *index = NULL;

  SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex));
  if (idx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  idx->lru = taosLRUCacheInit(opts->cacheSize, -1, .5);
  if (idx->lru == NULL) {
    goto FAIL;
  }
  taosLRUCacheSetStrictCapacity(idx->lru, true);

  idx->tindex = idxTFileCreate(idx, path);
  if (idx->tindex == NULL) {
    goto FAIL;
  }

  idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (idx->colObj == NULL) {
    goto FAIL;
  }

  idx->path = tstrdup(path);
  if (idx->path == NULL) {
    goto FAIL;
  }

  idx->cVersion = 1;
  taosThreadMutexInit(&idx->mtx, NULL);
  tsem_init(&idx->sem, 0, 0);

  // Register the handle and take the reference that indexClose() releases.
  idx->refId = idxAddRef(idx);
  idx->opts = opts;
  idxAcquireRef(idx->refId);

  *index = idx;
  return TSDB_CODE_SUCCESS;

FAIL:
  // Release in the same order indexDestroy() uses: tfile first, then the LRU.
  if (idx->colObj != NULL) {
    taosHashCleanup(idx->colObj);
  }
  if (idx->tindex != NULL) {
    idxTFileDestroy(idx->tindex);
  }
  if (idx->lru != NULL) {
    taosLRUCacheEraseUnrefEntries(idx->lru);
    taosLRUCacheCleanup(idx->lru);
  }
  taosMemoryFree(idx);
  return TSDB_CODE_OUT_OF_MEMORY;
}
dengyihao's avatar
dengyihao 已提交
144

dengyihao's avatar
dengyihao 已提交
145
void indexDestroy(void* handle) {
dengyihao's avatar
dengyihao 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
  SIndex* idx = handle;
  taosThreadMutexDestroy(&idx->mtx);
  tsem_destroy(&idx->sem);
  idxTFileDestroy(idx->tindex);
  taosMemoryFree(idx->path);

  SLRUCache* lru = idx->lru;
  if (lru != NULL) {
    taosLRUCacheEraseUnrefEntries(lru);
    taosLRUCacheCleanup(lru);
  }
  idx->lru = NULL;

  indexOptsDestroy(idx->opts);
  taosMemoryFree(idx);
dengyihao's avatar
dengyihao 已提交
161 162 163 164
  return;
}
/*
 * Closes an index handle.
 *
 * For every column cache in sIdx->colObj: force a merge, block on the index
 * semaphore until idxPost() is called for it, then drop the cache reference.
 * Afterwards the hash is cleaned up and the handle's reference is released
 * and removed from the reference set.
 *
 * A NULL handle is ignored.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) {
    return;
  }

  if (sIdx->colObj != NULL) {
    void* iter = taosHashIterate(sIdx->colObj, NULL);
    while (iter != NULL) {
      // Copy the stored pointer now: once the iterator advances, the old
      // node must not be dereferenced any more.
      IndexCache* cache = *(IndexCache**)iter;

      idxCacheForceToMerge((void*)cache);
      indexInfo("%s wait to merge", cache->colName);
      indexWait((void*)sIdx);
      indexInfo("%s finish to wait", cache->colName);

      iter = taosHashIterate(sIdx->colObj, iter);
      idxCacheUnRef(cache);
    }
    taosHashCleanup(sIdx->colObj);
    sIdx->colObj = NULL;
  }

  idxReleaseRef(sIdx->refId);
  idxRemoveRef(sIdx->refId);
}
dengyihao's avatar
dengyihao 已提交
183
int64_t idxAddRef(void* p) {
dengyihao's avatar
dengyihao 已提交
184 185 186
  // impl
  return taosAddRef(indexRefMgt, p);
}
dengyihao's avatar
dengyihao 已提交
187
int32_t idxRemoveRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
188 189 190 191
  // impl later
  return taosRemoveRef(indexRefMgt, ref);
}

dengyihao's avatar
dengyihao 已提交
192
void idxAcquireRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
193 194 195
  // impl
  taosAcquireRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
196
void idxReleaseRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
197 198 199
  // impl
  taosReleaseRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
200

dengyihao's avatar
dengyihao 已提交
201
/*
 * Adds every term in `fVals` to the index for table `uid`.
 *
 * Pass 1 (under index->mtx): make sure a column cache exists for each
 * term's (suid, column, type) key, creating it when missing.
 * Pass 2 (no lock): hand each term to its cache with idxCachePut().
 *
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY if a cache cannot be
 *         created; -1 if a cache that should exist cannot be found; or the
 *         first non-zero value returned by idxCachePut(). Terms before the
 *         failing one stay inserted.
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
  int32_t nTerms = (int32_t)taosArrayGetSize(fVals);

  // TODO(yihao): reduce the lock range
  taosThreadMutexLock(&index->mtx);
  for (int32_t i = 0; i < nTerms; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL) {
      IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        // Never store a NULL cache: pass 2 would dereference it.
        taosThreadMutexUnlock(&index->mtx);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
    }
  }
  taosThreadMutexUnlock(&index->mtx);

  for (int32_t i = 0; i < nTerms; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);
    indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL || *cache == NULL) {
      // The lookup can fail (e.g. the insert in pass 1 did not succeed);
      // report it instead of dereferencing NULL in release builds.
      return -1;
    }

    int ret = idxCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
236
/*
 * Runs every term query in `multiQuerys` and merges the per-term results
 * into `result` according to multiQuerys->opera.
 *
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY when a temporary array
 *         cannot be allocated or grown; otherwise the non-zero code of the
 *         failing term search or of the final merge. On failure `result`
 *         is not modified by this function.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

  SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
  if (iRslts == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  size_t nQuery = taosArrayGetSize(multiQuerys->query);
  for (size_t i = 0; i < nQuery; i++) {
    SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
    SArray*          trslt = NULL;

    int code = idxTermSearch(index, qterm, &trslt);
    if (code == 0 && taosArrayPush(iRslts, (void*)&trslt) == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
    if (code != 0) {
      // trslt is not owned by iRslts on this path, so free it separately.
      taosArrayDestroy(trslt);
      idxInterRsltDestroy(iRslts);
      return code;
    }
  }

  int code = idxMergeFinalResults(iRslts, opera, result);
  idxInterRsltDestroy(iRslts);
  return code;
}

252
/* Not implemented: both arguments are ignored and 1 is always returned. */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
  return 1;
}
dengyihao's avatar
dengyihao 已提交
253
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
dengyihao's avatar
dengyihao 已提交
254

dengyihao's avatar
dengyihao 已提交
255 256 257 258 259 260
/*
 * Allocates a zero-initialised SIndexOpts and sets its cacheSize.
 *
 * @return the new options object, or NULL when the allocation fails.
 *         Free it with indexOptsDestroy().
 */
SIndexOpts* indexOptsCreate(int32_t cacheSize) {
  SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts));
  if (opts == NULL) {
    return NULL;
  }
  opts->cacheSize = cacheSize;
  return opts;
}
/* Frees an options object created by indexOptsCreate(). */
void indexOptsDestroy(SIndexOpts* opts) {
  // A void function must not `return` an expression (ISO C 6.8.6.4).
  taosMemoryFree(opts);
}
dengyihao's avatar
dengyihao 已提交
261 262 263 264
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
265
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
dengyihao's avatar
dengyihao 已提交
266 267
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
dengyihao's avatar
dengyihao 已提交
268 269
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
270 271 272
  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  return mtq;
dengyihao's avatar
dengyihao 已提交
273
}
dengyihao's avatar
dengyihao 已提交
274
/*
 * Destroys a multi-term query together with every term stored in it.
 * A NULL query is ignored.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }

  size_t nQuery = taosArrayGetSize(pQuery->query);
  for (size_t i = 0; i < nQuery; i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  taosMemoryFree(pQuery);
}
dengyihao's avatar
dengyihao 已提交
282
/*
 * Appends a (term, query type) pair to `pQuery`.
 *
 * The term pointer is stored as-is; indexMultiTermQueryDestroy() destroys
 * every stored term, so on success the query owns `term`.
 *
 * @return 0 on success, -1 when the pair cannot be appended (the caller
 *         then still owns `term`).
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery q = {.qType = qType, .term = term};
  if (taosArrayPush(pQuery->query, &q) == NULL) {
    return -1;
  }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
288 289
/*
 * Builds an index term.
 *
 * The column name is copied (nColName bytes plus a terminating NUL). The
 * value is stored in string form:
 *   - colVal != NULL and nColVal != 0: converted by idxConvertDataToStr();
 *   - colVal == NULL:                  the literal INDEX_DATA_NULL_STR;
 *   - otherwise (empty value):         a single space.
 *
 * @return the new term, or NULL when any allocation or the value
 *         conversion yields no buffer. Free it with indexTermDestroy().
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
  if (tm == NULL) {
    return NULL;
  }

  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;

  tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
  if (tm->colName == NULL) {
    taosMemoryFree(tm);
    return NULL;
  }
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;

  char*   buf = NULL;
  int32_t len = 0;
  if (colVal != NULL && nColVal != 0) {
    len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
  } else {
    const char* literal = (colVal == NULL) ? INDEX_DATA_NULL_STR : " ";
    len = (int32_t)strlen(literal);
    buf = strndup(literal, len);
  }
  if (buf == NULL) {
    // A term without a value string cannot be indexed or compared.
    taosMemoryFree(tm->colName);
    taosMemoryFree(tm);
    return NULL;
  }

  tm->colVal = buf;
  tm->nColVal = len;
  return tm;
}
dengyihao's avatar
dengyihao 已提交
320

dengyihao's avatar
dengyihao 已提交
321
/*
 * Frees a term created by indexTermCreate(): its column name, its value
 * string and the term itself. A NULL term is ignored.
 */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
}

dengyihao's avatar
dengyihao 已提交
327
/* Creates an empty term list: an array of SIndexTerm pointers with room for 4. */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));
  return terms;
}
328

dengyihao's avatar
dengyihao 已提交
329
/*
 * Appends `term` to `terms`. The pointer itself is stored;
 * indexMultiTermDestroy() later destroys every stored term.
 *
 * @return 0 on success, -1 when the array cannot take the element (the
 *         caller then still owns `term`).
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  if (taosArrayPush(terms, &term) == NULL) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
333
/* Destroys every term stored in `terms`, then the array itself. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  int32_t pos = 0;
  while (pos < taosArrayGetSize(terms)) {
    SIndexTerm* term = taosArrayGetP(terms, pos);
    indexTermDestroy(term);
    pos++;
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
340

dengyihao's avatar
dengyihao 已提交
341 342 343 344 345
/*
 * Scheduler callback queued by indexRebuild(). It performs no rebuild work:
 * it marks the index as kFinished and releases the reference that
 * indexRebuild() acquired before scheduling.
 */
static void idxSchedRebuildIdx(SSchedMsg* msg) {
  // TODO, no need rebuild index
  SIndex* pIdx = msg->ahandle;

  int8_t done = kFinished;
  atomic_store_8(&pIdx->status, done);

  idxReleaseRef(pIdx->refId);
}
/*
 * Marks `idx` as kRebuild and queues idxSchedRebuildIdx on the index
 * scheduler. A reference is acquired before scheduling and released by the
 * callback. `iter` is not used.
 */
void indexRebuild(SIndexJson* idx, void* iter) {
  // set up rebuild status
  int8_t rebuilding = kRebuild;
  atomic_store_8(&idx->status, rebuilding);

  // task put into BG thread
  SSchedMsg task = {0};
  task.ahandle = idx;
  task.fp = idxSchedRebuildIdx;

  idxAcquireRef(idx->refId);
  taosScheduleTask(indexQhandle, &task);
}

/*
 * Reports whether the index status is currently kRebuild.
 */
bool indexIsRebuild(SIndex* idx) {
  SIdxStatus current = (SIdxStatus)atomic_load_8(&idx->status);
  return current == kRebuild;
}
/*
 * JSON-index entry point for a rebuild; forwards both arguments to
 * indexRebuild().
 */
void indexJsonRebuild(SIndexJson* idx, void* iter) {
  indexRebuild(idx, iter);
}

/*
 * Reports whether the JSON index status is currently kRebuild.
 */
bool indexJsonIsRebuild(SIndexJson* idx) {
  SIdxStatus current = (SIdxStatus)atomic_load_8(&idx->status);
  return current == kRebuild;
}

dengyihao's avatar
dengyihao 已提交
389
/*
 * Evaluates one term query.
 *
 * Looks up the column cache for the term's (suid, column, type) key under
 * sIdx->mtx, searches that cache, and, unless the cache reports the column
 * as deleted (kTypeDeletion), also searches the tfile index. The combined
 * hits are merged into a freshly allocated array returned through
 * `*result`.
 *
 * @return 0 on success (including the "column dropped" case, where
 *         `*result` is left empty), -1 on allocation failure or when either
 *         search fails. `*result` is never freed here: if it is non-NULL
 *         after the call, the caller owns it, even on failure.
 */
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
  int32_t sz = idxSerialCacheKey(&key, buf);

  // Get col info
  taosThreadMutexLock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  IndexCache*  cache = (pCache == NULL) ? NULL : *pCache;
  taosThreadMutexUnlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  if (*result == NULL) {
    return -1;
  }

  SIdxTRslt* tr = idxTRsltCreate();
  if (tr == NULL) {
    return -1;
  }

  // TODO: iterator mem and tidex
  int            code = -1;
  int64_t        cost = 0;
  STermValueType s = kTypeValue;
  int64_t        st = taosGetTimestampUs();

  if (0 != idxCacheSearch(cache, query, tr, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }

  if (s == kTypeDeletion) {
    // column already dropped by another operation, no need to query tindex
    indexInfo("col: %s already drop by", term->colName);
    code = 0;
    goto END;  // leave through END so `tr` is released on this path too
  }

  st = taosGetTimestampUs();
  if (0 != idxTFileSearch(sIdx->tindex, query, tr)) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }
  cost = taosGetTimestampUs() - st;
  indexInfo("tfile search cost: %" PRId64 "us", cost);

  cost = taosGetTimestampUs() - st;
  indexInfo("search cost: %" PRId64 "us", cost);

  idxTRsltMergeTo(tr, *result);
  code = 0;

END:
  idxTRsltDestroy(tr);
  return code;
}
dengyihao's avatar
dengyihao 已提交
444
/*
 * Destroys the intermediate result set: every SArray stored in `results`
 * and then `results` itself. NULL is ignored.
 */
static void idxInterRsltDestroy(SArray* results) {
  if (results != NULL) {
    size_t total = taosArrayGetSize(results);
    size_t pos = 0;
    while (pos < total) {
      SArray* one = taosArrayGetP(results, pos);
      taosArrayDestroy(one);
      pos++;
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
456

dengyihao's avatar
dengyihao 已提交
457
/*
 * Combines the per-term result arrays in `in` into `out`.
 *
 * Every input array is first sorted and de-duplicated with uidCompare.
 * Then: MUST -> iIntersection(in, out); SHOULD -> iUnion(in, out);
 * NOT -> nothing is written (not used currently).
 *
 * @return always 0.
 */
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
  // refactor, merge interResults into fResults by oType
  size_t nIn = taosArrayGetSize(in);
  // The counter must go up: counting down left every array after the
  // first one unsorted and with duplicates.
  for (size_t i = 0; i < nIn; i++) {
    SArray* t = taosArrayGetP(in, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(in, out);
  } else if (oType == SHOULD) {
    iUnion(in, out);
  } else if (oType == NOT) {
    // just one column index, enhance later
    // not use currently
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
476

dengyihao's avatar
dengyihao 已提交
477
/*
 * Moves the pending temporary result `tr` into the last entry of `result`
 * when the column value changes, and starts a new entry for `tfv`.
 *
 *   - result empty:            `tfv` becomes the first entry (nothing is
 *                              stored when `tfv` is NULL);
 *   - tfv == NULL:             final flush, `tr` is merged into the last
 *                              entry's tableId;
 *   - tfv has a new colVal:    `tr` is merged into the last entry and
 *                              cleared, then `tfv` is appended;
 *   - tfv repeats the colVal:  `tfv` is redundant and destroyed; `tr` keeps
 *                              accumulating.
 */
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
  int32_t sz = taosArrayGetSize(result);
  if (sz == 0) {
    // Never store a NULL entry: the consumers of `result` dereference
    // every element.
    if (tfv != NULL) {
      taosArrayPush(result, &tfv);
    }
    return;
  }

  TFileValue* lv = taosArrayGetP(result, sz - 1);
  if (tfv == NULL) {
    // handle last iterator
    idxTRsltMergeTo(tr, lv->tableId);
  } else if (strcmp(lv->colVal, tfv->colVal) != 0) {
    idxTRsltMergeTo(tr, lv->tableId);
    idxTRsltClear(tr);

    taosArrayPush(result, &tfv);
  } else {
    // temp result saved in help
    tfileValueDestroy(tfv);
  }
}
dengyihao's avatar
dengyihao 已提交
497
/*
 * Feeds one merge step into the temporary result `tr`.
 *
 * A TFileValue is created for the current column value (taken from `cv`
 * when present, otherwise from `tv`) and passed to
 * idxMayMergeTempToFinalRslt(). Then the first id of the cache value is
 * recorded via INDEX_MERGE_ADD_DEL as an add or a delete depending on
 * cv->type, and all ids of the tfile value are appended to tr->total.
 * At least one of `cv` / `tv` must be non-NULL.
 */
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
  char* colVal = NULL;
  if (cv != NULL) {
    colVal = cv->colVal;
  } else {
    colVal = tv->colVal;
  }

  TFileValue* tfv = tfileValueCreate(colVal);
  idxMayMergeTempToFinalRslt(result, tfv, tr);

  if (cv != NULL) {
    uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
    uint32_t ver = cv->ver;  // read but currently unused
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, id)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
    }
  }

  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
516
/*
 * Destroys the final merge result: every TFileValue stored in `result`
 * and then the array. A NULL array has zero elements to destroy.
 */
static void idxDestroyFinalRslt(SArray* result) {
  int32_t sz = 0;
  if (result) {
    sz = taosArrayGetSize(result);
  }

  size_t pos = 0;
  while (pos < sz) {
    TFileValue* entry = taosArrayGetP(result, pos);
    tfileValueDestroy(entry);
    pos++;
  }
  taosArrayDestroy(result);
}
524

dengyihao's avatar
dengyihao 已提交
525
/*
 * Merges the immutable part of a column cache with the column's existing
 * tfile and writes the merged data out as a new tfile.
 *
 * @param sIdx   owning index; -1 is returned immediately when NULL.
 * @param cache  the IndexCache to flush.
 * @param quit   when true, first spins until pCache->merging is no longer
 *               1, and posts the index semaphore once finished
 *               (indexClose() waits on it).
 * @return 0 when there was nothing to merge, otherwise the result of
 *         idxGenTFile().
 *
 * Both iterators are walked in parallel and compared by column value with
 * strcmp(): equal values are merged together, otherwise the smaller side
 * is consumed alone. Before returning (except for the NULL-index case) the
 * merging flag is cleared and the index reference is released.
 */
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
  if (sIdx == NULL) {
    return -1;
  }
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);

  int64_t     startUs = taosGetTimestampUs();
  IndexCache* pCache = (IndexCache*)cache;

  // busy-wait while another merge is flagged as running
  while (quit && atomic_load_32(&pCache->merging) == 1) {
  }

  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }

  // handle flush
  Iterate* cacheIter = idxCacheIteratorCreate(pCache);
  if (cacheIter == NULL) {
    indexError("%p immtable is empty, ignore merge opera", pCache);
    idxCacheDestroyImm(pCache);
    tfileReaderUnRef(pReader);
    atomic_store_32(&pCache->merging, 0);
    if (quit) {
      idxPost(sIdx);
    }
    idxReleaseRef(sIdx->refId);
    return 0;
  }

  Iterate* tfileIter = tfileIteratorCreate(pReader);
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }

  SArray* result = taosArrayInit(1024, sizeof(void*));

  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;

  SIdxTRslt* tr = idxTRsltCreate();
  while (cn || tn) {
    IterateValue* cv = cn ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = tn ? tfileIter->getValue(tfileIter) : NULL;

    int comp;
    if (cn && tn) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else {
      comp = cn ? -1 : 1;
    }

    if (comp == 0) {
      idxMergeCacheAndTFile(result, cv, tv, tr);
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
      idxMergeCacheAndTFile(result, cv, NULL, tr);
      cn = cacheIter->next(cacheIter);
    } else {
      idxMergeCacheAndTFile(result, NULL, tv, tr);
      tn = tfileIter->next(tfileIter);
    }
  }
  idxMayMergeTempToFinalRslt(result, NULL, tr);
  idxTRsltDestroy(tr);

  int ret = idxGenTFile(sIdx, pCache, result);
  idxDestroyFinalRslt(result);

  idxCacheDestroyImm(pCache);

  idxCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

  tfileReaderUnRef(pReader);
  idxCacheUnRef(pCache);

  int64_t cost = taosGetTimestampUs() - startUs;
  if (ret == 0) {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  }

  atomic_store_32(&pCache->merging, 0);
  if (quit) {
    idxPost(sIdx);
  }
  idxReleaseRef(sIdx->refId);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
618 619 620
/*
 * Resets an IterateValue.
 *
 * With destroy == true the id array is destroyed and set to NULL; otherwise
 * it is only cleared (when present) and kept. In both cases the column
 * value string is freed and set to NULL.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }

  taosMemoryFree(value->colVal);
  value->colVal = NULL;
}
630

dengyihao's avatar
dengyihao 已提交
631
/*
 * Picks the version for the next tfile of a column.
 *
 * Starts from CACHE_VERSION(cache). When a reader for the column is found
 * in the tfile cache (looked up under the tfile mutex), the result is one
 * more than the larger of that value and the reader's header version.
 * The reader reference obtained from the lookup is dropped before
 * returning.
 */
static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
  ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
  int64_t   ver = CACHE_VERSION(cache);

  IndexTFile* tf = (IndexTFile*)(sIdx->tindex);

  taosThreadMutexLock(&tf->mtx);
  TFileReader* rd = tfileCacheGet(tf->cache, &key);
  taosThreadMutexUnlock(&tf->mtx);

  if (rd != NULL) {
    if (!(ver > rd->header.version)) {
      ver = rd->header.version;
    }
    ver = ver + 1;
    indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver);
  }
  tfileReaderUnRef(rd);
  return ver;
}
dengyihao's avatar
dengyihao 已提交
648
/*
 * Writes `batch` as a new tfile for the cache's column and registers a
 * reader for it in the index's tfile cache.
 *
 * Steps: choose a version with idxGetAvailableVer(), open a writer, put the
 * batch, close the writer, reopen the file as a reader, and store the
 * reader in tf->cache under the tfile mutex.
 *
 * @return 0 on success; -1 when the writer cannot be opened, the write
 *         fails (the writer's file context and the writer are then freed
 *         here), or the new file cannot be reopened.
 */
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int64_t version = idxGetAvailableVer(sIdx, cache);
  indexInfo("file name version: %" PRId64 "", version);
  uint8_t colType = cache->type;

  TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (tw == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(tw, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    idxFileCtxDestroy(tw->ctx, true);
    taosMemoryFree(tw);
    return -1;
  }
  tfileWriterClose(tw);

  TFileReader* reader = tfileReaderOpen(sIdx, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }
  indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);

  IndexTFile*  tf = (IndexTFile*)sIdx->tindex;
  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};

  taosThreadMutexLock(&tf->mtx);
  tfileCachePut(tf->cache, &key, reader);
  taosThreadMutexUnlock(&tf->mtx);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
689

dengyihao's avatar
dengyihao 已提交
690
/*
 * Serialises a cache key into `buf` as "<suid>_<column>".
 *
 * The suid is written in the text form produced by idxInt2str(). The
 * column part is JSON_COLUMN when the key's type carries the JSON extern
 * type, otherwise the first key->nColName bytes of key->colName. No NUL
 * terminator is appended by this function, and the buffer size is not
 * checked here; callers in this file pass a zeroed 128-byte buffer.
 *
 * @return the number of bytes written.
 */
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
  char* start = buf;
  char  suidStr[65] = {0};

  idxInt2str((int64_t)key->suid, suidStr, 0);

  SERIALIZE_STR_VAR_TO_BUF(buf, suidStr, strlen(suidStr));
  SERIALIZE_VAR_TO_BUF(buf, '_', char);

  if (IDX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON)) {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  } else {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  }
  return buf - start;
}