index.c 19.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
dengyihao's avatar
dengyihao 已提交
5 6
 * it under the terms of the GNU Affero General Public License, version 3 or later ("AGPL"), as published by the Free
 * Software Foundation.
H
refact  
Hongze Cheng 已提交
7 8 9 10 11 12 13 14 15
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "tcoding.h"
#include "tdataformat.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
30 31
#endif

dengyihao's avatar
dengyihao 已提交
32
// Background scheduler sizing used by indexInit().
#define INDEX_NUM_OF_THREADS 5
#define INDEX_QUEUE_SIZE     200

// Per-type NULL marker values (not referenced in this part of the file).
#define INDEX_DATA_BOOL_NULL      0x02
#define INDEX_DATA_TINYINT_NULL   0x80
#define INDEX_DATA_SMALLINT_NULL  0x8000
#define INDEX_DATA_INT_NULL       0x80000000LL
#define INDEX_DATA_BIGINT_NULL    0x8000000000000000LL
// Timestamps share the BIGINT marker; alias the local definition instead of
// depending on an externally defined macro with the same value.
#define INDEX_DATA_TIMESTAMP_NULL INDEX_DATA_BIGINT_NULL

#define INDEX_DATA_FLOAT_NULL    0x7FF00000            // it is an NAN
#define INDEX_DATA_DOUBLE_NULL   0x7FFFFF0000000000LL  // an NAN
#define INDEX_DATA_NCHAR_NULL    0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL   0xFF
#define INDEX_DATA_JSON_NULL     0xFFFFFFFF
#define INDEX_DATA_JSON_null     0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01

#define INDEX_DATA_UTINYINT_NULL  0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL      0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL   0xFFFFFFFFFFFFFFFFL

// Textual spellings of NULL.
#define INDEX_DATA_NULL_STR   "NULL"
#define INDEX_DATA_NULL_STR_L "null"

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
// Scheduler handle created in indexInit(); indexRebuild() queues background tasks on it.
void*   indexQhandle = NULL;
// Ref-set id returned by taosOpenRef() in indexInit(); SIndex handles are registered in it
// (idxAddRef) with indexDestroy() as the set's destructor callback.
int32_t indexRefMgt;

static void indexDestroy(void* sIdx);

dengyihao's avatar
dengyihao 已提交
63 64
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
65
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
dengyihao's avatar
dengyihao 已提交
66
  indexRefMgt = taosOpenRef(10, indexDestroy);
dengyihao's avatar
dengyihao 已提交
67
}
dengyihao's avatar
dengyihao 已提交
68 69 70 71
void indexCleanUp() {
  // refacto later
  taosCleanUpScheduler(indexQhandle);
}
dengyihao's avatar
dengyihao 已提交
72

