index.c 19.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "tcoding.h"
#include "tdataformat.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
30 31
#endif

dengyihao's avatar
dengyihao 已提交
32
#define INDEX_NUM_OF_THREADS 5    // worker threads of the background index scheduler (see indexInit)
#define INDEX_QUEUE_SIZE     200  // task-queue capacity of that scheduler

/*
 * Per-type NULL sentinels, expressed as raw bit patterns.
 */
#define INDEX_DATA_BOOL_NULL      0x02
#define INDEX_DATA_TINYINT_NULL   0x80
#define INDEX_DATA_SMALLINT_NULL  0x8000
#define INDEX_DATA_INT_NULL       0x80000000LL
#define INDEX_DATA_BIGINT_NULL    0x8000000000000000LL
// timestamps share the BIGINT sentinel; alias the local macro so this table is self-contained
#define INDEX_DATA_TIMESTAMP_NULL INDEX_DATA_BIGINT_NULL

#define INDEX_DATA_FLOAT_NULL    0x7FF00000            // bit pattern of a float NaN
#define INDEX_DATA_DOUBLE_NULL   0x7FFFFF0000000000LL  // bit pattern of a double NaN
#define INDEX_DATA_NCHAR_NULL    0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL   0xFF
#define INDEX_DATA_JSON_NULL     0xFFFFFFFF
#define INDEX_DATA_JSON_null     0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01

#define INDEX_DATA_UTINYINT_NULL  0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL      0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL   0xFFFFFFFFFFFFFFFFULL

// textual spellings of NULL
#define INDEX_DATA_NULL_STR   "NULL"
#define INDEX_DATA_NULL_STR_L "null"

void*   indexQhandle = NULL;  // background task scheduler, created in indexInit()
int32_t indexRefMgt;          // reference-manager id opened in indexInit() for SIndex handles

// destructor registered with the reference manager in indexInit()
static void indexDestroy(void* sIdx);

dengyihao's avatar
dengyihao 已提交
63 64
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
65
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
dengyihao's avatar
dengyihao 已提交
66
  indexRefMgt = taosOpenRef(1000, indexDestroy);
dengyihao's avatar
dengyihao 已提交
67
}
dengyihao's avatar
dengyihao 已提交
68
void indexCleanup() {
dengyihao's avatar
dengyihao 已提交
69 70
  // refacto later
  taosCleanUpScheduler(indexQhandle);
dengyihao's avatar
dengyihao 已提交
71
  taosCloseRef(indexRefMgt);
dengyihao's avatar
dengyihao 已提交
72
}
dengyihao's avatar
dengyihao 已提交
73

dengyihao's avatar
dengyihao 已提交
74
/*
 * Column bookkeeping record. It is not referenced elsewhere in this file.
 */
