index.c 15.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
dengyihao's avatar
dengyihao 已提交
5 6
 * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
 * Software Foundation.
H
refact  
Hongze Cheng 已提交
7 8 9 10 11 12 13 14 15
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
27 28
#endif

dengyihao's avatar
dengyihao 已提交
29
#define INDEX_NUM_OF_THREADS 4
dengyihao's avatar
dengyihao 已提交
30
#define INDEX_QUEUE_SIZE     200
dengyihao's avatar
dengyihao 已提交
31 32 33

void* indexQhandle = NULL;

dengyihao's avatar
dengyihao 已提交
34 35
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
36 37
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
}
dengyihao's avatar
dengyihao 已提交
38 39 40 41
void indexCleanUp() {
  // refacto later
  taosCleanUpScheduler(indexQhandle);
}
dengyihao's avatar
dengyihao 已提交
42

dengyihao's avatar
dengyihao 已提交
43
typedef struct SIdxColInfo {
44
  int colId;  // generated by index internal
dengyihao's avatar
dengyihao 已提交
45
  int cVersion;
46
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
47

wafwerar's avatar
wafwerar 已提交
48
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
49
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
50
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
51

dengyihao's avatar
dengyihao 已提交
52 53
static void indexInterResultsDestroy(SArray* results);
static int  indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
dengyihao's avatar
dengyihao 已提交
54

dengyihao's avatar
dengyihao 已提交
55 56
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);

dengyihao's avatar
dengyihao 已提交
57
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
58
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper);
dengyihao's avatar
dengyihao 已提交
59

dengyihao's avatar
dengyihao 已提交
60 61
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
62

dengyihao's avatar
dengyihao 已提交
63
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
wafwerar's avatar
wafwerar 已提交
64
  taosThreadOnce(&isInit, indexInit);
wafwerar's avatar
wafwerar 已提交
65
  SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex));
dengyihao's avatar
dengyihao 已提交
66 67 68
  if (sIdx == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
69

70
#ifdef USE_LUCENE
dengyihao's avatar
dengyihao 已提交
71
  index_t* index = index_open(path);
dengyihao's avatar
dengyihao 已提交
72
  sIdx->index = index;
dengyihao's avatar
dengyihao 已提交
73
#endif
dengyihao's avatar
dengyihao 已提交
74

dengyihao's avatar
dengyihao 已提交
75
#ifdef USE_INVERTED_INDEX
dengyihao's avatar
dengyihao 已提交
76 77
  // sIdx->cache = (void*)indexCacheCreate(sIdx);
  sIdx->tindex = indexTFileCreate(path);
dengyihao's avatar
dengyihao 已提交
78 79 80
  if (sIdx->tindex == NULL) {
    goto END;
  }
dengyihao's avatar
dengyihao 已提交
81

82 83
  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  sIdx->cVersion = 1;
dengyihao's avatar
dengyihao 已提交
84
  sIdx->path = tstrdup(path);
wafwerar's avatar
wafwerar 已提交
85
  taosThreadMutexInit(&sIdx->mtx, NULL);
86 87
  *index = sIdx;
  return 0;
dengyihao's avatar
dengyihao 已提交
88
#endif
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90
END:
dengyihao's avatar
dengyihao 已提交
91 92 93
  if (sIdx != NULL) {
    indexClose(sIdx);
  }
dengyihao's avatar
dengyihao 已提交
94 95 96

  *index = NULL;
  return -1;
H
refact  
Hongze Cheng 已提交
97
}
dengyihao's avatar
dengyihao 已提交
98