dengyihao's avatar
dengyihao 已提交
73
/* Column bookkeeping record (not referenced in the visible part of this file). */
typedef struct SIdxColInfo {
  int colId;     // generated by index internal
  int cVersion;  // presumably a per-column version counter — TODO confirm
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
77

wafwerar's avatar
wafwerar 已提交
78
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
79
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
80
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82 83
static void idxInterRsltDestroy(SArray* results);
static int  idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
dengyihao's avatar
dengyihao 已提交
84

dengyihao's avatar
dengyihao 已提交
85
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
dengyihao's avatar
dengyihao 已提交
86

dengyihao's avatar
dengyihao 已提交
87
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
88
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90 91
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
92

dengyihao's avatar
dengyihao 已提交
93
static void idxPost(void* idx) {
dengyihao's avatar
dengyihao 已提交
94 95 96 97 98 99 100 101
  SIndex* pIdx = idx;
  tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
  SIndex* pIdx = idx;
  tsem_wait(&pIdx->sem);
}

dengyihao's avatar
dengyihao 已提交
102
/*
 * Open (create) an index rooted at `path`.
 *
 * On success returns 0 and stores the new handle in *index. The handle is
 * registered with the module ref set and one reference is acquired for the
 * caller.
 *
 * On failure returns -1 and sets *index to NULL. The partially built object is
 * freed directly here. The old code called indexClose() on a handle that had
 * not yet been registered (refId 0), so nothing ever reclaimed it.
 *
 * `opts` is currently unused.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  taosThreadOnce(&isInit, indexInit);

  SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex));
  if (sIdx == NULL) {
    *index = NULL;
    return -1;
  }

  // sIdx->cache = (void*)idxCacheCreate(sIdx);
  sIdx->tindex = idxTFileCreate(path);
  if (sIdx->tindex == NULL) {
    goto END;
  }

  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (sIdx->colObj == NULL) {
    goto END;
  }
  sIdx->cVersion = 1;

  sIdx->path = tstrdup(path);
  if (sIdx->path == NULL) {
    goto END;
  }

  taosThreadMutexInit(&sIdx->mtx, NULL);
  tsem_init(&sIdx->sem, 0, 0);

  // From here on the handle is owned by the ref set; indexDestroy() frees it.
  sIdx->refId = idxAddRef(sIdx);
  idxAcquireRef(sIdx->refId);

  *index = sIdx;
  return 0;

END:
  // Not registered with the ref set and mutex/sem not yet initialised:
  // release only the members that were actually created.
  if (sIdx->colObj != NULL) {
    taosHashCleanup(sIdx->colObj);
  }
  if (sIdx->tindex != NULL) {
    idxTFileDestroy(sIdx->tindex);
  }
  taosMemoryFree(sIdx->path);
  taosMemoryFree(sIdx);

  *index = NULL;
  return -1;
}
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136
void indexDestroy(void* handle) {
  SIndex* sIdx = handle;
dengyihao's avatar
dengyihao 已提交
137
  taosThreadMutexDestroy(&sIdx->mtx);
dengyihao's avatar
dengyihao 已提交
138
  tsem_destroy(&sIdx->sem);
dengyihao's avatar
dengyihao 已提交
139
  idxTFileDestroy(sIdx->tindex);
dengyihao's avatar
dengyihao 已提交
140 141 142 143 144 145
  taosMemoryFree(sIdx->path);
  taosMemoryFree(sIdx);
  return;
}
/*
 * Close an index opened by indexOpen().
 *
 * For every per-column cache in colObj, this function:
 *   - forces a merge of the cache into the tfile,
 *   - blocks on the index semaphore until the merge posts it,
 *   - then drops the reference held by colObj.
 *
 * It then destroys the hash and releases and removes the caller's ref. The
 * ref set's destructor (indexDestroy) performs the final free.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) {
    return;
  }

  if (sIdx->colObj != NULL) {
    void* iter = taosHashIterate(sIdx->colObj, NULL);
    while (iter != NULL) {
      // Snapshot the cache pointer before the iterator is advanced past this
      // entry, so it is never read through a stale iterator slot.
      IndexCache* pCache = *(IndexCache**)iter;

      idxCacheForceToMerge((void*)pCache);
      indexInfo("%s wait to merge", pCache->colName);
      indexWait((void*)(sIdx));
      indexInfo("%s finish to wait", pCache->colName);

      iter = taosHashIterate(sIdx->colObj, iter);
      idxCacheUnRef(pCache);
    }
    taosHashCleanup(sIdx->colObj);
    sIdx->colObj = NULL;
  }

  idxReleaseRef(sIdx->refId);
  idxRemoveRef(sIdx->refId);
}
dengyihao's avatar
dengyihao 已提交
163
int64_t idxAddRef(void* p) {
dengyihao's avatar
dengyihao 已提交
164 165 166
  // impl
  return taosAddRef(indexRefMgt, p);
}
dengyihao's avatar
dengyihao 已提交
167
int32_t idxRemoveRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
168 169 170 171
  // impl later
  return taosRemoveRef(indexRefMgt, ref);
}

dengyihao's avatar
dengyihao 已提交
172
void idxAcquireRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
173 174 175
  // impl
  taosAcquireRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
176
void idxReleaseRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
177 178 179
  // impl
  taosReleaseRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
180

dengyihao's avatar
dengyihao 已提交
181
/*
 * Add every term in fVals for table `uid` to the index.
 *
 * Pass 1 runs under index->mtx. It ensures a per-column IndexCache exists in
 * index->colObj for every term, keyed by idxSerialCacheKey().
 *
 * Pass 2 runs without the lock. It writes each term into its cache with
 * idxCachePut().
 *
 * Returns:
 *   - 0 on success;
 *   - -1 if a cache could not be created, or is missing in pass 2 (previously
 *     only an assert, so release builds dereferenced NULL);
 *   - otherwise the first non-zero code returned by idxCachePut().
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
  size_t nTerm = taosArrayGetSize(fVals);

  // TODO(yihao): reduce the lock range
  taosThreadMutexLock(&index->mtx);
  for (size_t i = 0; i < nTerm; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);

    if (taosHashGet(index->colObj, buf, sz) == NULL) {
      IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        taosThreadMutexUnlock(&index->mtx);
        indexError("failed to create index cache, col: %s", p->colName);
        return -1;
      }
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
    }
  }
  taosThreadMutexUnlock(&index->mtx);

  for (size_t i = 0; i < nTerm; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);
    indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL || *cache == NULL) {
      indexError("index cache not found, col: %s", key.colName);
      return -1;
    }
    int ret = idxCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
216
/*
 * Run every term query in multiQuerys and combine the per-term uid arrays
 * into `result` according to multiQuerys->opera (see idxMergeFinalResults).
 * Per-term failures are not propagated; this function always returns 0.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

  // One uid array per term query; the arrays are freed by idxInterRsltDestroy.
  SArray* termRslts = taosArrayInit(4, POINTER_BYTES);
  int     nQuery = taosArrayGetSize(multiQuerys->query);

  for (int i = 0; i < nQuery; ++i) {
    SArray* uids = NULL;
    idxTermSearch(index, (SIndexTermQuery*)taosArrayGet(multiQuerys->query, i), &uids);
    taosArrayPush(termRslts, (void*)&uids);
  }

  idxMergeFinalResults(termRslts, opera, result);
  idxInterRsltDestroy(termRslts);
  return 0;
}

232
/* Stub: performs no work and unconditionally returns 1. */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
  int notDone = 1;
  return notDone;
}

