index.c 15.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
#include "index.h"
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
18
#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
19
#include "index_tfile.h"
dengyihao's avatar
dengyihao 已提交
20
#include "index_util.h"
dengyihao's avatar
dengyihao 已提交
21
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
26 27
#endif

dengyihao's avatar
dengyihao 已提交
28
/* Sizing passed to taosInitScheduler() in indexInit(): queue size and thread count. */
#define INDEX_QUEUE_SIZE     200
#define INDEX_NUM_OF_THREADS 4

/* Scheduler handle shared by the module; NULL until indexInit() assigns it. */
void* indexQhandle = NULL;

dengyihao's avatar
dengyihao 已提交
33 34
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
35 36
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
}
dengyihao's avatar
dengyihao 已提交
37 38 39 40
void indexCleanUp() {
  // refacto later
  taosCleanUpScheduler(indexQhandle);
}
dengyihao's avatar
dengyihao 已提交
41

dengyihao's avatar
dengyihao 已提交
42
/*
 * qsort-style comparator for uint64_t uids.
 *
 * Returns a negative value, zero or a positive value when *a is less than,
 * equal to or greater than *b. The result is computed by comparison rather than
 * by subtraction: the uint64_t difference narrowed to int loses its high bits,
 * which made e.g. 0 and 1ULL << 32 compare equal and 1 compare greater than
 * UINT64_MAX, breaking both sorting and duplicate removal.
 */
static int uidCompare(const void* a, const void* b) {
  // TODO: add more version compare
  uint64_t u1 = *(const uint64_t*)a;
  uint64_t u2 = *(const uint64_t*)b;
  return (u1 > u2) - (u1 < u2);
}
dengyihao's avatar
dengyihao 已提交
48
/* Per-column bookkeeping record. */
typedef struct SIdxColInfo {
  int colId;     /* assigned internally by the index */
  int cVersion;  /* version number of the column entry — TODO confirm semantics */
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
52 53

static pthread_once_t isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
54
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
55
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
56

dengyihao's avatar
dengyihao 已提交
57 58
static void indexInterResultsDestroy(SArray* results);
static int  indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
dengyihao's avatar
dengyihao 已提交
59

dengyihao's avatar
dengyihao 已提交
60 61
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);