dengyihao's avatar
dengyihao 已提交
99
void indexClose(SIndex* sIdx) {
100 101
#ifdef USE_LUCENE
  index_close(sIdex->index);
dengyihao's avatar
dengyihao 已提交
102
  sIdx->index = NULL;
H
refact  
Hongze Cheng 已提交
103
#endif
dengyihao's avatar
dengyihao 已提交
104

dengyihao's avatar
dengyihao 已提交
105
#ifdef USE_INVERTED_INDEX
dengyihao's avatar
dengyihao 已提交
106 107 108
  void* iter = taosHashIterate(sIdx->colObj, NULL);
  while (iter) {
    IndexCache** pCache = iter;
dengyihao's avatar
dengyihao 已提交
109 110 111
    if (*pCache) {
      indexCacheUnRef(*pCache);
    }
dengyihao's avatar
dengyihao 已提交
112 113
    iter = taosHashIterate(sIdx->colObj, iter);
  }
114
  taosHashCleanup(sIdx->colObj);
wafwerar's avatar
wafwerar 已提交
115
  taosThreadMutexDestroy(&sIdx->mtx);
116
  indexTFileDestroy(sIdx->tindex);
dengyihao's avatar
dengyihao 已提交
117
#endif
wafwerar's avatar
wafwerar 已提交
118 119
  taosMemoryFree(sIdx->path);
  taosMemoryFree(sIdx);
dengyihao's avatar
dengyihao 已提交
120 121
  return;
}
dengyihao's avatar
dengyihao 已提交
122

dengyihao's avatar
dengyihao 已提交
123
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
124
#ifdef USE_LUCENE
dengyihao's avatar
dengyihao 已提交
125
  index_document_t* doc = index_document_create();
126 127 128

  char buf[16] = {0};
  sprintf(buf, "%d", uid);
dengyihao's avatar
dengyihao 已提交
129

130
  for (int i = 0; i < taosArrayGetSize(fVals); i++) {
dengyihao's avatar
dengyihao 已提交
131 132
    SIndexTerm* p = taosArrayGetP(fVals, i);
    index_document_add(doc, (const char*)(p->key), p->nKey, (const char*)(p->val), p->nVal, 1);
133 134 135 136 137
  }
  index_document_add(doc, NULL, 0, buf, strlen(buf), 0);

  index_put(index->index, doc);
  index_document_destroy(doc);
dengyihao's avatar
dengyihao 已提交
138
#endif
dengyihao's avatar
dengyihao 已提交
139

dengyihao's avatar
dengyihao 已提交
140
#ifdef USE_INVERTED_INDEX
141 142

  // TODO(yihao): reduce the lock range
wafwerar's avatar
wafwerar 已提交
143
  taosThreadMutexLock(&index->mtx);
dengyihao's avatar
dengyihao 已提交
144
  for (int i = 0; i < taosArrayGetSize(fVals); i++) {
dengyihao's avatar
dengyihao 已提交
145 146
    SIndexTerm* p = taosArrayGetP(fVals, i);

dengyihao's avatar
dengyihao 已提交
147 148 149
    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = indexSerialCacheKey(&key, buf);
dengyihao's avatar
dengyihao 已提交
150 151

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
dengyihao's avatar
dengyihao 已提交
152
    if (cache == NULL) {
dengyihao's avatar
dengyihao 已提交
153 154
      IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
dengyihao's avatar
dengyihao 已提交
155
    }
156
  }
wafwerar's avatar
wafwerar 已提交
157
  taosThreadMutexUnlock(&index->mtx);
dengyihao's avatar
dengyihao 已提交
158 159

  for (int i = 0; i < taosArrayGetSize(fVals); i++) {
dengyihao's avatar
dengyihao 已提交
160 161
    SIndexTerm* p = taosArrayGetP(fVals, i);

dengyihao's avatar
dengyihao 已提交
162 163 164
    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = indexSerialCacheKey(&key, buf);
dengyihao's avatar
dengyihao 已提交
165 166

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
dengyihao's avatar
dengyihao 已提交
167 168
    assert(*cache != NULL);
    int ret = indexCachePut(*cache, p, uid);
dengyihao's avatar
dengyihao 已提交
169 170 171
    if (ret != 0) {
      return ret;
    }
dengyihao's avatar
dengyihao 已提交
172
  }
dengyihao's avatar
dengyihao 已提交
173