/* Stub: no options object is ever created; always returns NULL. */
SIndexOpts* indexOptsCreate() {
  SIndexOpts* none = NULL;
  return none;
}

/* Stub: nothing to release. */
void indexOptsDestroy(SIndexOpts* opts) {}
dengyihao's avatar
dengyihao 已提交
237 238 239 240
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
241
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
dengyihao's avatar
dengyihao 已提交
242 243
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
dengyihao's avatar
dengyihao 已提交
244 245
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
246 247 248
  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  return mtq;
dengyihao's avatar
dengyihao 已提交
249
}
dengyihao's avatar
dengyihao 已提交
250
/*
 * Free a multi-term query, including every SIndexTerm added to it.
 * NULL is accepted and ignored. The stray `;` that followed the function body
 * at file scope has been removed.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }
  size_t n = taosArrayGetSize(pQuery->query);
  for (size_t i = 0; i < n; i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  taosMemoryFree(pQuery);
}
dengyihao's avatar
dengyihao 已提交
258
/*
 * Append (term, qType) to the query. The query takes ownership of `term`;
 * indexMultiTermQueryDestroy() frees it. Always returns 0.
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery entry = {.term = term, .qType = qType};
  taosArrayPush(pQuery->query, &entry);
  return 0;
}

dengyihao's avatar
dengyihao 已提交
264 265
/*
 * Build a heap-allocated index term.
 *
 * colName is copied (nColName bytes plus a NUL terminator). colVal is
 * converted to its string form by idxConvertDataToStr() for the base type
 * IDX_TYPE_GET_TYPE(colType). nColVal is not consulted; the stored length is
 * whatever the conversion returns.
 *
 * Returns NULL if an allocation or the value conversion fails; everything
 * allocated so far is released first. The disabled `#if 0` alternative
 * implementation was removed.
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
  if (tm == NULL) {
    return NULL;
  }

  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;

  tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
  if (tm->colName == NULL) {
    taosMemoryFree(tm);
    return NULL;
  }
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;

  char*   buf = NULL;
  int32_t len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
  assert(len != -1);
  if (len == -1) {
    // assert() is compiled out under NDEBUG; fail cleanly instead of storing a -1 length.
    taosMemoryFree(tm->colName);
    taosMemoryFree(tm);
    return NULL;
  }

  tm->colVal = buf;
  tm->nColVal = len;
  return tm;
}
dengyihao's avatar
dengyihao 已提交
302

dengyihao's avatar
dengyihao 已提交
303
/*
 * Free a term created by indexTermCreate(): its column name, its value
 * string, and the term itself. NULL is accepted and ignored.
 */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
}