dengyihao's avatar
dengyihao 已提交
62
/*
 * Opens the index rooted at `path`.
 *
 * On success stores the new handle in *index and returns 0. On failure stores
 * NULL in *index, releases everything acquired so far and returns -1.
 * `opts` is currently unused.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  pthread_once(&isInit, indexInit);
  SIndex* sIdx = calloc(1, sizeof(SIndex));
  if (sIdx == NULL) {
    *index = NULL;
    return -1;
  }

#ifdef USE_LUCENE
  // Must not be called `index`: that would redeclare the parameter in the same scope.
  index_t* lucene = index_open(path);
  sIdx->index = lucene;
#endif

#ifdef USE_INVERTED_INDEX
  // Initialise the mutex before anything can fail: the failure path below calls
  // indexClose(), which destroys the mutex unconditionally.
  pthread_mutex_init(&sIdx->mtx, NULL);

  // sIdx->cache = (void*)indexCacheCreate(sIdx);
  sIdx->tindex = indexTFileCreate(path);
  // NOTE(review): on this path indexClose() hands a NULL tindex to
  // indexTFileDestroy(); confirm that it tolerates NULL.
  if (sIdx->tindex == NULL) { goto END; }

  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (sIdx->colObj == NULL) { goto END; }

  sIdx->cVersion = 1;
  sIdx->path = tstrdup(path);
  if (sIdx->path == NULL) { goto END; }

  *index = sIdx;
  return 0;
#endif

END:
  if (sIdx != NULL) { indexClose(sIdx); }
  *index = NULL;
  return -1;
}
dengyihao's avatar
dengyihao 已提交
91

dengyihao's avatar
dengyihao 已提交
92
/*
 * Releases everything owned by `sIdx` — the Lucene handle and/or the inverted
 * index state (per-column cache references, column map, mutex, TFile index) —
 * plus its path, then frees the handle itself. Passing NULL is a no-op.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) { return; }

#ifdef USE_LUCENE
  index_close(sIdx->index);
  sIdx->index = NULL;
#endif

#ifdef USE_INVERTED_INDEX
  // Drop this index's reference on every per-column memory cache.
  void* iter = taosHashIterate(sIdx->colObj, NULL);
  while (iter != NULL) {
    IndexCache** pCache = iter;
    if (*pCache) { indexCacheUnRef(*pCache); }
    iter = taosHashIterate(sIdx->colObj, iter);
  }
  taosHashCleanup(sIdx->colObj);
  pthread_mutex_destroy(&sIdx->mtx);
  indexTFileDestroy(sIdx->tindex);
#endif
  free(sIdx->path);
  free(sIdx);
}
dengyihao's avatar
dengyihao 已提交
113

dengyihao's avatar
dengyihao 已提交
114
/*
 * Adds every term in `fVals` for the table identified by `uid`.
 *
 * Inverted index: a memory cache keyed by (suid, column name) is created on
 * first use while holding index->mtx; each term is then put into its cache.
 *
 * Returns 0 on success, -1 if a column cache cannot be created, registered or
 * found, or the first non-zero code returned by indexCachePut().
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
#ifdef USE_LUCENE
  index_document_t* doc = index_document_create();

  // A uint64_t needs up to 20 digits plus the terminator; print it with its
  // matching conversion specifier.
  char buf[32] = {0};
  snprintf(buf, sizeof(buf), "%" PRIu64, uid);

  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);
    index_document_add(doc, (const char*)(p->colName), p->nColName, (const char*)(p->colVal), p->nColVal, 1);
  }
  index_document_add(doc, NULL, 0, buf, strlen(buf), 0);

  index_put(index->index, doc);
  index_document_destroy(doc);
#endif

#ifdef USE_INVERTED_INDEX
  // Pass 1: make sure every column referenced by fVals has a cache.
  // TODO(yihao): reduce the lock range
  pthread_mutex_lock(&index->mtx);
  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
    int32_t   sz = indexSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache != NULL) { continue; }

    IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
    if (pCache == NULL) {
      pthread_mutex_unlock(&index->mtx);
      indexError("failed to create cache, col: %s", p->colName);
      return -1;
    }
    if (taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)) != 0) {
      indexCacheUnRef(pCache);
      pthread_mutex_unlock(&index->mtx);
      indexError("failed to register cache, col: %s", p->colName);
      return -1;
    }
  }
  pthread_mutex_unlock(&index->mtx);

  // Pass 2: put each term into its column cache.
  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
    int32_t   sz = indexSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    // Checked explicitly: an assert() would vanish under NDEBUG and leave a NULL dereference.
    if (cache == NULL || *cache == NULL) {
      indexError("no cache found, col: %s", p->colName);
      return -1;
    }
    int ret = indexCachePut(*cache, p, uid);
    if (ret != 0) { return ret; }
  }
#endif
  return 0;
}
dengyihao's avatar
dengyihao 已提交
166
/*
 * Evaluates `multiQuerys` and appends the matching uids (uint64_t) to `result`.
 *
 * Every term query is searched on its own; the per-term results are then
 * combined according to multiQuerys->opera by indexMergeFinalResults().
 * Returns 0 on success, -1 on allocation failure, or the non-zero code of a
 * failed term search / merge.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
#ifdef USE_LUCENE
  {  // own scope: both backends declare `opera`/`nQuery`
    EIndexOperatorType opera = multiQuerys->opera;
    int                nQuery = taosArrayGetSize(multiQuerys->query);

    // calloc so that the cleanup loop can free slots that were never filled.
    char** fields = calloc(nQuery, sizeof(char*));
    char** keys = calloc(nQuery, sizeof(char*));
    int*   types = calloc(nQuery, sizeof(int));
    int    code = (nQuery > 0 && (fields == NULL || keys == NULL || types == NULL)) ? -1 : 0;

    for (int i = 0; code == 0 && i < nQuery; i++) {
      SIndexTermQuery* p = taosArrayGet(multiQuerys->query, i);
      SIndexTerm*      term = p->term;

      fields[i] = calloc(1, term->nColName + 1);
      keys[i] = calloc(1, term->nColVal + 1);
      if (fields[i] == NULL || keys[i] == NULL) {
        code = -1;
        break;
      }
      memcpy(fields[i], term->colName, term->nColName);
      memcpy(keys[i], term->colVal, term->nColVal);
      types[i] = (int)(p->qType);
    }

    if (code == 0) {
      int* tResult = NULL;
      int  tsz = 0;
      index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);
      // `result` holds uint64_t elements; widen each id before pushing it.
      for (int i = 0; i < tsz; i++) {
        uint64_t uid = (uint64_t)tResult[i];
        taosArrayPush(result, &uid);
      }
      // NOTE(review): ownership of tResult is not visible here — it may need freeing.
    }

    for (int i = 0; i < nQuery; i++) {
      if (fields != NULL) { free(fields[i]); }
      if (keys != NULL) { free(keys[i]); }
    }
    free(fields);
    free(keys);
    free(types);
    if (code != 0) { return code; }
  }
#endif

#ifdef USE_INVERTED_INDEX
  {
    EIndexOperatorType opera = multiQuerys->opera;  // relation of querys
    size_t             nQuery = taosArrayGetSize(multiQuerys->query);

    SArray* interResults = taosArrayInit(4, POINTER_BYTES);
    if (interResults == NULL) { return -1; }

    int code = 0;
    for (size_t i = 0; i < nQuery && code == 0; i++) {
      SIndexTermQuery* qTerm = taosArrayGet(multiQuerys->query, i);
      SArray*          tResult = NULL;
      code = indexTermSearch(index, qTerm, &tResult);
      // Stored even when the search failed, so indexInterResultsDestroy() frees it.
      if (taosArrayPush(interResults, (void*)&tResult) == NULL) {
        taosArrayDestroy(tResult);
        code = -1;
      }
    }
    if (code == 0) { code = indexMergeFinalResults(interResults, opera, result); }
    indexInterResultsDestroy(interResults);
    if (code != 0) { return code; }
  }
#endif
  return 0;
}

dengyihao's avatar
dengyihao 已提交
219
/*
 * Deletes the entries matching `query`. Not implemented yet: the index is left
 * untouched and the function always returns 1.
 */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
  const int notImplemented = 1;