dengyihao's avatar
dengyihao 已提交
174
#endif
dengyihao's avatar
dengyihao 已提交
175
  return 0;
dengyihao's avatar
dengyihao 已提交
176
}
dengyihao's avatar
dengyihao 已提交
177
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
dengyihao's avatar
dengyihao 已提交
178
#ifdef USE_INVERTED_INDEX
179 180
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

dengyihao's avatar
dengyihao 已提交
181
  SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
182
  int     nQuery = taosArrayGetSize(multiQuerys->query);
dengyihao's avatar
dengyihao 已提交
183
  for (size_t i = 0; i < nQuery; i++) {
dengyihao's avatar
dengyihao 已提交
184 185 186 187
    SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
    SArray*          trslt = NULL;
    indexTermSearch(index, qterm, &trslt);
    taosArrayPush(iRslts, (void*)&trslt);
188
  }
dengyihao's avatar
dengyihao 已提交
189 190
  indexMergeFinalResults(iRslts, opera, result);
  indexInterResultsDestroy(iRslts);
191

dengyihao's avatar
dengyihao 已提交
192
#endif
dengyihao's avatar
dengyihao 已提交
193
  return 0;
dengyihao's avatar
dengyihao 已提交
194 195
}

dengyihao's avatar
dengyihao 已提交
196
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
dengyihao's avatar
dengyihao 已提交
197
#ifdef USE_INVERTED_INDEX
dengyihao's avatar
dengyihao 已提交
198

dengyihao's avatar
dengyihao 已提交
199
#endif
200

dengyihao's avatar
dengyihao 已提交
201 202
  return 1;
}
dengyihao's avatar
dengyihao 已提交
203
int indexRebuild(SIndex* index, SIndexOpts* opts) {
dengyihao's avatar
dengyihao 已提交
204
#ifdef USE_INVERTED_INDEX
dengyihao's avatar
dengyihao 已提交
205 206
#endif

dengyihao's avatar
dengyihao 已提交
207
  return 0;
dengyihao's avatar
dengyihao 已提交
208
}
dengyihao's avatar
dengyihao 已提交
209

dengyihao's avatar
dengyihao 已提交
210
SIndexOpts* indexOptsCreate() {
211
#ifdef USE_LUCENE
dengyihao's avatar
dengyihao 已提交
212
#endif
213
  return NULL;
dengyihao's avatar
dengyihao 已提交
214
}
dengyihao's avatar
dengyihao 已提交
215
void indexOptsDestroy(SIndexOpts* opts) {
216
#ifdef USE_LUCENE
dengyihao's avatar
dengyihao 已提交
217
#endif
dengyihao's avatar
dengyihao 已提交
218 219 220 221 222 223
  return;
}
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
224
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
dengyihao's avatar
dengyihao 已提交
225 226
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
dengyihao's avatar
dengyihao 已提交
227 228
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
229 230 231
  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  return mtq;
dengyihao's avatar
dengyihao 已提交
232
}
dengyihao's avatar
dengyihao 已提交
233
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
dengyihao's avatar
dengyihao 已提交
234
  for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) {
dengyihao's avatar
dengyihao 已提交
235
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
dengyihao's avatar
dengyihao 已提交
236
    indexTermDestroy(p->term);
dengyihao's avatar
dengyihao 已提交
237
  }
238
  taosArrayDestroy(pQuery->query);
wafwerar's avatar
wafwerar 已提交
239
  taosMemoryFree(pQuery);
dengyihao's avatar
dengyihao 已提交
240
};
dengyihao's avatar
dengyihao 已提交
241
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
242
  SIndexTermQuery q = {.qType = qType, .term = term};
dengyihao's avatar
dengyihao 已提交
243 244
  taosArrayPush(pQuery->query, &q);
  return 0;
dengyihao's avatar
dengyihao 已提交
245 246
}