dengyihao's avatar
dengyihao 已提交
309
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
310

dengyihao's avatar
dengyihao 已提交
311
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
312 313
  taosArrayPush(terms, &term);
  return 0;
dengyihao's avatar
dengyihao 已提交
314
}
dengyihao's avatar
dengyihao 已提交
315
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
dengyihao's avatar
dengyihao 已提交
316
  for (int32_t i = 0; i < taosArrayGetSize(terms); i++) {
dengyihao's avatar
dengyihao 已提交
317
    SIndexTerm* p = taosArrayGetP(terms, i);
dengyihao's avatar
dengyihao 已提交
318 319
    indexTermDestroy(p);
  }
dengyihao's avatar
dengyihao 已提交
320
  taosArrayDestroy(terms);
dengyihao's avatar
dengyihao 已提交
321
}
dengyihao's avatar
dengyihao 已提交
322

dengyihao's avatar
dengyihao 已提交
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
/*
 * Background task queued by indexRebuild(). No rebuild work is implemented
 * yet. It marks the index as kFinished and drops the reference indexRebuild()
 * acquired before scheduling.
 */
static void idxSchedRebuildIdx(SSchedMsg* msg) {
  // TODO
  SIndex* target = msg->ahandle;

  int8_t done = kFinished;
  atomic_store_8(&target->status, done);
  idxReleaseRef(target->refId);
}

/*
 * Mark the index as rebuilding and queue idxSchedRebuildIdx on the shared
 * scheduler. `iter` is currently unused.
 */
void indexRebuild(SIndexJson* idx, void* iter) {
  int8_t rebuilding = kRebuild;
  atomic_store_8(&idx->status, rebuilding);

  SSchedMsg task = {.fp = idxSchedRebuildIdx, .ahandle = idx};
  // Keeps the index alive until the task runs; released in idxSchedRebuildIdx.
  idxAcquireRef(idx->refId);
  taosScheduleTask(indexQhandle, &task);
}

/* True while the index status is kRebuild. */
bool indexIsRebuild(SIndex* idx) {
  SIdxStatus status = (SIdxStatus)atomic_load_8(&idx->status);
  return status == kRebuild;
}

/* JSON-index entry point; delegates to indexRebuild(). */
void indexJsonRebuild(SIndexJson* idx, void* iter) { indexRebuild(idx, iter); }

/* True while the JSON index status is kRebuild. */
bool indexJsonIsRebuild(SIndexJson* idx) {
  SIdxStatus status = (SIdxStatus)atomic_load_8(&idx->status);
  return status == kRebuild;
}