typedef struct SIdxColInfo {
  int colId;     /* id generated internally by the index */
  int cVersion;  /* presumably the column version -- confirm with users of this type */
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
78

wafwerar's avatar
wafwerar 已提交
79
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
80
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
81
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83 84
static void idxInterRsltDestroy(SArray* results);
static int  idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
dengyihao's avatar
dengyihao 已提交
85

dengyihao's avatar
dengyihao 已提交
86
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
dengyihao's avatar
dengyihao 已提交
87

dengyihao's avatar
dengyihao 已提交
88
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
89
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91 92
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94
static void idxPost(void* idx) {
dengyihao's avatar
dengyihao 已提交
95 96 97 98 99 100 101 102
  SIndex* pIdx = idx;
  tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
  SIndex* pIdx = idx;
  tsem_wait(&pIdx->sem);
}

dengyihao's avatar
dengyihao 已提交
103
/*
 * Open an index rooted at `path`.
 *
 * On success, *index receives a new handle and TSDB_CODE_SUCCESS is returned.
 * The handle is registered with the index reference manager and holds one
 * acquired reference.
 *
 * On failure, *index is set to NULL, every partially created resource is
 * released, and TSDB_CODE_OUT_OF_MEMORY is returned.
 *
 * `opts` is currently unused.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  int ret = TSDB_CODE_SUCCESS;
  taosThreadOnce(&isInit, indexInit);
  SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex));
  if (sIdx == NULL) {
    *index = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  sIdx->tindex = idxTFileCreate(path);
  if (sIdx->tindex == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }

  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (sIdx->colObj == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }
  sIdx->cVersion = 1;
  sIdx->path = tstrdup(path);
  if (sIdx->path == NULL) {
    ret = TSDB_CODE_OUT_OF_MEMORY;
    goto END;
  }
  taosThreadMutexInit(&sIdx->mtx, NULL);
  tsem_init(&sIdx->sem, 0, 0);

  sIdx->refId = idxAddRef(sIdx);
  idxAcquireRef(sIdx->refId);

  *index = sIdx;
  return ret;

END:
  // The handle is not registered with the ref manager yet, so indexClose()
  // (which releases and removes sIdx->refId) cannot free it. Undo the partial
  // construction by hand instead. The mutex and semaphore are initialized
  // after the last failure point, and `path` is the last allocation, so
  // neither needs teardown here.
  if (sIdx->colObj != NULL) {
    taosHashCleanup(sIdx->colObj);
  }
  if (sIdx->tindex != NULL) {
    idxTFileDestroy(sIdx->tindex);
  }
  taosMemoryFree(sIdx);
  *index = NULL;
  return ret;
}
dengyihao's avatar
dengyihao 已提交
136

dengyihao's avatar
dengyihao 已提交
137 138
void indexDestroy(void* handle) {
  SIndex* sIdx = handle;
dengyihao's avatar
dengyihao 已提交
139
  taosThreadMutexDestroy(&sIdx->mtx);
dengyihao's avatar
dengyihao 已提交
140
  tsem_destroy(&sIdx->sem);
dengyihao's avatar
dengyihao 已提交
141
  idxTFileDestroy(sIdx->tindex);
dengyihao's avatar
dengyihao 已提交
142 143 144 145 146 147
  taosMemoryFree(sIdx->path);
  taosMemoryFree(sIdx);
  return;
}
/*
 * Close a handle obtained from indexOpen().
 *
 * For each column cache, this:
 *   1. forces its data to be merged,
 *   2. blocks in indexWait() until the merge task posts the index semaphore,
 *   3. drops the reference held on the cache.
 *
 * The column table is then destroyed, and the handle's reference is released
 * and removed from the ref manager. The ref manager was opened with
 * indexDestroy() as its destructor. NULL is a no-op.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) {
    return;
  }
  if (sIdx->colObj != NULL) {
    void* iter = taosHashIterate(sIdx->colObj, NULL);
    while (iter != NULL) {
      // copy the cache pointer out of the current entry now: the entry must
      // not be read again once the iterator has moved past it
      IndexCache* pCache = *(IndexCache**)iter;

      idxCacheForceToMerge((void*)pCache);
      indexInfo("%s wait to merge", pCache->colName);
      indexWait((void*)(sIdx));
      indexInfo("%s finish to wait", pCache->colName);

      iter = taosHashIterate(sIdx->colObj, iter);
      idxCacheUnRef(pCache);
    }
    taosHashCleanup(sIdx->colObj);
    sIdx->colObj = NULL;
  }
  idxReleaseRef(sIdx->refId);
  idxRemoveRef(sIdx->refId);
}
dengyihao's avatar
dengyihao 已提交
165
int64_t idxAddRef(void* p) {
dengyihao's avatar
dengyihao 已提交
166 167 168
  // impl
  return taosAddRef(indexRefMgt, p);
}
dengyihao's avatar
dengyihao 已提交
169
/* Remove id `ref` from the index reference manager and forward taosRemoveRef()'s result. */
int32_t idxRemoveRef(int64_t ref) {
  int32_t code = taosRemoveRef(indexRefMgt, ref);
  return code;
}

dengyihao's avatar
dengyihao 已提交
174
/* Take an extra reference on id `ref` in the index reference manager (the result is discarded). */
void idxAcquireRef(int64_t ref) { (void)taosAcquireRef(indexRefMgt, ref); }
dengyihao's avatar
dengyihao 已提交
178
/* Drop one reference on id `ref` in the index reference manager (the result is discarded). */
void idxReleaseRef(int64_t ref) { (void)taosReleaseRef(indexRefMgt, ref); }
dengyihao's avatar
dengyihao 已提交
182

dengyihao's avatar
dengyihao 已提交
183
/*
 * Insert every term of `fVals` for table `uid`.
 *
 * Phase 1 (under index->mtx): ensure a column cache exists for each
 * (suid, colName, colType) key, creating and registering missing ones.
 * Phase 2 (without the index lock): append each term to its cache.
 *
 * Returns 0 on success. Returns -1 if a cache cannot be created or is
 * missing in phase 2. Otherwise returns the first non-zero code from
 * idxCachePut().
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
  int32_t nTerm = (int32_t)taosArrayGetSize(fVals);

  // TODO(yihao): reduce the lock range
  taosThreadMutexLock(&index->mtx);
  for (int32_t i = 0; i < nTerm; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL) {
      IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        // never register a NULL cache: phase 2 and searches would dereference it
        taosThreadMutexUnlock(&index->mtx);
        indexError("failed to create index cache, colName: %s", p->colName);
        return -1;
      }
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
    }
  }
  taosThreadMutexUnlock(&index->mtx);

  for (int32_t i = 0; i < nTerm; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);
    indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    // check the slot before dereferencing it; registration in phase 1 can
    // presumably fail (e.g. taosHashPut error)
    if (cache == NULL || *cache == NULL) {
      indexError("index cache not found, colName: %s", p->colName);
      return -1;
    }
    int ret = idxCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
218
/*
 * Evaluate a multi-term query and write the matching uids into `result`.
 *
 * Each term query is searched on its own with idxTermSearch(). The per-term
 * uid arrays are then combined according to multiQuerys->opera: MUST means
 * intersection, SHOULD means union.
 *
 * A term whose search fails contributes an empty uid set. Returns 0, or
 * TSDB_CODE_OUT_OF_MEMORY if an intermediate array cannot be allocated.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

  SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
  if (iRslts == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int nQuery = taosArrayGetSize(multiQuerys->query);
  for (int i = 0; i < nQuery; i++) {
    SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
    SArray*          trslt = NULL;
    idxTermSearch(index, qterm, &trslt);
    if (trslt == NULL) {
      // per-term array could not be allocated. Skipping the term would change
      // MUST/SHOULD semantics, so give up instead.
      idxInterRsltDestroy(iRslts);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    taosArrayPush(iRslts, (void*)&trslt);
  }
  idxMergeFinalResults(iRslts, opera, result);
  idxInterRsltDestroy(iRslts);
  return 0;
}

234
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
dengyihao's avatar
dengyihao 已提交
235
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
dengyihao's avatar
dengyihao 已提交
236

237 238
SIndexOpts* indexOptsCreate() { return NULL; }
void        indexOptsDestroy(SIndexOpts* opts) { return; }
dengyihao's avatar
dengyihao 已提交
239 240 241 242
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
243
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
dengyihao's avatar
dengyihao 已提交
244 245
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
dengyihao's avatar
dengyihao 已提交
246 247
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
248 249 250
  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  return mtq;
dengyihao's avatar
dengyihao 已提交
251
}
dengyihao's avatar
dengyihao 已提交
252
/*
 * Free a query created by indexMultiTermQueryCreate(), including every term
 * added to it, since the query owns its terms. NULL is a no-op.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }
  for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  taosMemoryFree(pQuery);
}
dengyihao's avatar
dengyihao 已提交
260
/*
 * Append `term` to `pQuery` with query type `qType`. The query takes
 * ownership of `term`; indexMultiTermQueryDestroy() frees it. Always returns 0.
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  taosArrayPush(pQuery->query, &(SIndexTermQuery){.term = term, .qType = qType});
  return 0;
}

dengyihao's avatar
dengyihao 已提交
266 267
/*
 * Build a single index term.
 *
 * The first nColName bytes of colName are copied into a NUL-terminated
 * buffer. The value is stored as a string:
 *   - colVal != NULL && nColVal != 0: converted by idxConvertDataToStr()
 *     according to the base type of colType;
 *   - colVal == NULL: the literal INDEX_DATA_NULL_STR ("NULL");
 *   - colVal != NULL && nColVal == 0: a single space.
 *
 * Returns NULL on allocation failure. Free with indexTermDestroy().
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
  if (tm == NULL) {
    return NULL;
  }

  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;

  tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
  if (tm->colName == NULL) {
    taosMemoryFree(tm);
    return NULL;
  }
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;

  char*   buf = NULL;
  int32_t len = 0;
  if (colVal != NULL && nColVal != 0) {
    len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
  } else {
    const char* literal = (colVal == NULL) ? INDEX_DATA_NULL_STR : " ";
    len = (int32_t)strlen(literal);
    // allocate with taosMemoryCalloc (not strndup) so the buffer matches the
    // taosMemoryFree() in indexTermDestroy()
    buf = (char*)taosMemoryCalloc(1, len + 1);
    if (buf == NULL) {
      taosMemoryFree(tm->colName);
      taosMemoryFree(tm);
      return NULL;
    }
    memcpy(buf, literal, len);
  }
  tm->colVal = buf;
  tm->nColVal = len;

  return tm;
}
dengyihao's avatar
dengyihao 已提交
298

dengyihao's avatar
dengyihao 已提交
299
/* Free a term created by indexTermCreate() together with its name and value buffers. NULL is a no-op. */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
}

