index.c 19.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
H
refact  
Hongze Cheng 已提交
7 8 9 10 11 12 13 14 15
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "tcoding.h"
#include "tdataformat.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
27

dengyihao's avatar
dengyihao 已提交
28
// Sizing of the shared background scheduler created in indexInit().
#define INDEX_NUM_OF_THREADS 5
#define INDEX_QUEUE_SIZE     200

// Per-type NULL sentinel values. They appear to mirror the TSDB_DATA_*_NULL encodings
// (NOTE(review): confirm against tdef.h before relying on exact equality).
#define INDEX_DATA_BOOL_NULL      0x02
#define INDEX_DATA_TINYINT_NULL   0x80
#define INDEX_DATA_SMALLINT_NULL  0x8000
#define INDEX_DATA_INT_NULL       0x80000000LL
#define INDEX_DATA_BIGINT_NULL    0x8000000000000000LL
// Timestamps share the BIGINT encoding; alias the local macro instead of an external one.
#define INDEX_DATA_TIMESTAMP_NULL INDEX_DATA_BIGINT_NULL

#define INDEX_DATA_FLOAT_NULL    0x7FF00000            // it is an NAN
#define INDEX_DATA_DOUBLE_NULL   0x7FFFFF0000000000LL  // an NAN
#define INDEX_DATA_NCHAR_NULL    0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL   0xFF
#define INDEX_DATA_JSON_NULL     0xFFFFFFFF
#define INDEX_DATA_JSON_null     0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01

#define INDEX_DATA_UTINYINT_NULL  0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL      0xFFFFFFFF
// ULL: the value needs a 64-bit unsigned type on every data model (long is 32-bit on LLP64).
#define INDEX_DATA_UBIGINT_NULL   0xFFFFFFFFFFFFFFFFULL

// Textual value stored for a term whose colVal is NULL (see indexTermCreate).
#define INDEX_DATA_NULL_STR   "NULL"
#define INDEX_DATA_NULL_STR_L "null"

dengyihao's avatar
dengyihao 已提交
54 55 56 57 58
// Handle of the shared background scheduler: created in indexInit(), released in indexCleanup().
void*   indexQhandle = NULL;
// Id of the ref set opened in indexInit(); SIndex objects are registered in it via idxAddRef().
int32_t indexRefMgt;

// Destructor handed to taosOpenRef() in indexInit().
static void indexDestroy(void* sIdx);

dengyihao's avatar
dengyihao 已提交
59 60
void indexInit() {
  // refactor later
D
dapan1121 已提交
61
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index", NULL);
dengyihao's avatar
dengyihao 已提交
62
  indexRefMgt = taosOpenRef(1000, indexDestroy);
dengyihao's avatar
dengyihao 已提交
63
}
dengyihao's avatar
dengyihao 已提交
64
void indexCleanup() {
dengyihao's avatar
dengyihao 已提交
65 66
  // refacto later
  taosCleanUpScheduler(indexQhandle);
67
  taosMemoryFreeClear(indexQhandle);
dengyihao's avatar
dengyihao 已提交
68
  taosCloseRef(indexRefMgt);
dengyihao's avatar
dengyihao 已提交
69
}
dengyihao's avatar
dengyihao 已提交
70