dengyihao's avatar
dengyihao 已提交
371
/*
 * Search one term: first in the column's in-memory cache, then (unless the
 * cache reports the column as deleted) in the on-disk tfile index. Matching
 * uids are merged into a freshly allocated *result array, which the caller
 * owns.
 *
 * Returns 0 on success or when the column is already dropped (empty result).
 * Returns -1 when either the cache or the tfile search fails.
 *
 * Fixes in this version:
 *   - the temporary result `tr` is now freed on the "already dropped" early
 *     return (it leaked before);
 *   - int64_t timings are logged with PRId64 instead of PRIu64;
 *   - unused locals were removed.
 */
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
  int32_t sz = idxSerialCacheKey(&key, buf);

  // Get col info
  taosThreadMutexLock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  IndexCache*  cache = (pCache == NULL) ? NULL : *pCache;
  taosThreadMutexUnlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  // TODO: iterator mem and tidex
  STermValueType s = kTypeValue;

  int64_t    st = taosGetTimestampUs();
  SIdxTRslt* tr = idxTRsltCreate();
  if (0 == idxCacheSearch(cache, query, tr, &s)) {
    if (s == kTypeDeletion) {
      indexInfo("col: %s already drop by", term->colName);
      // column already dropped by another operation, no need to query tindex
      idxTRsltDestroy(tr);
      return 0;
    }
    st = taosGetTimestampUs();
    if (0 != idxTFileSearch(sIdx->tindex, query, tr)) {
      indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
      goto END;
    }
    int64_t tfCost = taosGetTimestampUs() - st;
    indexInfo("tfile search cost: %" PRId64 "us", tfCost);
  } else {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }

  int64_t cost = taosGetTimestampUs() - st;
  indexInfo("search cost: %" PRId64 "us", cost);

  idxTRsltMergeTo(tr, *result);
  idxTRsltDestroy(tr);
  return 0;

END:
  idxTRsltDestroy(tr);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