dengyihao's avatar
dengyihao 已提交
305
/* Create an empty term list whose elements are SIndexTerm pointers. */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));
  return terms;
}
306

dengyihao's avatar
dengyihao 已提交
307
/*
 * Append the pointer `term` to `terms`. The list owns the term afterwards;
 * indexMultiTermDestroy() frees it. Always returns 0.
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  SIndexTerm* elem = term;
  taosArrayPush(terms, &elem);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
311
/* Free every term held by `terms`, then the list itself. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  int32_t n = (int32_t)taosArrayGetSize(terms);
  for (int32_t idx = 0; idx < n; ++idx) {
    indexTermDestroy((SIndexTerm*)taosArrayGetP(terms, idx));
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
318

dengyihao's avatar
dengyihao 已提交
319 320 321 322 323
/*
 * Background task body for an index rebuild, scheduled by indexRebuild().
 * No rebuild work is performed yet. The task marks the index kFinished and
 * drops the reference that indexRebuild() acquired before scheduling.
 */
static void idxSchedRebuildIdx(SSchedMsg* msg) {
  // TODO, no need rebuild index
  SIndex* target = msg->ahandle;

  atomic_store_8(&target->status, (int8_t)kFinished);
  idxReleaseRef(target->refId);
}
/*
 * Mark `idx` as rebuilding and hand the work to the background scheduler.
 * A reference on the index is taken here; the scheduled task releases it.
 * `iter` is currently unused.
 */
