index.c 20.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
dengyihao's avatar
dengyihao 已提交
5 6
 * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
 * Software Foundation.
H
refact  
Hongze Cheng 已提交
7 8 9 10 11 12 13 14 15
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "tcoding.h"
#include "tdataformat.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
30 31
#endif

dengyihao's avatar
dengyihao 已提交
32
#define INDEX_NUM_OF_THREADS 5
dengyihao's avatar
dengyihao 已提交
33
#define INDEX_QUEUE_SIZE     200
dengyihao's avatar
dengyihao 已提交
34

dengyihao's avatar
dengyihao 已提交
35 36 37
#define INDEX_DATA_BOOL_NULL      0x02
#define INDEX_DATA_TINYINT_NULL   0x80
#define INDEX_DATA_SMALLINT_NULL  0x8000
wafwerar's avatar
wafwerar 已提交
38 39
#define INDEX_DATA_INT_NULL       0x80000000LL
#define INDEX_DATA_BIGINT_NULL    0x8000000000000000LL
dengyihao's avatar
dengyihao 已提交
40 41
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL

dengyihao's avatar
dengyihao 已提交
42
#define INDEX_DATA_FLOAT_NULL    0x7FF00000            // it is an NAN
wafwerar's avatar
wafwerar 已提交
43
#define INDEX_DATA_DOUBLE_NULL   0x7FFFFF0000000000LL  // an NAN
dengyihao's avatar
dengyihao 已提交
44 45 46 47
#define INDEX_DATA_NCHAR_NULL    0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL   0xFF
#define INDEX_DATA_JSON_NULL     0xFFFFFFFF
#define INDEX_DATA_JSON_null     0xFFFFFFFE
dengyihao's avatar
dengyihao 已提交
48 49
#define INDEX_DATA_JSON_NOT_NULL 0x01

dengyihao's avatar
dengyihao 已提交
50
#define INDEX_DATA_UTINYINT_NULL  0xFF
dengyihao's avatar
dengyihao 已提交
51
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
dengyihao's avatar
dengyihao 已提交
52 53
#define INDEX_DATA_UINT_NULL      0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL   0xFFFFFFFFFFFFFFFFL
dengyihao's avatar
dengyihao 已提交
54

dengyihao's avatar
dengyihao 已提交
55
#define INDEX_DATA_NULL_STR   "NULL"
dengyihao's avatar
dengyihao 已提交
56 57
#define INDEX_DATA_NULL_STR_L "null"

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
void*   indexQhandle = NULL;
int32_t indexRefMgt;

static void indexDestroy(void* sIdx);

dengyihao's avatar
dengyihao 已提交
63 64
void indexInit() {
  // refactor later
D
dapan1121 已提交
65
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL);
dengyihao's avatar
dengyihao 已提交
66
  indexRefMgt = taosOpenRef(1000, indexDestroy);