dengyihao's avatar
dengyihao 已提交
247 248
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryType, uint8_t colType,
                            const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) {
dengyihao's avatar
dengyihao 已提交
249 250
  SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
  if (tm == NULL) {
dengyihao's avatar
dengyihao 已提交
251 252
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
253

dengyihao's avatar
dengyihao 已提交
254 255 256
  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;
dengyihao's avatar
dengyihao 已提交
257

dengyihao's avatar
dengyihao 已提交
258 259 260
  tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;
dengyihao's avatar
dengyihao 已提交
261

dengyihao's avatar
dengyihao 已提交
262 263 264
  tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
  memcpy(tm->colVal, colVal, nColVal);
  tm->nColVal = nColVal;
dengyihao's avatar
dengyihao 已提交
265
  tm->qType = queryType;
dengyihao's avatar
dengyihao 已提交
266 267

  return tm;
dengyihao's avatar
dengyihao 已提交
268
}
dengyihao's avatar
dengyihao 已提交
269
void indexTermDestroy(SIndexTerm* p) {
wafwerar's avatar
wafwerar 已提交
270 271 272
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
dengyihao's avatar
dengyihao 已提交
273 274
}

dengyihao's avatar
dengyihao 已提交
275
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
276

dengyihao's avatar
dengyihao 已提交
277
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
278 279
  taosArrayPush(terms, &term);
  return 0;
dengyihao's avatar
dengyihao 已提交
280
}
dengyihao's avatar
dengyihao 已提交
281
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
dengyihao's avatar
dengyihao 已提交
282
  for (int32_t i = 0; i < taosArrayGetSize(terms); i++) {
dengyihao's avatar
dengyihao 已提交
283
    SIndexTerm* p = taosArrayGetP(terms, i);
dengyihao's avatar
dengyihao 已提交
284 285
    indexTermDestroy(p);
  }
dengyihao's avatar
dengyihao 已提交
286
  taosArrayDestroy(terms);
dengyihao's avatar
dengyihao 已提交
287
}
dengyihao's avatar
dengyihao 已提交
288

dengyihao's avatar
dengyihao 已提交
289 290 291
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;
  const char* colName = term->colName;
292
  int32_t     nColName = term->nColName;
dengyihao's avatar
dengyihao 已提交
293

dengyihao's avatar
dengyihao 已提交
294 295
  // Get col info
  IndexCache* cache = NULL;
dengyihao's avatar
dengyihao 已提交
296 297

  char      buf[128] = {0};
dengyihao's avatar
dengyihao 已提交
298 299 300
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  int32_t sz = indexSerialCacheKey(&key, buf);
dengyihao's avatar
dengyihao 已提交
301

wafwerar's avatar
wafwerar 已提交
302
  taosThreadMutexLock(&sIdx->mtx);
dengyihao's avatar
dengyihao 已提交
303
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
dengyihao's avatar
dengyihao 已提交
304
  cache = (pCache == NULL) ? NULL : *pCache;
wafwerar's avatar
wafwerar 已提交
305
  taosThreadMutexUnlock(&sIdx->mtx);
306

dengyihao's avatar
dengyihao 已提交
307
  *result = taosArrayInit(4, sizeof(uint64_t));
308
  // TODO: iterator mem and tidex
dengyihao's avatar
dengyihao 已提交
309
  STermValueType s = kTypeValue;
dengyihao's avatar
dengyihao 已提交
310

dengyihao's avatar
add UT  
dengyihao 已提交
311 312
  int64_t st = taosGetTimestampUs();