void indexRebuild(SIndexJson* idx, void* iter) {
  atomic_store_8(&idx->status, (int8_t)kRebuild);

  SSchedMsg task = {0};
  task.fp = idxSchedRebuildIdx;
  task.ahandle = idx;

  // keep the index alive until the task runs
  idxAcquireRef(idx->refId);
  taosScheduleTask(indexQhandle, &task);
}

/*
 * Report whether `idx` is currently in the kRebuild state.
 */
bool indexIsRebuild(SIndex* idx) {
  SIdxStatus status = (SIdxStatus)atomic_load_8(&idx->status);
  return status == kRebuild;
}
/*
 * JSON-index entry point for a rebuild; forwards to indexRebuild().
 */
void indexJsonRebuild(SIndexJson* idx, void* iter) { indexRebuild(idx, iter); }

/*
 * JSON-index variant of indexIsRebuild(): true while `idx` is in the kRebuild state.
 */
bool indexJsonIsRebuild(SIndexJson* idx) {
  SIdxStatus status = (SIdxStatus)atomic_load_8(&idx->status);
  return status == kRebuild;
}

dengyihao's avatar
dengyihao 已提交
367
/*
 * Search one term query against a single column and store the matching uids
 * in *result. *result is always allocated first as a fresh, possibly empty,
 * uint64_t array.
 *
 * The column cache is looked up under sIdx->mtx and searched first. If it
 * reports the column as deleted (kTypeDeletion), the tfile is skipped and
 * *result stays empty. Otherwise the tfile is searched as well, and the hits
 * from both are merged into *result.
 *
 * Returns 0 on success, including the deleted-column case. Returns -1 if
 * *result cannot be allocated or if the cache or tfile search fails.
 */
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
  int32_t sz = idxSerialCacheKey(&key, buf);

  // Get col info
  taosThreadMutexLock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  IndexCache*  cache = (pCache == NULL) ? NULL : *pCache;
  taosThreadMutexUnlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  if (*result == NULL) {
    return -1;
  }
  // TODO: iterator mem and tidex
  STermValueType s = kTypeValue;

  int64_t st = taosGetTimestampUs();

  SIdxTRslt* tr = idxTRsltCreate();
  if (0 == idxCacheSearch(cache, query, tr, &s)) {
    if (s == kTypeDeletion) {
      indexInfo("col: %s already drop by", term->colName);
      // column already dropped by another operation, no need to query tindex
      idxTRsltDestroy(tr);
      return 0;
    } else {
      // time the tfile search separately; `st` keeps measuring the whole search
      int64_t tfStart = taosGetTimestampUs();
      if (0 != idxTFileSearch(sIdx->tindex, query, tr)) {
        indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
        goto END;
      }
      int64_t tfCost = taosGetTimestampUs() - tfStart;
      indexInfo("tfile search cost: %" PRId64 "us", tfCost);
    }
  } else {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }
  int64_t cost = taosGetTimestampUs() - st;
  indexInfo("search cost: %" PRId64 "us", cost);

  idxTRsltMergeTo(tr, *result);

  idxTRsltDestroy(tr);
  return 0;