dengyihao's avatar
dengyihao 已提交
67
}
dengyihao's avatar
dengyihao 已提交
68
void indexCleanup() {
dengyihao's avatar
dengyihao 已提交
69 70
  // refacto later
  taosCleanUpScheduler(indexQhandle);
dengyihao's avatar
dengyihao 已提交
71
  taosCloseRef(indexRefMgt);
dengyihao's avatar
dengyihao 已提交
72
}
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74
/* Per-column bookkeeping record. */
typedef struct SIdxColInfo {
  int colId;     // column id, generated internally by the index
  int cVersion;  // NOTE(review): presumably the column's version counter - confirm
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
78

wafwerar's avatar
wafwerar 已提交
79
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
80
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
81
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83 84
static void idxInterRsltDestroy(SArray* results);
static int  idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
dengyihao's avatar
dengyihao 已提交
85

dengyihao's avatar
dengyihao 已提交
86
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
dengyihao's avatar
dengyihao 已提交
87

dengyihao's avatar
dengyihao 已提交
88
// merge cache and tfile results according to the operator type
dengyihao's avatar
dengyihao 已提交
89
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91 92
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94
static void idxPost(void* idx) {
dengyihao's avatar
dengyihao 已提交
95 96 97 98 99 100 101 102
  SIndex* pIdx = idx;
  tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
  SIndex* pIdx = idx;
  tsem_wait(&pIdx->sem);
}

dengyihao's avatar
dengyihao 已提交
103
/*
 * Creates an index object rooted at `path`.
 *
 * @param opts   options; opts->cacheSize sizes the LRU cache and the whole
 *               struct is copied into the new index
 * @param path   directory handed to the tfile layer; duplicated into the index
 * @param index  out: the new index on success, NULL on a failure that happens
 *               after the SIndex itself has been allocated
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_OUT_OF_MEMORY otherwise
 *
 * On success the index is registered in the module reference set and one
 * reference is acquired for the caller; indexClose() gives both back.
 *
 * Failure handling: a partially built index has not been registered in the
 * reference set, so it cannot be released through indexClose() (that would
 * never reach indexDestroy() and would leak the object). The pieces built so
 * far are therefore torn down directly.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  int ret = TSDB_CODE_SUCCESS;
  taosThreadOnce(&isInit, indexInit);

  SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex));
  if (idx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  idx->lru = taosLRUCacheInit(opts->cacheSize, -1, .5);
  if (idx->lru == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }
  taosLRUCacheSetStrictCapacity(idx->lru, false);

  idx->tindex = idxTFileCreate(idx, path);
  if (idx->tindex == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (idx->colObj == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  idx->path = tstrdup(path);
  if (idx->path == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  idx->cVersion = 1;
  taosThreadMutexInit(&idx->mtx, NULL);
  tsem_init(&idx->sem, 0, 0);

  idx->refId = idxAddRef(idx);
  idx->opts = *opts;
  idxAcquireRef(idx->refId);

  *index = idx;
  return ret;

END:
  // Only reached before the mutex/semaphore are initialised and before the
  // index is registered, so release exactly what has been created so far.
  // idx->path is always NULL here (its allocation is the last fallible step).
  if (idx->colObj != NULL) {
    taosHashCleanup(idx->colObj);
  }
  if (idx->tindex != NULL) {
    idxTFileDestroy(idx->tindex);
  }
  if (idx->lru != NULL) {
    taosLRUCacheEraseUnrefEntries(idx->lru);
    taosLRUCacheCleanup(idx->lru);
  }
  taosMemoryFree(idx);

  *index = NULL;
  return ret;
}
dengyihao's avatar
dengyihao 已提交
144

dengyihao's avatar
dengyihao 已提交
145
void indexDestroy(void* handle) {
dengyihao's avatar
dengyihao 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158
  SIndex* idx = handle;
  taosThreadMutexDestroy(&idx->mtx);
  tsem_destroy(&idx->sem);
  idxTFileDestroy(idx->tindex);
  taosMemoryFree(idx->path);

  SLRUCache* lru = idx->lru;
  if (lru != NULL) {
    taosLRUCacheEraseUnrefEntries(lru);
    taosLRUCacheCleanup(lru);
  }
  idx->lru = NULL;
  taosMemoryFree(idx);
dengyihao's avatar
dengyihao 已提交
159 160 161 162
  return;
}
/*
 * Closes an index returned by indexOpen().
 *
 * For every per-column cache it forces a merge, blocks on the index semaphore
 * until that merge signals completion, and drops the cache reference. It then
 * destroys the column map and gives back the caller's reference plus the
 * registration in the reference set.
 *
 * A NULL index is ignored.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) {
    return;
  }

  if (sIdx->colObj != NULL) {
    void* iter = taosHashIterate(sIdx->colObj, NULL);
    while (iter) {
      // Copy the cache pointer out of the map slot now: the slot belongs to
      // the iterator and must not be read again once the iterator advances.
      IndexCache* cache = *(IndexCache**)iter;

      idxCacheForceToMerge((void*)cache);
      indexInfo("%s wait to merge", cache->colName);
      indexWait((void*)(sIdx));
      indexInfo("%s finish to wait", cache->colName);

      iter = taosHashIterate(sIdx->colObj, iter);
      idxCacheUnRef(cache);
    }
    taosHashCleanup(sIdx->colObj);
    sIdx->colObj = NULL;
  }

  idxReleaseRef(sIdx->refId);
  idxRemoveRef(sIdx->refId);
}
dengyihao's avatar
dengyihao 已提交
181
int64_t idxAddRef(void* p) {
dengyihao's avatar
dengyihao 已提交
182 183 184
  // impl
  return taosAddRef(indexRefMgt, p);
}
dengyihao's avatar
dengyihao 已提交
185
int32_t idxRemoveRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
186 187 188 189
  // impl later
  return taosRemoveRef(indexRefMgt, ref);
}

dengyihao's avatar
dengyihao 已提交
190
void idxAcquireRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
191 192 193
  // impl
  taosAcquireRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
194
void idxReleaseRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
195 196 197
  // impl
  taosReleaseRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
198

dengyihao's avatar
dengyihao 已提交
199
/*
 * Inserts every term in `fVals` for table `uid`.
 *
 * Phase 1 (under index->mtx): make sure a per-column cache exists for each
 * term, creating and registering it in index->colObj when missing.
 * Phase 2 (without the lock): look the cache up again and write the term.
 *
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY if a cache cannot be created;
 *         -1 if a cache cannot be found in phase 2; otherwise the first
 *         non-zero result of idxCachePut().
 *
 * NOTE(review): the 128-byte key buffer assumes suid text + '_' + column name
 * fits; nothing visible here enforces that limit - confirm against callers.
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
  size_t nTerms = taosArrayGetSize(fVals);

  // TODO(yihao): reduce the lock range
  taosThreadMutexLock(&index->mtx);
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL) {
      IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        // Never store a NULL cache in the map; phase 2 would dereference it.
        taosThreadMutexUnlock(&index->mtx);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
    }
  }
  taosThreadMutexUnlock(&index->mtx);

  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);
    indexDebug("w suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL || *cache == NULL) {
      // The lookup itself may fail, so test the slot pointer before
      // dereferencing it instead of asserting on *cache.
      indexError("failed to find cache for col:%s", p->colName);
      return -1;
    }

    int ret = idxCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
234
/*
 * Runs every term query in `multiQuerys` and merges the per-term results into
 * `result` according to multiQuerys->opera.
 *
 * @return 0 after the merge step; TSDB_CODE_OUT_OF_MEMORY if the intermediate
 *         result list cannot be allocated.
 *
 * The return value of each individual term search is not inspected (as in the
 * original code); whatever array the search produced is merged as-is.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

  SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
  if (iRslts == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  size_t nQuery = taosArrayGetSize(multiQuerys->query);
  for (size_t i = 0; i < nQuery; i++) {
    SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
    SArray*          trslt = NULL;
    idxTermSearch(index, qterm, &trslt);
    taosArrayPush(iRslts, (void*)&trslt);
  }

  idxMergeFinalResults(iRslts, opera, result);
  idxInterRsltDestroy(iRslts);
  return 0;
}

250
// Stub: deletion by query is not implemented. Both arguments are ignored and 1 is always returned.
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
dengyihao's avatar
dengyihao 已提交
251
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
dengyihao's avatar
dengyihao 已提交
252

dengyihao's avatar
dengyihao 已提交
253 254 255 256 257 258
/*
 * Allocates a zero-initialised SIndexOpts and sets its cacheSize.
 *
 * @return the new options object, or NULL if the allocation fails.
 *         Release it with indexOptsDestroy().
 */
SIndexOpts* indexOptsCreate(int32_t cacheSize) {
  SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts));
  if (opts == NULL) {
    return NULL;
  }
  opts->cacheSize = cacheSize;
  return opts;
}
/* Frees an options object created by indexOptsCreate(). */
void indexOptsDestroy(SIndexOpts* opts) {
  // A void function must not `return` an expression (C11 6.8.6.4).
  taosMemoryFree(opts);
}
dengyihao's avatar
dengyihao 已提交
259 260 261 262
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
263
/*
 * Creates an empty multi-term query whose sub-queries are combined by `opera`.
 *
 * @return the new query, or NULL if either the object or its term array
 *         cannot be allocated. Release it with indexMultiTermQueryDestroy().
 */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
    return NULL;
  }

  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  if (mtq->query == NULL) {
    // Do not hand out a query object without its term array.
    taosMemoryFree(mtq);
    return NULL;
  }
  return mtq;
}
dengyihao's avatar
dengyihao 已提交
272
/*
 * Destroys a multi-term query: every term stored in it is destroyed with
 * indexTermDestroy(), then the term array and the query object are freed.
 * A NULL query is ignored.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }

  size_t nTerms = taosArrayGetSize(pQuery->query);
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  taosMemoryFree(pQuery);
}
dengyihao's avatar
dengyihao 已提交
280
/*
 * Appends a (term, query type) pair to the query's term array.
 * The element is copied by value; the `term` pointer itself is stored.
 * Always returns 0.
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery entry = {0};
  entry.qType = qType;
  entry.term = term;

  taosArrayPush(pQuery->query, &entry);
  return 0;
}

dengyihao's avatar
dengyihao 已提交
286 287
/*
 * Builds a term describing one column value.
 *
 * @param suid      super-table id stored in the term
 * @param oper      operation stored as operType
 * @param colType   column type; also selects the value-to-string conversion
 * @param colName   column name (nColName bytes are copied and NUL-terminated)
 * @param colVal    raw value; NULL is stored as the text "NULL"
 * @param nColVal   value length; a non-NULL value of length 0 is stored as " "
 * @return the new term, or NULL if allocating the term, its name copy, or a
 *         literal value string fails. Release it with indexTermDestroy().
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
  if (tm == NULL) {
    return NULL;
  }

  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;

  tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
  if (tm->colName == NULL) {
    // The memcpy below would otherwise write through a NULL pointer.
    taosMemoryFree(tm);
    return NULL;
  }
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;

  char*   buf = NULL;
  int32_t len = 0;
  if (colVal != NULL && nColVal != 0) {
    len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
  } else {
    // NULL value -> "NULL"; present but empty value -> single space.
    const char* literal = (colVal == NULL) ? INDEX_DATA_NULL_STR : " ";
    len = (int32_t)strlen(literal);
    buf = strndup(literal, len);
    if (buf == NULL) {
      taosMemoryFree(tm->colName);
      taosMemoryFree(tm);
      return NULL;
    }
  }
  tm->colVal = buf;
  tm->nColVal = len;

  return tm;
}
dengyihao's avatar
dengyihao 已提交
318

dengyihao's avatar
dengyihao 已提交
319
/*
 * Frees a term created by indexTermCreate(): its column name, its value
 * string, and the term itself. A NULL term is ignored.
 */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
}