#ifdef USE_INVERTED_INDEX
  // TODO: remove matching entries from the inverted index
#endif
  return notImplemented;
}
dengyihao's avatar
dengyihao 已提交
225
/*
 * Rebuilds the index with `opts`. Not implemented yet; always returns 0.
 *
 * The explicit return matters: falling off the end of a non-void function and
 * using its value is undefined behaviour.
 */
int indexRebuild(SIndex* index, SIndexOpts* opts) {
#ifdef USE_INVERTED_INDEX
  // TODO: implement rebuild for the inverted index
#endif
  return 0;
}
dengyihao's avatar
dengyihao 已提交
230

dengyihao's avatar
dengyihao 已提交
231
/* Creates index options. No options exist yet, so this always returns NULL. */
SIndexOpts* indexOptsCreate() {
  SIndexOpts* opts = NULL;
#ifdef USE_LUCENE
  // no Lucene-specific options yet
#endif
  return opts;
}
dengyihao's avatar
dengyihao 已提交
236
/* Releases options returned by indexOptsCreate(). Nothing is allocated yet, so this is a no-op. */
void indexOptsDestroy(SIndexOpts* opts) {
  (void)opts;
#ifdef USE_LUCENE
  // no Lucene-specific options to release yet
#endif
}
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
245 246
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
  SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
dengyihao's avatar
dengyihao 已提交
247
  if (p == NULL) { return NULL; }
248 249
  p->opera = opera;
  p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