END:
  idxTRsltDestroy(tr);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
422
/*
 * Free the per-term uid arrays collected by indexSearch(), then the outer
 * array that holds them. NULL is a no-op.
 */
static void idxInterRsltDestroy(SArray* results) {
  if (results != NULL) {
    for (size_t k = 0, n = taosArrayGetSize(results); k < n; ++k) {
      taosArrayDestroy((SArray*)taosArrayGetP(results, k));
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
434

dengyihao's avatar
dengyihao 已提交
435
/*
 * Combine the per-term uid arrays in `in` into `out` according to `oType`.
 *
 * Every input array is first sorted and de-duplicated in place. Then MUST
 * uses iIntersection() and SHOULD uses iUnion(). NOT is not supported yet
 * and leaves `out` untouched. Always returns 0.
 */
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
  // normalize every input array; iIntersection()/iUnion() presumably expect
  // sorted, duplicate-free inputs
  size_t nIn = taosArrayGetSize(in);
  for (size_t i = 0; i < nIn; i++) {
    SArray* t = taosArrayGetP(in, i);
    if (t == NULL) {
      continue;
    }
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(in, out);
  } else if (oType == SHOULD) {
    iUnion(in, out);
  } else if (oType == NOT) {
    // just one column index, enhance later
    // taosArrayAddAll(fResults, interResults);
    // not use currently
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
454

dengyihao's avatar
dengyihao 已提交
455
/*
 * Fold the next column value into the flush result.
 *
 * `result` holds TFileValue* entries in iteration order. `tr` accumulates
 * the uids pending for the last entry of `result`.
 *   - tfv == NULL marks the end of iteration. Pending uids in `tr` are
 *     merged into the last entry, if there is one.
 *   - tfv with a colVal different from the last entry: pending uids are
 *     merged into the last entry, `tr` is cleared, and tfv is appended.
 *     Ownership of tfv moves to `result`.
 *   - tfv with the same colVal as the last entry: tfv is redundant and freed.
 *   - empty `result`: tfv is appended as the first entry.
 */
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
  int32_t sz = taosArrayGetSize(result);
  if (sz <= 0) {
    // Never store the NULL end-of-iteration marker as an entry. Entries are
    // dereferenced below (lv->colVal, lv->tableId) and presumably by the
    // tfile writer.
    if (tfv != NULL) {
      taosArrayPush(result, &tfv);
    }
    return;
  }

  TFileValue* lv = taosArrayGetP(result, sz - 1);
  if (tfv == NULL) {
    // handle last iterator
    idxTRsltMergeTo(tr, lv->tableId);
  } else if (strcmp(lv->colVal, tfv->colVal) != 0) {
    idxTRsltMergeTo(tr, lv->tableId);
    idxTRsltClear(tr);

    taosArrayPush(result, &tfv);
  } else {
    // same value: its uids keep accumulating in tr
    tfileValueDestroy(tfv);
  }
}
dengyihao's avatar
dengyihao 已提交
475
/*
 * Fold one cache value and/or one tfile value into the flush result.
 *
 * A TFileValue for the column value is handed to
 * idxMayMergeTempToFinalRslt(). Then:
 *   - the first uid of the cache value is moved between tr->add and tr->del
 *     according to its ADD_VALUE/DEL_VALUE type;
 *   - all uids of the tfile value are appended to tr->total.
 * When both values are given, the caller has already checked that their
 * colVal strings are equal.
 */
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
  char* colVal = (cv == NULL) ? tv->colVal : cv->colVal;
  idxMayMergeTempToFinalRslt(result, tfileValueCreate(colVal), tr);

  if (cv != NULL) {
    uint64_t uidFromCache = *(uint64_t*)taosArrayGet(cv->val, 0);
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, uidFromCache)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, uidFromCache)
    }
  }

  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