426
/* Free an array of per-term uid arrays: each inner SArray*, then the outer array. NULL is ignored. */
static void idxInterRsltDestroy(SArray* results) {
  if (results != NULL) {
    for (size_t i = 0, n = taosArrayGetSize(results); i < n; ++i) {
      SArray* inner = taosArrayGetP(results, i);
      taosArrayDestroy(inner);
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
438

dengyihao's avatar
dengyihao 已提交
439
/*
 * Combine the per-term uid arrays in `in` into `out` according to oType:
 *   - MUST: intersection;
 *   - SHOULD: union;
 *   - NOT: not supported yet, so `out` is left unchanged.
 *
 * Each input array is first sorted and de-duplicated in place (presumably
 * iIntersection/iUnion expect sorted input — TODO confirm).
 *
 * The loop previously decremented its index (`i--`), so it walked to negative
 * indices instead of visiting each array once.
 *
 * Always returns 0.
 */
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
  size_t n = taosArrayGetSize(in);
  for (size_t i = 0; i < n; i++) {
    SArray* t = taosArrayGetP(in, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(in, out);
  } else if (oType == SHOULD) {
    iUnion(in, out);
  } else if (oType == NOT) {
    // just one column index, enhance later; not used currently
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
458

dengyihao's avatar
dengyihao 已提交
459
/*
 * Group consecutive values with the same colVal during a cache/tfile merge.
 *
 * - tfv has a new colVal: the uids accumulated in `tr` are flushed into the
 *   last entry's tableId, `tr` is cleared, and tfv becomes the new last entry.
 * - tfv repeats the last colVal: it is destroyed; `tr` keeps accumulating.
 * - tfv == NULL marks end of input: `tr` is flushed into the last entry.
 *
 * If `result` is empty and tfv is NULL (nothing was merged), nothing is
 * stored. The old code pushed a NULL pointer here, which downstream code then
 * dereferenced.
 */
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
  int32_t sz = taosArrayGetSize(result);
  if (sz <= 0) {
    if (tfv != NULL) {
      taosArrayPush(result, &tfv);
    }
    return;
  }

  TFileValue* lv = taosArrayGetP(result, sz - 1);
  if (tfv == NULL) {
    // handle last iterator
    idxTRsltMergeTo(tr, lv->tableId);
  } else if (strcmp(lv->colVal, tfv->colVal) != 0) {
    idxTRsltMergeTo(tr, lv->tableId);
    idxTRsltClear(tr);
    taosArrayPush(result, &tfv);
  } else {
    // temp result saved in help
    tfileValueDestroy(tfv);
  }
}
dengyihao's avatar
dengyihao 已提交
479
/*
 * Fold one merge step into `tr`/`result`. cv is the cache-side value and tv is
 * the tfile-side value; either may be NULL, but not both.
 *
 * - A TFileValue for the current colVal (taken from cv if present, else tv) is
 *   offered to idxMayMergeTempToFinalRslt.
 * - The cache value's first uid is recorded as added or deleted via
 *   INDEX_MERGE_ADD_DEL.
 * - The tfile uids are appended to tr->total.
 *
 * The unused local copy of cv->ver was removed.
 */
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
  char*       colVal = (cv != NULL) ? cv->colVal : tv->colVal;
  TFileValue* tfv = tfileValueCreate(colVal);

  idxMayMergeTempToFinalRslt(result, tfv, tr);

  if (cv != NULL) {
    uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
    // INDEX_MERGE_ADD_DEL is invoked without a trailing ';' as in the original,
    // since the macro may expand to a braced block inside this if/else chain.
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, id)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
    }
  }
  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
498
/* Destroy every TFileValue* held in `result`, then the array (taosArrayDestroy is called even for NULL). */
static void idxDestroyFinalRslt(SArray* result) {
  if (result != NULL) {
    int32_t count = taosArrayGetSize(result);
    for (int32_t k = 0; k < count; ++k) {
      tfileValueDestroy((TFileValue*)taosArrayGetP(result, k));
    }
  }
  taosArrayDestroy(result);
}
506

dengyihao's avatar
dengyihao 已提交
507
/*
 * Merge a column cache's immutable table with the existing tfile for that
 * column and write the combined result as a new tfile (idxGenTFile).
 *
 * When `quit` is set, the index semaphore is posted on completion so that
 * indexClose() can proceed. The index reference held for this task is
 * released on every path that reaches the end.
 *
 * Returns idxGenTFile()'s status. Returns 0 when the immutable table is empty
 * and -1 when sIdx is NULL.
 *
 * Fix: pCache->merging used to be cleared after idxCacheUnRef(pCache). If
 * that call dropped the last reference, this was a write to freed memory. The
 * flag is now cleared while the reference is still held.
 */
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
  if (sIdx == NULL) {
    return -1;
  }
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);

  int64_t     st = taosGetTimestampUs();
  IndexCache* pCache = (IndexCache*)cache;

  // When quitting, spin until pCache->merging is no longer 1.
  while (quit && atomic_load_32(&pCache->merging) == 1) {
  }

  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }

  // handle flush
  Iterate* cacheIter = idxCacheIteratorCreate(pCache);
  if (cacheIter == NULL) {
    indexError("%p immtable is empty, ignore merge opera", pCache);
    idxCacheDestroyImm(pCache);
    tfileReaderUnRef(pReader);
    // NOTE(review): unlike the main path below, this path does not call
    // idxCacheUnRef(pCache); confirm whether a cache reference is held here.
    atomic_store_32(&pCache->merging, 0);
    if (quit) {
      idxPost(sIdx);
    }
    idxReleaseRef(sIdx->refId);
    return 0;
  }

  Iterate* tfileIter = tfileIteratorCreate(pReader);
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }

  SArray* result = taosArrayInit(1024, sizeof(void*));

  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;

  // Two-way merge over both iterators ordered by colVal (strcmp).
  SIdxTRslt* tr = idxTRsltCreate();
  while (cn == true || tn == true) {
    IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;

    int comp = 0;
    if (cn == true && tn == true) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn == true) {
      comp = -1;
    } else {
      comp = 1;
    }
    if (comp == 0) {
      idxMergeCacheAndTFile(result, cv, tv, tr);
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
      idxMergeCacheAndTFile(result, cv, NULL, tr);
      cn = cacheIter->next(cacheIter);
    } else {
      idxMergeCacheAndTFile(result, NULL, tv, tr);
      tn = tfileIter->next(tfileIter);
    }
  }
  idxMayMergeTempToFinalRslt(result, NULL, tr);
  idxTRsltDestroy(tr);

  int ret = idxGenTFile(sIdx, pCache, result);
  idxDestroyFinalRslt(result);

  idxCacheDestroyImm(pCache);
  idxCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);
  tfileReaderUnRef(pReader);

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }

  // Clear the flag before dropping our reference: idxCacheUnRef may free pCache.
  atomic_store_32(&pCache->merging, 0);
  idxCacheUnRef(pCache);

  if (quit) {
    idxPost(sIdx);
  }
  idxReleaseRef(sIdx->refId);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