dengyihao's avatar
dengyihao 已提交
313 314
  SIdxTempResult* tr = sIdxTempResultCreate();
  if (0 == indexCacheSearch(cache, query, tr, &s)) {
dengyihao's avatar
dengyihao 已提交
315
    if (s == kTypeDeletion) {
dengyihao's avatar
dengyihao 已提交
316
      indexInfo("col: %s already drop by", term->colName);
317
      // coloum already drop by other oper, no need to query tindex
dengyihao's avatar
dengyihao 已提交
318 319
      return 0;
    } else {
dengyihao's avatar
add UT  
dengyihao 已提交
320
      st = taosGetTimestampUs();
dengyihao's avatar
dengyihao 已提交
321
      if (0 != indexTFileSearch(sIdx->tindex, query, tr)) {
322
        indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
dengyihao's avatar
dengyihao 已提交
323
        goto END;
324
      }
dengyihao's avatar
add UT  
dengyihao 已提交
325 326
      int64_t tfCost = taosGetTimestampUs() - st;
      indexInfo("tfile search cost: %" PRIu64 "us", tfCost);
dengyihao's avatar
dengyihao 已提交
327 328 329
    }
  } else {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
dengyihao's avatar
dengyihao 已提交
330
    goto END;
dengyihao's avatar
dengyihao 已提交
331
  }
dengyihao's avatar
add UT  
dengyihao 已提交
332 333
  int64_t cost = taosGetTimestampUs() - st;
  indexInfo("search cost: %" PRIu64 "us", cost);
dengyihao's avatar
dengyihao 已提交
334 335

  sIdxTempResultMergeTo(*result, tr);
dengyihao's avatar
add UT  
dengyihao 已提交
336

dengyihao's avatar
dengyihao 已提交
337
  sIdxTempResultDestroy(tr);
dengyihao's avatar
dengyihao 已提交
338
  return 0;
dengyihao's avatar
dengyihao 已提交
339 340 341
END:
  sIdxTempResultDestroy(tr);
  return -1;
dengyihao's avatar
dengyihao 已提交
342
}
dengyihao's avatar
dengyihao 已提交
343
static void indexInterResultsDestroy(SArray* results) {
dengyihao's avatar
dengyihao 已提交
344 345 346
  if (results == NULL) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
347 348 349

  size_t sz = taosArrayGetSize(results);
  for (size_t i = 0; i < sz; i++) {
dengyihao's avatar
dengyihao 已提交
350
    SArray* p = taosArrayGetP(results, i);
351 352
    taosArrayDestroy(p);
  }
dengyihao's avatar
dengyihao 已提交
353 354
  taosArrayDestroy(results);
}
dengyihao's avatar
dengyihao 已提交
355

dengyihao's avatar
dengyihao 已提交
356
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
357
  // refactor, merge interResults into fResults by oType