dengyihao's avatar
dengyihao 已提交
250 251
  return p;
}
dengyihao's avatar
dengyihao 已提交
252
/*
 * Destroys `pQuery` together with every SIndexTerm added to it (the query owns
 * its terms). Passing NULL is a no-op.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) { return; }

  size_t n = taosArrayGetSize(pQuery->query);
  for (size_t i = 0; i < n; i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  free(pQuery);
}
dengyihao's avatar
dengyihao 已提交
260
/*
 * Appends `term` with query type `qType` to `pQuery`. On success the query owns
 * `term` and indexMultiTermQueryDestroy() frees it.
 * Returns 0 on success, -1 if the term could not be stored (ownership stays with
 * the caller).
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery q = {.qType = qType, .term = term};
  if (taosArrayPush(pQuery->query, &q) == NULL) { return -1; }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
266 267
/*
 * Creates a term for column `colName` (nColName bytes) with value `colVal`
 * (nColVal bytes) under super table `suid`.
 *
 * Name and value are copied into NUL-terminated buffers, so they can be used as
 * C strings (strlen / %s) elsewhere. Returns NULL on allocation failure or a
 * negative length. Release the term with indexTermDestroy().
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  if (nColName < 0 || nColVal < 0) { return NULL; }

  SIndexTerm* t = calloc(1, sizeof(SIndexTerm));
  if (t == NULL) { return NULL; }

  t->suid = suid;
  t->operType = oper;
  t->colType = colType;

  t->colName = calloc(1, (size_t)nColName + 1);
  t->colVal = calloc(1, (size_t)nColVal + 1);
  if (t->colName == NULL || t->colVal == NULL) {
    free(t->colName);
    free(t->colVal);
    free(t);
    return NULL;
  }

  // Zero-length copies are skipped so a NULL source with length 0 is never passed to memcpy.
  if (nColName > 0) { memcpy(t->colName, colName, nColName); }
  t->nColName = nColName;
  if (nColVal > 0) { memcpy(t->colVal, colVal, nColVal); }
  t->nColVal = nColVal;
  return t;
}
dengyihao's avatar
dengyihao 已提交
284
/* Frees `p` together with its colName/colVal copies. Passing NULL is a no-op. */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) { return; }
  free(p->colName);
  free(p->colVal);
  free(p);
}

dengyihao's avatar
dengyihao 已提交
290
/* Creates an empty list of SIndexTerm pointers; release it with indexMultiTermDestroy(). */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));
  return terms;
}
291

dengyihao's avatar
dengyihao 已提交
292
/*
 * Appends `term` to `terms`. On success the list owns the term and
 * indexMultiTermDestroy() frees it. Returns 0 on success, -1 if the term could
 * not be stored (ownership stays with the caller).
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  if (taosArrayPush(terms, &term) == NULL) { return -1; }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
296
/* Destroys every SIndexTerm held by `terms`, then the list itself. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  size_t count = taosArrayGetSize(terms);
  for (size_t k = 0; k < count; ++k) {
    indexTermDestroy(taosArrayGetP(terms, k));
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
303

dengyihao's avatar
dengyihao 已提交
304 305 306
/*
 * Searches a single term: first the column's memory cache, then (unless the
 * cache reports the column as deleted) the on-disk TFile index.
 *
 * *result always receives a freshly created uint64_t array that the caller must
 * destroy. Returns 0 on success, -1 if either the cache or the TFile search fails.
 */
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  // Look up the column's cache under the index lock.
  char      keyBuf[128] = {0};
  ICacheKey cacheKey = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
  int32_t   keyLen = indexSerialCacheKey(&cacheKey, keyBuf);

  pthread_mutex_lock(&sIdx->mtx);
  IndexCache** slot = taosHashGet(sIdx->colObj, keyBuf, keyLen);
  IndexCache*  memCache = (slot != NULL) ? *slot : NULL;
  pthread_mutex_unlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));

  // TODO: iterator mem and tidex
  STermValueType valType = kTypeValue;
  if (indexCacheSearch(memCache, query, *result, &valType) != 0) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  if (valType == kTypeDeletion) {
    // column already dropped by another operation; the TFile need not be queried
    indexInfo("col: %s already drop by", term->colName);
    return 0;
  }
  if (indexTFileSearch(sIdx->tindex, query, *result) != 0) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