dengyihao's avatar
dengyihao 已提交
71
// Per-column bookkeeping record (not referenced in the visible part of this file).
typedef struct SIdxColInfo {
  int colId;    // generated by index internal
  int version;  // column version
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
75

wafwerar's avatar
wafwerar 已提交
76
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
77
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
78
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
79

dengyihao's avatar
dengyihao 已提交
80 81
static void idxInterRsltDestroy(SArray* results);
static int  idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
dengyihao's avatar
dengyihao 已提交
84

dengyihao's avatar
dengyihao 已提交
85
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
86
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
dengyihao's avatar
dengyihao 已提交
87

dengyihao's avatar
dengyihao 已提交
88 89
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91
static void idxPost(void* idx) {
dengyihao's avatar
dengyihao 已提交
92 93 94 95 96 97 98 99
  SIndex* pIdx = idx;
  tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
  SIndex* pIdx = idx;
  tsem_wait(&pIdx->sem);
}

dengyihao's avatar
dengyihao 已提交
100
/*
 * Create an index rooted at `path`.
 *
 * @param opts   options; copied into the index (cacheSize sizes the LRU cache)
 * @param path   tfile directory; duplicated internally
 * @param index  out: the new index on success, NULL on failure
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a component cannot be created
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  int ret = TSDB_CODE_SUCCESS;
  taosThreadOnce(&isInit, indexInit);
  SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex));
  if (idx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  idx->lru = taosLRUCacheInit(opts->cacheSize, -1, .5);
  if (idx->lru == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }
  taosLRUCacheSetStrictCapacity(idx->lru, false);

  idx->tindex = idxTFileCreate(idx, path);
  if (idx->tindex == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (idx->colObj == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }
  idx->version = 1;
  idx->path = tstrdup(path);
  if (idx->path == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }
  taosThreadMutexInit(&idx->mtx, NULL);
  tsem_init(&idx->sem, 0, 0);

  idx->refId = idxAddRef(idx);
  idx->opts = *opts;
  idxAcquireRef(idx->refId);

  *index = idx;
  return ret;

END:
  // idx is not registered in the ref set yet (refId is still 0 from calloc), so going through
  // indexClose() would hand rid 0 to idxRemoveRef() and indexDestroy() would never run, leaking
  // everything acquired above. Release the partially built index by hand instead. mtx/sem are
  // not initialized on any path that reaches here, so they are not destroyed.
  if (idx->colObj != NULL) {
    taosHashCleanup(idx->colObj);
  }
  if (idx->tindex != NULL) {
    idxTFileDestroy(idx->tindex);
  }
  if (idx->lru != NULL) {
    taosLRUCacheEraseUnrefEntries(idx->lru);
    taosLRUCacheCleanup(idx->lru);
  }
  taosMemoryFree(idx);
  *index = NULL;
  return ret;
}
dengyihao's avatar
dengyihao 已提交
141

dengyihao's avatar
dengyihao 已提交
142
void indexDestroy(void* handle) {
dengyihao's avatar
dengyihao 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155
  SIndex* idx = handle;
  taosThreadMutexDestroy(&idx->mtx);
  tsem_destroy(&idx->sem);
  idxTFileDestroy(idx->tindex);
  taosMemoryFree(idx->path);

  SLRUCache* lru = idx->lru;
  if (lru != NULL) {
    taosLRUCacheEraseUnrefEntries(lru);
    taosLRUCacheCleanup(lru);
  }
  idx->lru = NULL;
  taosMemoryFree(idx);
dengyihao's avatar
dengyihao 已提交
156 157 158 159
  return;
}
/*
 * Close an index opened by indexOpen(); NULL is a no-op.
 *
 * Each column cache is forced to merge. The caller then blocks on the index semaphore until
 * that merge signals it (presumably via idxPost() at the end of the forced flush), and drops
 * the cache reference held by colObj. Finally the index releases its own reference and is
 * removed from the ref set.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) {
    return;
  }
  if (sIdx->colObj != NULL) {
    void* iter = taosHashIterate(sIdx->colObj, NULL);
    while (iter) {
      IndexCache** pCache = iter;
      idxCacheForceToMerge((void*)(*pCache));
      indexInfo("%s wait to merge", (*pCache)->colName);
      indexWait((void*)(sIdx));
      indexInfo("%s finish to wait", (*pCache)->colName);
      iter = taosHashIterate(sIdx->colObj, iter);
      // pCache still refers to the previous entry, which stays in colObj until cleanup below
      idxCacheUnRef(*pCache);
    }
    taosHashCleanup(sIdx->colObj);
    sIdx->colObj = NULL;
  }

  idxReleaseRef(sIdx->refId);
  idxRemoveRef(sIdx->refId);
}
dengyihao's avatar
dengyihao 已提交
178
int64_t idxAddRef(void* p) {
dengyihao's avatar
dengyihao 已提交
179 180 181
  // impl
  return taosAddRef(indexRefMgt, p);
}
dengyihao's avatar
dengyihao 已提交
182
int32_t idxRemoveRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
183 184 185 186
  // impl later
  return taosRemoveRef(indexRefMgt, ref);
}

dengyihao's avatar
dengyihao 已提交
187
void idxAcquireRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
188 189 190
  // impl
  taosAcquireRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
191
void idxReleaseRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
192 193 194
  // impl
  taosReleaseRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
195

dengyihao's avatar
dengyihao 已提交
196
/*
 * Insert every term in `fVals` for table `uid`.
 *
 * Pass 1 (under index->mtx) makes sure a column cache exists for each term's key.
 * Pass 2 writes each term into its cache without holding the index lock.
 *
 * @return 0 on success, -1 if a column cache cannot be created or found, otherwise the
 *         first non-zero idxCachePut() result
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
  // TODO(yihao): reduce the lock range
  taosThreadMutexLock(&index->mtx);
  for (int i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL) {
      IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        // never store a NULL cache: pass 2 would dereference it
        taosThreadMutexUnlock(&index->mtx);
        return -1;
      }
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
    }
  }
  taosThreadMutexUnlock(&index->mtx);

  for (int i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);
    indexDebug("w suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    // Check the slot itself before dereferencing it; the old assert(*cache != NULL) crashed
    // on a missing slot and vanished entirely under NDEBUG.
    if (cache == NULL || *cache == NULL) {
      indexError("failed to find cache, suid:%" PRIu64 ", colName:%s", key.suid, key.colName);
      return -1;
    }
    int ret = idxCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
231
/*
 * Run every term query in `multiQuerys` and combine the per-term uid sets into `result`
 * according to multiQuerys->opera. Per-term failures are not propagated; always returns 0.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  SArray* perTerm = taosArrayInit(4, POINTER_BYTES);
  int     nQuery = taosArrayGetSize(multiQuerys->query);

  for (int k = 0; k < nQuery; ++k) {
    SArray* termRslt = NULL;
    idxTermSearch(index, taosArrayGet(multiQuerys->query, k), &termRslt);
    taosArrayPush(perTerm, (void*)&termRslt);
  }

  idxMergeFinalResults(perTerm, multiQuerys->opera, result);  // relation of querys
  idxInterRsltDestroy(perTerm);
  return 0;
}

247
// Deletion is not implemented: both arguments are ignored and 1 is returned unconditionally.
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
  (void)index;
  (void)query;
  return 1;
}
dengyihao's avatar
dengyihao 已提交
249

dengyihao's avatar
dengyihao 已提交
250 251 252 253 254 255
SIndexOpts* indexOptsCreate(int32_t cacheSize) {
  SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts));
  opts->cacheSize = cacheSize;
  return opts;
}
// Free options from indexOptsCreate(). A void function must not `return` an expression (C11 6.8.6.4).
void indexOptsDestroy(SIndexOpts* opts) { taosMemoryFree(opts); }
dengyihao's avatar
dengyihao 已提交
256 257 258 259
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
260
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
dengyihao's avatar
dengyihao 已提交
261 262
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
dengyihao's avatar
dengyihao 已提交
263 264
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
265 266 267
  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  return mtq;
dengyihao's avatar
dengyihao 已提交
268
}
dengyihao's avatar
dengyihao 已提交
269
// Destroy a query from indexMultiTermQueryCreate() together with every term it owns; NULL is a no-op.
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }
  for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  taosMemoryFree(pQuery);
}
dengyihao's avatar
dengyihao 已提交
277
// Append (term, qType) to the query. The query then owns `term`:
// indexMultiTermQueryDestroy() frees it. Always returns 0.
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery entry = {.term = term, .qType = qType};
  (void)taosArrayPush(pQuery->query, &entry);
  return 0;
}

dengyihao's avatar
dengyihao 已提交
283 284
/*
 * Build a term for column `colName` (first nColName bytes) with value `colVal`.
 *
 * The value is stored as text:
 *  - a non-empty value is converted with idxConvertDataToStr();
 *  - a NULL value becomes INDEX_DATA_NULL_STR;
 *  - an empty value (non-NULL, nColVal == 0) becomes a single space.
 *
 * @return the new term (free with indexTermDestroy), or NULL if the term or its column name
 *         cannot be allocated
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
  if (tm == NULL) {
    return NULL;
  }

  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;

  tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
  if (tm->colName == NULL) {
    taosMemoryFree(tm);
    return NULL;
  }
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;

  char*   buf = NULL;
  int32_t len = 0;
  if (colVal != NULL && nColVal != 0) {
    len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
  } else if (colVal == NULL) {
    // tstrdup pairs with the taosMemoryFree() in indexTermDestroy, as idx->path does
    buf = tstrdup(INDEX_DATA_NULL_STR);
    len = (int32_t)strlen(INDEX_DATA_NULL_STR);
  } else {
    static const char* emptyStr = " ";
    buf = tstrdup(emptyStr);
    len = (int32_t)strlen(emptyStr);
  }
  tm->colVal = buf;
  tm->nColVal = len;

  return tm;
}
dengyihao's avatar
dengyihao 已提交
315

dengyihao's avatar
dengyihao 已提交
316
// Free a term from indexTermCreate() along with its colName and colVal; NULL is a no-op.
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
}

dengyihao's avatar
dengyihao 已提交
322
// Create an empty list of SIndexTerm pointers (fill it with indexMultiTermAdd).
SIndexMultiTerm* indexMultiTermCreate() {
  const size_t initialCap = 4;
  return taosArrayInit(initialCap, sizeof(SIndexTerm*));
}
323

dengyihao's avatar
dengyihao 已提交
324
// Append the term pointer to `terms`; indexMultiTermDestroy() later frees it. Always returns 0.
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  (void)taosArrayPush(terms, &term);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
328
// Destroy every term in the list, then the list itself.
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  int32_t count = taosArrayGetSize(terms);
  for (int32_t k = 0; k < count; ++k) {
    indexTermDestroy(taosArrayGetP(terms, k));
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
335

dengyihao's avatar
dengyihao 已提交
336 337 338 339 340
/*
 * Background task queued by indexRebuild(). No rebuild work is done yet. It marks the index
 * as finished and drops the reference that indexRebuild() took before scheduling.
 */
static void idxSchedRebuildIdx(SSchedMsg* msg) {
  // TODO, no need rebuild index
  SIndex* target = msg->ahandle;
  atomic_store_8(&target->status, (int8_t)kFinished);
  idxReleaseRef(target->refId);
}
// Flag `idx` as rebuilding and queue idxSchedRebuildIdx on the shared scheduler. `iter` is unused.
void indexRebuild(SIndexJson* idx, void* iter) {
  // set up rebuild status
  atomic_store_8(&idx->status, (int8_t)kRebuild);

  // the background task releases this reference when it completes
  idxAcquireRef(idx->refId);
  SSchedMsg task = {.fp = idxSchedRebuildIdx, .ahandle = idx};
  taosScheduleTask(indexQhandle, &task);
}

/*
 * Report whether `idx` is currently flagged as rebuilding.
 */
bool indexIsRebuild(SIndex* idx) {
  SIdxStatus current = (SIdxStatus)atomic_load_8(&idx->status);
  return current == kRebuild;
}
/*
 * JSON-index entry point; delegates to indexRebuild().
 */
void indexJsonRebuild(SIndexJson* idx, void* iter) { indexRebuild(idx, iter); }

/*
 * Report whether the JSON index is currently flagged as rebuilding.
 */
bool indexJsonIsRebuild(SIndexJson* idx) {
  SIdxStatus current = (SIdxStatus)atomic_load_8(&idx->status);
  return current == kRebuild;
}

dengyihao's avatar
dengyihao 已提交
384
/*
 * Search a single term: query the column's in-memory cache first, then the tfile, and merge
 * the matches into a newly allocated uid array.
 *
 * @param sIdx    index to search
 * @param query   term plus query type
 * @param result  out: set to a new SArray of uint64_t before any search runs (caller frees it),
 *                so it is set even when -1 is returned
 * @return 0 on success (including "column already dropped"), -1 if the cache or tfile search fails
 */
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  indexDebug("r suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType);
  int32_t sz = idxSerialCacheKey(&key, buf);

  // Look up the column cache under the index lock; the column may have no cache yet.
  taosThreadMutexLock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  IndexCache*  cache = (pCache == NULL) ? NULL : *pCache;
  taosThreadMutexUnlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  // TODO: iterator mem and tidex
  STermValueType s = kTypeValue;

  int64_t    st = taosGetTimestampUs();
  SIdxTRslt* tr = idxTRsltCreate();
  if (0 != idxCacheSearch(cache, query, tr, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }
  if (s == kTypeDeletion) {
    indexInfo("col: %s already drop by", term->colName);
    // column already dropped by another operation, no need to query tindex;
    // still release the temporary result (it used to leak on this path)
    idxTRsltDestroy(tr);
    return 0;
  }

  st = taosGetTimestampUs();
  if (0 != idxTFileSearch(sIdx->tindex, query, tr)) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }
  int64_t tfCost = taosGetTimestampUs() - st;
  indexInfo("tfile search cost: %" PRId64 "us", tfCost);

  int64_t cost = taosGetTimestampUs() - st;
  indexInfo("search cost: %" PRId64 "us", cost);

  idxTRsltMergeTo(tr, *result);

  idxTRsltDestroy(tr);
  return 0;
END:
  idxTRsltDestroy(tr);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
439
// Destroy each per-term uid array held in `results`, then the container. NULL is a no-op.
static void idxInterRsltDestroy(SArray* results) {
  if (!results) return;

  for (size_t k = 0, n = taosArrayGetSize(results); k < n; ++k) {
    taosArrayDestroy((SArray*)taosArrayGetP(results, k));
  }
  taosArrayDestroy(results);
}
dengyihao's avatar
dengyihao 已提交
451

dengyihao's avatar
dengyihao 已提交
452
/*
 * Combine the per-term uid arrays in `in` into `out` according to oType:
 * MUST -> iIntersection, SHOULD -> iUnion. NOT is not supported yet and leaves `out` untouched.
 * Every input array is sorted and de-duplicated in place first.
 *
 * @return always 0
 */
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
  // refactor, merge interResults into fResults by oType
  // The loop used to step with i--, which stopped after element 0 and left every other
  // per-term array unsorted for iIntersection/iUnion.
  size_t n = taosArrayGetSize(in);
  for (size_t i = 0; i < n; i++) {
    SArray* t = taosArrayGetP(in, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(in, out);
  } else if (oType == SHOULD) {
    iUnion(in, out);
  } else if (oType == NOT) {
    // just one column index, enhance later
    // taosArrayAddAll(fResults, interResults);
    // not use currently
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
471

dengyihao's avatar
dengyihao 已提交
472
/*
 * Offer a value to the flush result list.
 *  - Empty list: tfv is appended as-is (even when NULL).
 *  - tfv == NULL (end of iteration): the pending uids in tr are merged into the last entry.
 *  - Same colVal as the last entry: tfv is a duplicate and is destroyed.
 *  - New colVal: the pending uids go to the last entry, tr is cleared, and tfv is appended.
 */
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
  int32_t n = taosArrayGetSize(result);
  if (n <= 0) {
    taosArrayPush(result, &tfv);
    return;
  }

  TFileValue* last = taosArrayGetP(result, n - 1);
  if (tfv == NULL) {
    // handle last iterator
    idxTRsltMergeTo(tr, last->tableId);
    return;
  }
  if (strcmp(last->colVal, tfv->colVal) == 0) {
    tfileValueDestroy(tfv);
    return;
  }

  idxTRsltMergeTo(tr, last->tableId);
  idxTRsltClear(tr);
  taosArrayPush(result, &tfv);
}
dengyihao's avatar
dengyihao 已提交
491
/*
 * Fold one merge step into the flush state.
 *
 * A TFileValue for the current colVal (taken from cv when present, else tv) is offered to
 * idxMayMergeTempToFinalRslt(), which either appends it or destroys it as a duplicate.
 * Then:
 *  - the first uid of the cache value is applied to tr->add / tr->del through
 *    INDEX_MERGE_ADD_DEL, according to cv->type (exact set semantics: see that macro);
 *  - all uids of the tfile value are appended to tr->total.
 * At least one of cv / tv must be non-NULL.
 */
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
  char*       colVal = (cv != NULL) ? cv->colVal : tv->colVal;
  TFileValue* tfv = tfileValueCreate(colVal);

  idxMayMergeTempToFinalRslt(result, tfv, tr);

  if (cv != NULL) {
    uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, id)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
    }
  }
  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
510
// Destroy every TFileValue in the flush result list, then the list itself.
static void idxDestroyFinalRslt(SArray* result) {
  if (result != NULL) {
    int32_t n = taosArrayGetSize(result);
    for (int32_t k = 0; k < n; ++k) {
      tfileValueDestroy(taosArrayGetP(result, k));
    }
  }
  taosArrayDestroy(result);
}
518

dengyihao's avatar
dengyihao 已提交
519
/*
 * Merge the immutable part of a column cache with the column's current tfile into a new tfile.
 *
 * Both iterators yield values in colVal order (compared with strcmp). Matching values are
 * merged, and the remaining values are carried over individually. The result is written by
 * idxGenTFile().
 *
 * @param sIdx   owning index; NULL returns -1
 * @param cache  the IndexCache being flushed
 * @param quit   set for the close-time flush. The caller (indexClose) waits on the index
 *               semaphore, so idxPost() is issued on completion.
 * @return 0 if there was nothing to merge or the merge succeeded, otherwise idxGenTFile()'s error
 *
 * On every path that gets past the NULL check, this releases the cache reference and the
 * index reference held by the merge task.
 */
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
  if (sIdx == NULL) {
    return -1;
  }
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);

  int64_t st = taosGetTimestampUs();

  IndexCache* pCache = (IndexCache*)cache;

  // For the close-time flush, busy-wait until any in-flight merge of this cache has finished.
  while (quit && atomic_load_32(&pCache->merging) == 1)
    ;
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }
  // handle flush
  Iterate* cacheIter = idxCacheIteratorCreate(pCache);
  if (cacheIter == NULL) {
    indexError("%p immtable is empty, ignore merge opera", pCache);
    idxCacheDestroyImm(pCache);
    tfileReaderUnRef(pReader);
    atomic_store_32(&pCache->merging, 0);
    // Drop the cache reference owned by this merge task, exactly as the normal path below does;
    // previously only the index reference was released here.
    idxCacheUnRef(pCache);
    if (quit) {
      idxPost(sIdx);
    }
    idxReleaseRef(sIdx->refId);
    return 0;
  }

  Iterate* tfileIter = tfileIteratorCreate(pReader);
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }

  SArray* result = taosArrayInit(1024, sizeof(void*));

  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;

  SIdxTRslt* tr = idxTRsltCreate();
  while (cn == true || tn == true) {
    IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;

    int comp = 0;
    if (cn == true && tn == true) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn == true) {
      comp = -1;
    } else {
      comp = 1;
    }
    if (comp == 0) {
      idxMergeCacheAndTFile(result, cv, tv, tr);
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
      idxMergeCacheAndTFile(result, cv, NULL, tr);
      cn = cacheIter->next(cacheIter);
    } else {
      idxMergeCacheAndTFile(result, NULL, tv, tr);
      tn = tfileIter->next(tfileIter);
    }
  }
  // flush the uids pending for the last colVal
  idxMayMergeTempToFinalRslt(result, NULL, tr);
  idxTRsltDestroy(tr);

  int ret = idxGenTFile(sIdx, pCache, result);
  if (ret != 0) {
    indexError("failed to merge");
  } else {
    int64_t cost = taosGetTimestampUs() - st;
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }
  idxDestroyFinalRslt(result);

  idxCacheDestroyImm(pCache);

  idxCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

  tfileReaderUnRef(pReader);

  // Clear the merging flag BEFORE dropping our cache reference: once unreferenced, pCache
  // may be freed, and writing to it afterwards would be a use-after-free.
  atomic_store_32(&pCache->merging, 0);
  idxCacheUnRef(pCache);
  if (quit) {
    idxPost(sIdx);
  }
  idxReleaseRef(sIdx->refId);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