600 601 602
/*
 * Reset an IterateValue.
 *   - destroy == true: the uid array is destroyed and val is set to NULL.
 *   - destroy == false: the uid array (if any) is emptied and kept for reuse.
 * In both cases colVal is freed and set to NULL.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }

  taosMemoryFree(value->colVal);
  value->colVal = NULL;
}
612

dengyihao's avatar
dengyihao 已提交
613
/*
 * Choose the version number for the next tfile of `cache`'s column.
 * Starts from CACHE_VERSION(cache). If a tfile reader for the column is
 * cached, the result is max(cache version, file header version) + 1.
 */
static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
  ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
  int64_t   ver = CACHE_VERSION(cache);

  IndexTFile* tfile = (IndexTFile*)(sIdx->tindex);

  taosThreadMutexLock(&tfile->mtx);
  TFileReader* reader = tfileCacheGet(tfile->cache, &key);
  taosThreadMutexUnlock(&tfile->mtx);

  if (reader == NULL) {
    tfileReaderUnRef(reader);
    return ver;
  }

  ver = (reader->header.version > ver ? reader->header.version : ver) + 1;
  indexInfo("header: %" PRId64 ", ver: %" PRId64 "", reader->header.version, ver);
  tfileReaderUnRef(reader);
  return ver;
}
dengyihao's avatar
dengyihao 已提交
630
/*
 * Write `batch` (an array of TFileValue*) as a new tfile for `cache`'s column,
 * then reopen it and publish the reader in the tfile reader cache.
 *
 * Returns 0 on success and -1 if the writer cannot be opened, the write
 * fails, or the new file cannot be reopened.
 */
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int64_t version = idxGetAvailableVer(sIdx, cache);
  indexInfo("file name version: %" PRId64 "", version);

  uint8_t      colType = cache->type;
  TFileWriter* writer = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (writer == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  if (tfileWriterPut(writer, batch, true) != 0) {
    indexError("failed to write into tindex ");
    // abandon the partially written writer without the normal close path
    idxFileCtxDestroy(writer->ctx, true);
    taosMemoryFree(writer);
    return -1;
  }
  tfileWriterClose(writer);

  TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }
  indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);

  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
  IndexTFile*  tfile = (IndexTFile*)sIdx->tindex;

  taosThreadMutexLock(&tfile->mtx);
  tfileCachePut(tfile->cache, &key, reader);
  taosThreadMutexUnlock(&tfile->mtx);

  return 0;
}
dengyihao's avatar
dengyihao 已提交
671

dengyihao's avatar
dengyihao 已提交
672
/*
 * Serialise a cache key into `buf` as: <decimal suid> '_' <column>.
 * <column> is the literal JSON_COLUMN when colType carries the JSON extern
 * type; otherwise it is the key's colName (nColName bytes).
 * Returns the number of bytes written. The caller must supply a large enough
 * buffer; callers in this file pass char[128].
 */
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
  char* const start = buf;
  bool        isJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);

  char suidStr[65] = {0};
  idxInt2str((int64_t)key->suid, suidStr, 0);

  // the SERIALIZE_* macros advance `buf` past what they write
  SERIALIZE_STR_VAR_TO_BUF(buf, suidStr, strlen(suidStr));
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  if (!isJson) {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  } else {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  }
  return buf - start;
}