494
/* Destroy every TFileValue in `result`, then the array itself. A NULL `result` skips the loop. */
static void idxDestroyFinalRslt(SArray* result) {
  if (result != NULL) {
    int32_t n = taosArrayGetSize(result);
    for (int32_t k = 0; k < n; k++) {
      tfileValueDestroy((TFileValue*)taosArrayGetP(result, k));
    }
  }
  taosArrayDestroy(result);
}
502

dengyihao's avatar
dengyihao 已提交
503
/*
 * Merge the immutable part of a column cache with that column's existing
 * tfile, and write the merged data out as a new tfile via idxGenTFile().
 *
 * `cache` is the IndexCache being flushed. When `quit` is true, this
 * function:
 *   - first spins while the cache's `merging` flag is 1;
 *   - posts sIdx->sem when done, which the indexWait() in indexClose() waits
 *     on.
 *
 * On every path past the NULL check it also:
 *   - clears pCache->merging;
 *   - releases one reference on sIdx->refId.
 *
 * Returns -1 if sIdx is NULL, 0 if the cache has no immutable data to
 * iterate, and otherwise the result of idxGenTFile().
 */
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
  if (sIdx == NULL) {
    return -1;
  }
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);

  int64_t st = taosGetTimestampUs();

  IndexCache* pCache = (IndexCache*)cache;

  while (quit && atomic_load_32(&pCache->merging) == 1)
    ;
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }
  // handle flush
  Iterate* cacheIter = idxCacheIteratorCreate(pCache);
  if (cacheIter == NULL) {
    indexError("%p immtable is empty, ignore merge opera", pCache);
    idxCacheDestroyImm(pCache);
    tfileReaderUnRef(pReader);
    atomic_store_32(&pCache->merging, 0);
    if (quit) {
      idxPost(sIdx);
    }
    idxReleaseRef(sIdx->refId);
    return 0;
  }

  Iterate* tfileIter = tfileIteratorCreate(pReader);
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }

  SArray* result = taosArrayInit(1024, sizeof(void*));

  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;

  // two-way merge of the cache and tfile iterators, ordered by colVal
  SIdxTRslt* tr = idxTRsltCreate();
  while (cn == true || tn == true) {
    IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;

    int comp = 0;
    if (cn == true && tn == true) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn == true) {
      comp = -1;
    } else {
      comp = 1;
    }
    if (comp == 0) {
      idxMergeCacheAndTFile(result, cv, tv, tr);
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
      idxMergeCacheAndTFile(result, cv, NULL, tr);
      cn = cacheIter->next(cacheIter);
    } else {
      idxMergeCacheAndTFile(result, NULL, tv, tr);
      tn = tfileIter->next(tfileIter);
    }
  }
  idxMayMergeTempToFinalRslt(result, NULL, tr);
  idxTRsltDestroy(tr);

  int ret = idxGenTFile(sIdx, pCache, result);
  idxDestroyFinalRslt(result);

  idxCacheDestroyImm(pCache);

  idxCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

  tfileReaderUnRef(pReader);

  // Clear the merging flag while this task still holds its cache reference:
  // idxCacheUnRef() may free pCache, so pCache must not be touched after it.
  atomic_store_32(&pCache->merging, 0);
  idxCacheUnRef(pCache);

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }

  if (quit) {
    idxPost(sIdx);
  }
  idxReleaseRef(sIdx->refId);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