dengyihao's avatar
dengyihao 已提交
325
/* Creates an empty term list: an array of SIndexTerm pointers with initial capacity 4. */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));
  return terms;
}
326

dengyihao's avatar
dengyihao 已提交
327
/* Appends the `term` pointer to the term list. Always returns 0. */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  SIndexTerm* item = term;
  taosArrayPush(terms, &item);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
331
/* Destroys every term held in the list with indexTermDestroy(), then destroys the list itself. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  int32_t pos = 0;
  while (pos < taosArrayGetSize(terms)) {
    SIndexTerm* item = taosArrayGetP(terms, pos);
    indexTermDestroy(item);
    pos++;
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
338

dengyihao's avatar
dengyihao 已提交
339 340 341 342 343
/*
 * rebuild index
 */

/*
 * Scheduler callback queued by indexRebuild().
 * Marks the index carried in msg->ahandle as kFinished and releases the
 * reference that indexRebuild() acquired before scheduling.
 */
static void idxSchedRebuildIdx(SSchedMsg* msg) {
  // TODO, no need rebuild index
  SIndex* target = msg->ahandle;

  int8_t done = kFinished;
  atomic_store_8(&target->status, done);

  idxReleaseRef(target->refId);
}
/*
 * Flags the index as kRebuild and queues idxSchedRebuildIdx on the background
 * scheduler. A reference is acquired before scheduling; the callback releases
 * it. `iter` is not used.
 */