dengyihao's avatar
dengyihao 已提交
358 359 360 361 362
  for (int i = 0; i < taosArrayGetSize(interResults); i--) {
    SArray* t = taosArrayGetP(interResults, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }
dengyihao's avatar
dengyihao 已提交
363

dengyihao's avatar
dengyihao 已提交
364
  if (oType == MUST) {
dengyihao's avatar
dengyihao 已提交
365
    iIntersection(interResults, fResults);
dengyihao's avatar
dengyihao 已提交
366
  } else if (oType == SHOULD) {
dengyihao's avatar
dengyihao 已提交
367
    iUnion(interResults, fResults);
dengyihao's avatar
dengyihao 已提交
368
  } else if (oType == NOT) {
369
    // just one column index, enhance later
dengyihao's avatar
dengyihao 已提交
370
    taosArrayAddAll(fResults, interResults);
371
    // not use currently
dengyihao's avatar
dengyihao 已提交
372 373 374
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
375

dengyihao's avatar
dengyihao 已提交
376
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
377 378 379 380
  int32_t sz = taosArrayGetSize(result);
  if (sz > 0) {
    TFileValue* lv = taosArrayGetP(result, sz - 1);
    if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
dengyihao's avatar
dengyihao 已提交
381 382
      sIdxTempResultMergeTo(lv->tableId, tr);
      sIdxTempResultClear(tr);
383 384 385

      taosArrayPush(result, &tfv);
    } else if (tfv == NULL) {
dengyihao's avatar
dengyihao 已提交
386 387
      // handle last iterator
      sIdxTempResultMergeTo(lv->tableId, tr);
388
    } else {
dengyihao's avatar
dengyihao 已提交
389
      // temp result saved in help
390 391 392 393 394 395
      tfileValueDestroy(tfv);
    }
  } else {
    taosArrayPush(result, &tfv);
  }
}
dengyihao's avatar
dengyihao 已提交
396
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) {
397
  char*       colVal = (cv != NULL) ? cv->colVal : tv->colVal;
dengyihao's avatar
dengyihao 已提交
398
  TFileValue* tfv = tfileValueCreate(colVal);
399

dengyihao's avatar
dengyihao 已提交
400
  indexMayMergeTempToFinalResult(result, tfv, tr);
401

dengyihao's avatar
dengyihao 已提交
402
  if (cv != NULL) {
403
    uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
dengyihao's avatar
dengyihao 已提交
404
    uint32_t ver = cv->ver;
dengyihao's avatar
dengyihao 已提交
405
    if (cv->type == ADD_VALUE) {
dengyihao's avatar
dengyihao 已提交
406
      INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
dengyihao's avatar
dengyihao 已提交
407
    } else if (cv->type == DEL_VALUE) {
dengyihao's avatar
dengyihao 已提交
408
      INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id)
dengyihao's avatar
dengyihao 已提交
409 410 411
    }
  }
  if (tv != NULL) {
dengyihao's avatar
dengyihao 已提交
412
    taosArrayAddAll(tr->total, tv->val);
dengyihao's avatar
dengyihao 已提交
413 414
  }
}
dengyihao's avatar
dengyihao 已提交
415
static void indexDestroyFinalResult(SArray* result) {
dengyihao's avatar
dengyihao 已提交
416 417 418 419 420 421 422
  int32_t sz = result ? taosArrayGetSize(result) : 0;
  for (size_t i = 0; i < sz; i++) {
    TFileValue* tv = taosArrayGetP(result, i);
    tfileValueDestroy(tv);
  }
  taosArrayDestroy(result);
}
423

dengyihao's avatar
dengyihao 已提交
424
int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
dengyihao's avatar
dengyihao 已提交
425 426 427
  if (sIdx == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
428
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
dengyihao's avatar
dengyihao 已提交
429

dengyihao's avatar
dengyihao 已提交
430 431
  int64_t st = taosGetTimestampUs();

dengyihao's avatar
dengyihao 已提交
432
  IndexCache*  pCache = (IndexCache*)cache;
dengyihao's avatar
dengyihao 已提交
433
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
dengyihao's avatar
dengyihao 已提交
434 435 436
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }
dengyihao's avatar
dengyihao 已提交
437 438 439
  // handle flush
  Iterate* cacheIter = indexCacheIteratorCreate(pCache);
  Iterate* tfileIter = tfileIteratorCreate(pReader);
dengyihao's avatar
dengyihao 已提交
440 441 442
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }
dengyihao's avatar
dengyihao 已提交
443 444 445

  SArray* result = taosArrayInit(1024, sizeof(void*));

dengyihao's avatar
dengyihao 已提交
446 447
  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
448

dengyihao's avatar
dengyihao 已提交
449
  SIdxTempResult* tr = sIdxTempResultCreate();
dengyihao's avatar
dengyihao 已提交
450 451 452 453 454 455 456 457 458 459 460 461
  while (cn == true || tn == true) {
    IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;

    int comp = 0;
    if (cn == true && tn == true) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn == true) {
      comp = -1;
    } else {
      comp = 1;
    }
dengyihao's avatar
dengyihao 已提交
462
    if (comp == 0) {
dengyihao's avatar
dengyihao 已提交
463
      indexMergeCacheAndTFile(result, cv, tv, tr);
dengyihao's avatar
dengyihao 已提交
464 465 466
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
dengyihao's avatar
dengyihao 已提交
467
      indexMergeCacheAndTFile(result, cv, NULL, tr);
dengyihao's avatar
dengyihao 已提交
468 469
      cn = cacheIter->next(cacheIter);
    } else {
dengyihao's avatar
dengyihao 已提交
470
      indexMergeCacheAndTFile(result, NULL, tv, tr);
dengyihao's avatar
dengyihao 已提交
471 472 473
      tn = tfileIter->next(tfileIter);
    }
  }