596 597 598
/*
 * Release the contents of an iterator value (not `value` itself).
 *   destroy == true : free the value array and set value->val to NULL;
 *   destroy == false: keep the array, if any, but empty it for reuse.
 * In both cases colVal is freed and reset to NULL.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }

  taosMemoryFree(value->colVal);
  value->colVal = NULL;
}
608

dengyihao's avatar
dengyihao 已提交
609
/*
 * Choose the version for the next tfile of `cache`'s column.
 *
 * Starts from CACHE_VERSION(cache). If the tfile cache holds a reader for
 * the column, the result becomes one more than the larger of that value and
 * the reader's header version.
 */
static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
  ICacheKey   key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
  int64_t     ver = CACHE_VERSION(cache);
  IndexTFile* tf = (IndexTFile*)(sIdx->tindex);

  // the tfile reader cache is protected by tf->mtx
  taosThreadMutexLock(&tf->mtx);
  TFileReader* rd = tfileCacheGet(tf->cache, &key);
  taosThreadMutexUnlock(&tf->mtx);

  if (rd == NULL) {
    tfileReaderUnRef(rd);
    return ver;
  }

  ver = (ver > rd->header.version ? ver : rd->header.version) + 1;
  indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver);
  tfileReaderUnRef(rd);
  return ver;
}
dengyihao's avatar
dengyihao 已提交
626
/*
 * Write `batch` as a new tfile for `cache`'s column, at the version chosen
 * by idxGetAvailableVer(). The written file is then reopened and its reader
 * is published in the tfile cache under tf->mtx.
 *
 * Returns 0 on success and -1 in these cases:
 *   - the writer cannot be opened;
 *   - the write fails (the writer context is destroyed with
 *     idxFileCtxDestroy(..., true), which presumably also removes the file);
 *   - the reader cannot be reopened.
 */
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int64_t version = idxGetAvailableVer(sIdx, cache);
  indexInfo("file name version: %" PRId64 "", version);

  uint8_t      colType = cache->type;
  TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (tw == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(tw, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    idxFileCtxDestroy(tw->ctx, true);
    taosMemoryFree(tw);
    return -1;
  }
  tfileWriterClose(tw);

  TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }
  indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);

  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
  IndexTFile*  tf = (IndexTFile*)sIdx->tindex;

  taosThreadMutexLock(&tf->mtx);
  tfileCachePut(tf->cache, &key, reader);
  taosThreadMutexUnlock(&tf->mtx);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
667

dengyihao's avatar
dengyihao 已提交
668
/*
 * Serialize the hash key of a column cache into `buf`. The key is the text
 * form of the suid (from idxInt2str()), then '_', then either JSON_COLUMN
 * for JSON columns or the column name. Returns the number of bytes the
 * SERIALIZE_* macros advanced `buf` by.
 *
 * NOTE(review): callers in this file pass 128-byte buffers; no bound is
 * checked here.
 */
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
  char* const start = buf;
  char        suidStr[65] = {0};
  idxInt2str((int64_t)key->suid, suidStr, 0);

  SERIALIZE_STR_VAR_TO_BUF(buf, suidStr, strlen(suidStr));
  SERIALIZE_VAR_TO_BUF(buf, '_', char);

  bool isJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
  if (!isJson) {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  } else {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  }
  return buf - start;
}