void indexRebuild(SIndexJson* idx, void* iter) {
  // set up rebuild status
  int8_t rebuilding = kRebuild;
  atomic_store_8(&idx->status, rebuilding);

  // task put into BG thread
  SSchedMsg task = {0};
  task.ahandle = idx;
  task.fp = idxSchedRebuildIdx;

  idxAcquireRef(idx->refId);
  taosScheduleTask(indexQhandle, &task);
}

/*
 * check index json status
 **/
/* Returns true while the index status, read atomically, equals kRebuild. */
bool indexIsRebuild(SIndex* idx) {
  SIdxStatus current = (SIdxStatus)atomic_load_8(&idx->status);
  return current == kRebuild;
}
/*
 * rebuild index
 */
/* JSON-index entry point for rebuilding: forwards both arguments to indexRebuild(). */
void indexJsonRebuild(SIndexJson* idx, void* iter) {
  indexRebuild(idx, iter);
}

/*
 * check index json status
 **/
/* Returns true while the JSON index status, read atomically, equals kRebuild. */
bool indexJsonIsRebuild(SIndexJson* idx) {
  SIdxStatus current = (SIdxStatus)atomic_load_8(&idx->status);
  return current == kRebuild;
}

dengyihao's avatar
dengyihao 已提交
387
/*
 * Evaluates one term query.
 *
 * The column's cache is looked up in sIdx->colObj under sIdx->mtx, searched
 * first, and - unless the cache reports the column as deleted - the tfile
 * index is searched next. Matches from both are merged into a newly
 * allocated array of uint64_t stored in *result.
 *
 * @return 0 on success (including the "column deleted" case, where *result is
 *         left empty); -1 on allocation failure or when either search fails.
 *         *result is NULL only if its own allocation failed; otherwise the
 *         caller owns it even on failure.
 */
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  // Get col info
  IndexCache* cache = NULL;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  indexDebug("r suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType);
  int32_t sz = idxSerialCacheKey(&key, buf);

  taosThreadMutexLock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  cache = (pCache == NULL) ? NULL : *pCache;
  taosThreadMutexUnlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  if (*result == NULL) {
    return -1;
  }
  // TODO: iterator mem and tidex
  STermValueType s = kTypeValue;

  int64_t st = taosGetTimestampUs();

  SIdxTRslt* tr = idxTRsltCreate();
  if (tr == NULL) {
    return -1;
  }

  if (0 == idxCacheSearch(cache, query, tr, &s)) {
    if (s == kTypeDeletion) {
      indexInfo("col: %s already drop by", term->colName);
      // column already dropped by another operation, no need to query tindex;
      // the temporary result set must still be released on this path
      idxTRsltDestroy(tr);
      return 0;
    } else {
      st = taosGetTimestampUs();
      if (0 != idxTFileSearch(sIdx->tindex, query, tr)) {
        indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
        goto END;
      }
      int64_t tfCost = taosGetTimestampUs() - st;
      indexInfo("tfile search cost: %" PRIu64 "us", tfCost);
    }
  } else {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }
  int64_t cost = taosGetTimestampUs() - st;
  indexInfo("search cost: %" PRIu64 "us", cost);

  idxTRsltMergeTo(tr, *result);

  idxTRsltDestroy(tr);
  return 0;
END:
  idxTRsltDestroy(tr);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
442
/*
 * Destroys the list of intermediate per-term results: each element is an
 * SArray pointer that is destroyed, followed by the outer list.
 * A NULL list is ignored.
 */
static void idxInterRsltDestroy(SArray* results) {
  if (results != NULL) {
    size_t count = taosArrayGetSize(results);
    size_t pos = 0;
    while (pos < count) {
      SArray* item = taosArrayGetP(results, pos);
      taosArrayDestroy(item);
      pos++;
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
454

dengyihao's avatar
dengyihao 已提交
455
/*
 * Combines the per-term result arrays in `in` into `out`.
 *
 * Every input array is first sorted and de-duplicated with uidCompare. Then
 * MUST produces the intersection of all inputs and SHOULD their union; NOT
 * is currently a no-op. Always returns 0.
 */
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
  // refactor, merge interResults into fResults by oType
  size_t nIn = taosArrayGetSize(in);
  // Walk forward over every input; the previous `i--` step stopped after the
  // first element, leaving the remaining arrays unsorted for the set merge.
  for (size_t i = 0; i < nIn; i++) {
    SArray* t = taosArrayGetP(in, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(in, out);
  } else if (oType == SHOULD) {
    iUnion(in, out);
  } else if (oType == NOT) {
    // just one column index, enhance later
    // not use currently
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
474

dengyihao's avatar
dengyihao 已提交
475
/*
 * Advances the merged output `result` (an array of TFileValue pointers) by
 * one step.
 *
 * - tfv == NULL marks the end of iteration: the pending ids in `tr` are
 *   merged into the last value's tableId list.
 * - tfv with a different colVal than the last value: the pending ids are
 *   merged into the last value, `tr` is cleared, and tfv becomes the new last
 *   value (ownership moves to `result`).
 * - tfv with the same colVal as the last value: tfv is redundant and is
 *   destroyed.
 * - empty `result`: tfv is simply appended. A NULL tfv is not appended, so
 *   `result` never holds a NULL entry.
 */
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
  int32_t sz = taosArrayGetSize(result);
  if (sz <= 0) {
    if (tfv != NULL) {
      taosArrayPush(result, &tfv);
    }
    return;
  }

  TFileValue* lv = taosArrayGetP(result, sz - 1);
  if (tfv == NULL) {
    // handle last iterator
    idxTRsltMergeTo(tr, lv->tableId);
  } else if (strcmp(lv->colVal, tfv->colVal) != 0) {
    idxTRsltMergeTo(tr, lv->tableId);
    idxTRsltClear(tr);

    taosArrayPush(result, &tfv);
  } else {
    tfileValueDestroy(tfv);
  }
}
dengyihao's avatar
dengyihao 已提交
494
/*
 * Folds one step of the cache iterator (`cv`) and/or the tfile iterator
 * (`tv`) into the running merge state.
 *
 * A TFileValue for the current column value is offered to
 * idxMayMergeTempToFinalRslt(); then the first id of the cache value is
 * recorded in tr->add or tr->del depending on its type, and all ids of the
 * tfile value are appended to tr->total.
 *
 * At least one of `cv` / `tv` must be non-NULL; if both are NULL the call is
 * ignored.
 */
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
  if (cv == NULL && tv == NULL) {
    return;
  }

  char*       colVal = (cv != NULL) ? cv->colVal : tv->colVal;
  TFileValue* tfv = tfileValueCreate(colVal);

  idxMayMergeTempToFinalRslt(result, tfv, tr);

  if (cv != NULL) {
    uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, id)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
    }
  }
  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
513
/* Destroys every TFileValue held in `result`, then the array itself. A NULL array has no elements to visit. */
static void idxDestroyFinalRslt(SArray* result) {
  int32_t sz = 0;
  if (result) {
    sz = taosArrayGetSize(result);
  }

  size_t pos = 0;
  while (pos < sz) {
    TFileValue* item = taosArrayGetP(result, pos);
    tfileValueDestroy(item);
    pos++;
  }
  taosArrayDestroy(result);
}
521

dengyihao's avatar
dengyihao 已提交
522
/*
 * Merges the immutable part of a column cache with the column's current
 * tfile and writes the combined content as a new tfile.
 *
 * @param sIdx   owning index; -1 is returned immediately when NULL
 * @param cache  the IndexCache to flush
 * @param quit   when true, spin until no merge is marked in flight on the
 *               cache before starting, and post the index semaphore when
 *               finished (indexClose() waits on it)
 * @return 0 when there is nothing to merge, otherwise idxGenTFile()'s result
 *
 * Both iterators are walked in colVal order (strcmp). Equal keys are merged
 * together; otherwise the smaller key is consumed on its own.
 *
 * The cache's `merging` flag is cleared before this function drops its cache
 * reference, so the cache is never touched after idxCacheUnRef().
 */
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
  if (sIdx == NULL) {
    return -1;
  }
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);

  int64_t st = taosGetTimestampUs();

  IndexCache* pCache = (IndexCache*)cache;

  while (quit && atomic_load_32(&pCache->merging) == 1)
    ;
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }
  // handle flush
  Iterate* cacheIter = idxCacheIteratorCreate(pCache);
  if (cacheIter == NULL) {
    indexError("%p immtable is empty, ignore merge opera", pCache);
    idxCacheDestroyImm(pCache);
    tfileReaderUnRef(pReader);
    atomic_store_32(&pCache->merging, 0);
    if (quit) {
      idxPost(sIdx);
    }
    idxReleaseRef(sIdx->refId);
    return 0;
  }

  Iterate* tfileIter = tfileIteratorCreate(pReader);
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }

  SArray* result = taosArrayInit(1024, sizeof(void*));

  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;

  SIdxTRslt* tr = idxTRsltCreate();
  while (cn || tn) {
    IterateValue* cv = cn ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = tn ? tfileIter->getValue(tfileIter) : NULL;

    // <0: only the cache side is consumed, >0: only the tfile side, 0: both
    int comp = 0;
    if (cn && tn) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn) {
      comp = -1;
    } else {
      comp = 1;
    }

    if (comp == 0) {
      idxMergeCacheAndTFile(result, cv, tv, tr);
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
      idxMergeCacheAndTFile(result, cv, NULL, tr);
      cn = cacheIter->next(cacheIter);
    } else {
      idxMergeCacheAndTFile(result, NULL, tv, tr);
      tn = tfileIter->next(tfileIter);
    }
  }
  // flush the ids still pending for the last column value
  idxMayMergeTempToFinalRslt(result, NULL, tr);
  idxTRsltDestroy(tr);

  int ret = idxGenTFile(sIdx, pCache, result);
  idxDestroyFinalRslt(result);

  idxCacheDestroyImm(pCache);

  idxCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

  tfileReaderUnRef(pReader);

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }

  // Clear the flag while this function still holds its cache reference, then
  // give the reference back.
  atomic_store_32(&pCache->merging, 0);
  idxCacheUnRef(pCache);

  if (quit) {
    idxPost(sIdx);
  }
  idxReleaseRef(sIdx->refId);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