dengyihao's avatar
dengyihao 已提交
474 475
  indexMayMergeTempToFinalResult(result, NULL, tr);
  sIdxTempResultDestroy(tr);
476

dengyihao's avatar
dengyihao 已提交
477
  int ret = indexGenTFile(sIdx, pCache, result);
dengyihao's avatar
dengyihao 已提交
478
  indexDestroyFinalResult(result);
dengyihao's avatar
dengyihao 已提交
479

dengyihao's avatar
dengyihao 已提交
480
  indexCacheDestroyImm(pCache);
dengyihao's avatar
dengyihao 已提交
481 482 483 484

  indexCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

dengyihao's avatar
dengyihao 已提交
485 486
  tfileReaderUnRef(pReader);
  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
487 488 489 490 491 492 493 494

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }
  return ret;
dengyihao's avatar
dengyihao 已提交
495
}
dengyihao's avatar
dengyihao 已提交
496 497 498
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (destroy) {
    taosArrayDestroy(value->val);
dengyihao's avatar
dengyihao 已提交
499
    value->val = NULL;
dengyihao's avatar
dengyihao 已提交
500
  } else {
dengyihao's avatar
dengyihao 已提交
501 502 503
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
dengyihao's avatar
dengyihao 已提交
504
  }
wafwerar's avatar
wafwerar 已提交
505
  taosMemoryFree(value->colVal);
dengyihao's avatar
dengyihao 已提交
506 507
  value->colVal = NULL;
}
dengyihao's avatar
dengyihao 已提交
508 509 510 511
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int32_t version = CACHE_VERSION(cache);
  uint8_t colType = cache->type;

dengyihao's avatar
dengyihao 已提交
512
  TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
dengyihao's avatar
dengyihao 已提交
513 514 515 516 517 518 519 520 521 522 523 524
  if (tw == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(tw, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    goto END;
  }
  tfileWriterClose(tw);

dengyihao's avatar
dengyihao 已提交
525
  TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
dengyihao's avatar
dengyihao 已提交
526 527 528
  if (reader == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
529 530

  TFileHeader* header = &reader->header;
dengyihao's avatar
dengyihao 已提交
531
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
dengyihao's avatar
dengyihao 已提交
532

wafwerar's avatar
wafwerar 已提交
533
  taosThreadMutexLock(&sIdx->mtx);
dengyihao's avatar
dengyihao 已提交
534 535
  IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
  tfileCachePut(ifile->cache, &key, reader);
wafwerar's avatar
wafwerar 已提交
536
  taosThreadMutexUnlock(&sIdx->mtx);
dengyihao's avatar
dengyihao 已提交
537 538
  return ret;
END:
dengyihao's avatar
dengyihao 已提交
539 540
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx, true);
wafwerar's avatar
wafwerar 已提交
541
    taosMemoryFree(tw);
dengyihao's avatar
dengyihao 已提交
542
  }
dengyihao's avatar
dengyihao 已提交
543
  return -1;
dengyihao's avatar
dengyihao 已提交
544
}
dengyihao's avatar
dengyihao 已提交
545

dengyihao's avatar
dengyihao 已提交
546 547
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
dengyihao's avatar
dengyihao 已提交
548

dengyihao's avatar
dengyihao 已提交
549 550 551 552 553
  char* p = buf;
  SERIALIZE_MEM_TO_BUF(buf, key, suid);
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  // SERIALIZE_MEM_TO_BUF(buf, key, colType);
  // SERIALIZE_VAR_TO_BUF(buf, '_', char);
dengyihao's avatar
dengyihao 已提交
554 555 556 557 558
  if (hasJson) {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  } else {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  }
dengyihao's avatar
dengyihao 已提交
559 560
  return buf - p;
}