612 613 614
/*
 * Release the contents of an IterateValue (not the struct itself).
 * destroy == true frees the uid array; otherwise the array is only emptied for reuse.
 * colVal is always freed and reset to NULL.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }

  taosMemoryFree(value->colVal);
  value->colVal = NULL;
}
624

dengyihao's avatar
dengyihao 已提交
625
/*
 * Choose the version for the next tfile of this cache's column: the cache's version, or one
 * past the larger of it and the current tfile's header version when such a tfile exists.
 */
static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
  IndexTFile* tfile = (IndexTFile*)(sIdx->tindex);
  ICacheKey   cacheKey = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
  int64_t     next = CACHE_VERSION(cache);

  taosThreadMutexLock(&tfile->mtx);
  TFileReader* reader = tfileCacheGet(tfile->cache, &cacheKey);
  taosThreadMutexUnlock(&tfile->mtx);

  if (reader != NULL) {
    next = (next > reader->header.version ? next : reader->header.version) + 1;
    indexInfo("header: %" PRId64 ", ver: %" PRId64 "", reader->header.version, next);
  }
  tfileReaderUnRef(reader);
  return next;
}
dengyihao's avatar
dengyihao 已提交
642
/*
 * Write `batch` (the merged values built by idxFlushCacheToTFile) into a new tfile for the
 * cache's column, reopen it for reading, and publish the reader in the tfile cache.
 *
 * @return 0 on success; -1 if the writer cannot be opened, the write fails, or the new
 *         file cannot be reopened
 */
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int64_t version = idxGetAvailableVer(sIdx, cache);
  indexInfo("file name version: %" PRId64 "", version);

  uint8_t      colType = cache->type;
  TFileWriter* writer = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (writer == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int code = tfileWriterPut(writer, batch, true);
  if (code != 0) {
    indexError("failed to write into tindex ");
    // the write did not complete: release the writer's file context and the writer itself
    idxFileCtxDestroy(writer->ctx, true);
    taosMemoryFree(writer);
    return -1;
  }
  tfileWriterClose(writer);

  TFileReader* reader = tfileReaderOpen(sIdx, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }
  indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);

  IndexTFile*  tfile = (IndexTFile*)sIdx->tindex;
  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};

  taosThreadMutexLock(&tfile->mtx);
  tfileCachePut(tfile->cache, &key, reader);
  taosThreadMutexUnlock(&tfile->mtx);

  return code;
}
dengyihao's avatar
dengyihao 已提交
683

dengyihao's avatar
dengyihao 已提交
684
/*
 * Serialize a cache key into `buf`: the decimal suid, a '_' separator, then the column name.
 * If colType contains TSDB_DATA_TYPE_JSON, JSON_COLUMN is written instead of the column name.
 * The SERIALIZE_* macros advance `buf`. The return value is how far it moved, i.e. the
 * key length in bytes.
 * NOTE(review): there is no bound check. Callers in this file pass a 128-byte buffer.
 */
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
  char* const start = buf;
  char        suidStr[65] = {0};
  idxInt2str((int64_t)key->suid, suidStr, 0);

  SERIALIZE_STR_VAR_TO_BUF(buf, suidStr, strlen(suidStr));
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  if (IDX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON)) {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  } else {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  }
  return buf - start;
}