341
/* Destroys each per-term result array stored in `results`, then `results` itself. NULL is a no-op. */
static void indexInterResultsDestroy(SArray* results) {
  if (results != NULL) {
    size_t n = taosArrayGetSize(results);
    size_t idx = 0;
    while (idx < n) {
      taosArrayDestroy(taosArrayGetP(results, idx));
      idx++;
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
351
/*
 * Merges the per-term uid arrays in `interResults` into `fResults` according to
 * `oType` (MUST, SHOULD or NOT; other values add nothing).
 *
 * Only the first term's result is used for now (single column index): it is
 * sorted and de-duplicated in place, then appended to `fResults`.
 * Returns 0 (an empty/NULL list leaves `fResults` untouched), or -1 if the first
 * result array is missing.
 */
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
  // indexSearch() with zero queries passes an empty list; element 0 must not be read then.
  if (interResults == NULL || taosArrayGetSize(interResults) == 0) { return 0; }

  SArray* first = taosArrayGetP(interResults, 0);
  if (first == NULL) { return -1; }

  taosArraySort(first, uidCompare);
  taosArrayRemoveDuplicate(first, uidCompare, NULL);

  // TODO: just one column index, enhance later to combine every entry of interResults
  switch (oType) {
    case MUST:
    case SHOULD:  // tag1 condition || tag2 condition
    case NOT:     // not used currently
      taosArrayAddAll(fResults, first);
      break;
    default:
      break;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
371

dengyihao's avatar
dengyihao 已提交
372 373 374 375 376
/*
 * Appends `tv` to `result`, except when the current last element has the same
 * colVal: then tv's table ids are appended to that element and `tv` is destroyed.
 * Only the last element is compared, so equal colVals must arrive adjacently
 * (the flush merge emits values in strcmp order).
 */
static void indexMergeSameKey(SArray* result, TFileValue* tv) {
  int32_t n = (result != NULL) ? taosArrayGetSize(result) : 0;
  if (n > 0) {
    TFileValue* last = taosArrayGetP(result, n - 1);
    // indexError("merge colVal: %s", last->colVal);
    if (strcmp(last->colVal, tv->colVal) == 0) {
      // TODO(yihao): remove duplicate tableid
      taosArrayAddAll(last->tableId, tv->tableId);
      tfileValueDestroy(tv);
      return;
    }
  }
  taosArrayPush(result, &tv);
}
/* Destroys every TFileValue in `result`, then the array itself. NULL-safe. */
static void indexDestroyTempResult(SArray* result) {
  int32_t count = (result != NULL) ? taosArrayGetSize(result) : 0;
  for (int32_t idx = 0; idx < count; ++idx) {
    tfileValueDestroy(taosArrayGetP(result, idx));
  }
  taosArrayDestroy(result);
}
dengyihao's avatar
dengyihao 已提交
396
int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
dengyihao's avatar
dengyihao 已提交
397
  if (sIdx == NULL) { return -1; }
dengyihao's avatar
dengyihao 已提交
398
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
dengyihao's avatar
dengyihao 已提交
399

dengyihao's avatar
dengyihao 已提交
400 401
  int64_t st = taosGetTimestampUs();

dengyihao's avatar
dengyihao 已提交
402
  IndexCache*  pCache = (IndexCache*)cache;
dengyihao's avatar
dengyihao 已提交
403
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
dengyihao's avatar
dengyihao 已提交
404
  if (pReader == NULL) { indexWarn("empty tfile reader found"); }
dengyihao's avatar
dengyihao 已提交
405 406 407
  // handle flush
  Iterate* cacheIter = indexCacheIteratorCreate(pCache);
  Iterate* tfileIter = tfileIteratorCreate(pReader);
dengyihao's avatar
dengyihao 已提交
408
  if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); }
dengyihao's avatar
dengyihao 已提交
409 410 411

  SArray* result = taosArrayInit(1024, sizeof(void*));

dengyihao's avatar
dengyihao 已提交
412 413
  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
dengyihao's avatar
dengyihao 已提交
414 415 416 417 418 419 420 421 422 423
  while (cn == true && tn == true) {
    IterateValue* cv = cacheIter->getValue(cacheIter);
    IterateValue* tv = tfileIter->getValue(tfileIter);

    // dump value
    int comp = strcmp(cv->colVal, tv->colVal);
    if (comp == 0) {
      TFileValue* tfv = tfileValueCreate(cv->colVal);
      taosArrayAddAll(tfv->tableId, cv->val);
      taosArrayAddAll(tfv->tableId, tv->val);
dengyihao's avatar
dengyihao 已提交
424
      indexMergeSameKey(result, tfv);
dengyihao's avatar
dengyihao 已提交
425 426 427 428 429 430 431

      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
      continue;
    } else if (comp < 0) {
      TFileValue* tfv = tfileValueCreate(cv->colVal);
      taosArrayAddAll(tfv->tableId, cv->val);
dengyihao's avatar
dengyihao 已提交
432 433

      indexMergeSameKey(result, tfv);
dengyihao's avatar
dengyihao 已提交
434 435 436 437 438
      // copy to final Result;
      cn = cacheIter->next(cacheIter);
    } else {
      TFileValue* tfv = tfileValueCreate(tv->colVal);
      taosArrayAddAll(tfv->tableId, tv->val);
dengyihao's avatar
dengyihao 已提交
439 440

      indexMergeSameKey(result, tfv);
dengyihao's avatar
dengyihao 已提交
441 442 443 444 445 446 447 448
      // copy to final result
      tn = tfileIter->next(tfileIter);
    }
  }
  while (cn == true) {
    IterateValue* cv = cacheIter->getValue(cacheIter);
    TFileValue*   tfv = tfileValueCreate(cv->colVal);
    taosArrayAddAll(tfv->tableId, cv->val);
dengyihao's avatar
dengyihao 已提交
449
    indexMergeSameKey(result, tfv);
dengyihao's avatar
dengyihao 已提交
450 451 452 453 454 455
    cn = cacheIter->next(cacheIter);
  }
  while (tn == true) {
    IterateValue* tv = tfileIter->getValue(tfileIter);
    TFileValue*   tfv = tfileValueCreate(tv->colVal);
    taosArrayAddAll(tfv->tableId, tv->val);
dengyihao's avatar
dengyihao 已提交
456
    indexMergeSameKey(result, tfv);
dengyihao's avatar
dengyihao 已提交
457 458
    tn = tfileIter->next(tfileIter);
  }
dengyihao's avatar
dengyihao 已提交
459 460
  int ret = indexGenTFile(sIdx, pCache, result);
  indexDestroyTempResult(result);
dengyihao's avatar
dengyihao 已提交
461

dengyihao's avatar
dengyihao 已提交
462
  indexCacheDestroyImm(pCache);
dengyihao's avatar
dengyihao 已提交
463 464 465 466

  indexCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

dengyihao's avatar
dengyihao 已提交
467 468
  tfileReaderUnRef(pReader);
  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
469 470 471 472 473 474 475 476

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }
  return ret;
dengyihao's avatar
dengyihao 已提交
477
}
dengyihao's avatar
dengyihao 已提交
478 479 480
/*
 * Releases the contents of `value` (not the struct itself).
 * When `destroy` is true the uid array is destroyed and reset to NULL. Otherwise
 * it is only cleared (if present) so it can be reused.
 * colVal is always freed and reset to NULL.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) { taosArrayClear(value->val); }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }
  free(value->colVal);
  value->colVal = NULL;
}
dengyihao's avatar
dengyihao 已提交
488 489 490 491
/*
 * Writes `batch` (TFileValues produced by the flush merge) into a new TFile for
 * the cache's (suid, version, column). It then opens a reader on that file and
 * registers the reader in the tindex reader cache under sIdx->mtx.
 * Returns 0 on success, -1 on failure.
 */
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int32_t version = CACHE_VERSION(cache);
  uint8_t colType = cache->type;

  TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (tw == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(tw, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    writerCtxDestroy(tw->ctx, true);
    free(tw);
    return -1;
  }
  // From here on tw has been closed: no error path below may destroy its ctx
  // or free it again.
  tfileWriterClose(tw);
  tw = NULL;

  TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
  if (reader == NULL) {
    indexError("failed to open tfile reader");
    return -1;
  }

  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};

  pthread_mutex_lock(&sIdx->mtx);
  IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
  tfileCachePut(ifile->cache, &key, reader);
  pthread_mutex_unlock(&sIdx->mtx);
  return 0;
}
dengyihao's avatar
dengyihao 已提交
523 524 525 526 527 528 529 530 531 532

/*
 * Serialises `key` into `buf` (suid, a '_' separator, then nColName bytes of the
 * column name) and returns the number of bytes written.
 * NOTE(review): there is no bound check — callers presumably size `buf` for
 * sizeof(suid) + 1 + nColName bytes; confirm against the SERIALIZE_* macros.
 */
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
  char* const start = buf;  // the SERIALIZE_* macros advance `buf`
  SERIALIZE_MEM_TO_BUF(buf, key, suid);
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  return (int32_t)(buf - start);
}