615 616 617
/*
 * Resets an iterator value between steps or tears it down.
 *
 * destroy == true : the id array is destroyed and its pointer cleared.
 * destroy == false: the id array, if present, is emptied but kept.
 * In both cases the colVal string is freed and its pointer cleared.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }

  taosMemoryFree(value->colVal);
  value->colVal = NULL;
}
627

dengyihao's avatar
dengyihao 已提交
628
/*
 * Picks the version number for the next tfile of the cache's column.
 *
 * Starts from CACHE_VERSION(cache). If a reader for the column is found in
 * the tfile cache (looked up under the tfile mutex), the result is one more
 * than the larger of that value and the reader's header version.
 */
static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
  int64_t   ver = CACHE_VERSION(cache);
  ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};

  IndexTFile* tfile = (IndexTFile*)(sIdx->tindex);

  taosThreadMutexLock(&tfile->mtx);
  TFileReader* reader = tfileCacheGet(tfile->cache, &key);
  taosThreadMutexUnlock(&tfile->mtx);

  if (reader != NULL) {
    if (!(ver > reader->header.version)) {
      ver = reader->header.version;
    }
    ver = ver + 1;
    indexInfo("header: %" PRId64 ", ver: %" PRId64 "", reader->header.version, ver);
  }

  tfileReaderUnRef(reader);
  return ver;
}
dengyihao's avatar
dengyihao 已提交
645
/*
 * Writes `batch` as a new tfile for the cache's column and publishes it.
 *
 * Steps: choose a version, open a writer, write the batch, close the writer,
 * reopen the file as a reader, and store that reader in the tfile cache
 * under the tfile mutex.
 *
 * @return 0 on success; -1 if the writer cannot be opened, the write fails
 *         (the writer's file context and the writer are then released here),
 *         or the reader cannot be opened.
 */
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int64_t version = idxGetAvailableVer(sIdx, cache);
  indexInfo("file name version: %" PRId64 "", version);
  uint8_t colType = cache->type;

  TFileWriter* writer = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (writer == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(writer, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    idxFileCtxDestroy(writer->ctx, true);
    taosMemoryFree(writer);
    return -1;
  }
  tfileWriterClose(writer);

  TFileReader* reader = tfileReaderOpen(sIdx, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }
  indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);

  IndexTFile*  tfile = (IndexTFile*)sIdx->tindex;
  TFileHeader* hdr = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = hdr->colName, .nColName = strlen(hdr->colName)};

  taosThreadMutexLock(&tfile->mtx);
  tfileCachePut(tfile->cache, &key, reader);
  taosThreadMutexUnlock(&tfile->mtx);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
686

dengyihao's avatar
dengyihao 已提交
687
/*
 * Serialises a cache key into `buf` as "<suid>_<column>" and returns the
 * number of bytes written.
 *
 * For keys whose colType carries the JSON extern type, the fixed JSON_COLUMN
 * name is written instead of key->colName.
 * NOTE(review): `buf` must be large enough for the result; no size is passed
 * in, so the bound is the caller's responsibility - confirm.
 */
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
  bool isJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);

  char* cur = buf;  // write cursor, advanced by the SERIALIZE_* macros
  char  suidText[65] = {0};
  idxInt2str((int64_t)key->suid, suidText, 0);

  SERIALIZE_STR_VAR_TO_BUF(cur, suidText, strlen(suidText));
  SERIALIZE_VAR_TO_BUF(cur, '_', char);
  if (!isJson) {
    SERIALIZE_STR_MEM_TO_BUF(cur, key, colName, key->nColName);
  } else {
    SERIALIZE_STR_VAR_TO_BUF(cur, JSON_COLUMN, strlen(JSON_COLUMN));
  }
  return cur - buf;
}