index.c 13.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
#include "index.h"
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
18
#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
19
#include "index_tfile.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
21
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
25 26
#endif

dengyihao's avatar
dengyihao 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40
// Passed as the second argument of taosInitScheduler() in indexInit().
#define INDEX_NUM_OF_THREADS 4
// Passed as the first argument of taosInitScheduler() in indexInit().
#define INDEX_QUEUE_SIZE 4

// Handle returned by taosInitScheduler(); assigned in indexInit() and handed
// to taosCleanUpScheduler() in indexCleanUp(). NULL until indexInit() runs.
void* indexQhandle = NULL;

/*
 * Create the scheduler named "index" and remember its handle in indexQhandle.
 *
 * @return 0 when taosInitScheduler() returned a handle, -1 when it returned NULL.
 */
int32_t indexInit() {
  void* sched = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");

  indexQhandle = sched;
  if (sched == NULL) { return -1; }
  return 0;
}
/*
 * Hand the scheduler stored in indexQhandle to taosCleanUpScheduler() and
 * forget the handle, so later code never sees a stale pointer and a repeated
 * call does not pass the same handle to the cleanup routine twice.
 */
void indexCleanUp() {
  void* sched = indexQhandle;

  indexQhandle = NULL;
  taosCleanUpScheduler(sched);
}

dengyihao's avatar
dengyihao 已提交
41 42 43
/*
 * Three-way comparison of two uint64_t values, in the shape expected by
 * qsort-style sort callbacks.
 *
 * @param a  pointer to the first uint64_t
 * @param b  pointer to the second uint64_t
 * @return 0 when equal, -1 when *a < *b, 1 when *a > *b
 */
static int uidCompare(const void* a, const void* b) {
  // Keep the const qualifier of the callback arguments.
  const uint64_t u1 = *(const uint64_t*)a;
  const uint64_t u2 = *(const uint64_t*)b;

  if (u1 == u2) { return 0; }
  return u1 < u2 ? -1 : 1;
}
dengyihao's avatar
dengyihao 已提交
50
// Per-column bookkeeping record: an internal column id plus a version number.
typedef struct SIdxColInfo {
  int colId;  // generated by index internal
  int cVersion;
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
54 55

static pthread_once_t isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
56
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
57

dengyihao's avatar
dengyihao 已提交
58
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
59

dengyihao's avatar
dengyihao 已提交
60 61
static void indexInterResultsDestroy(SArray* results);
static int  indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
dengyihao's avatar
dengyihao 已提交
62

dengyihao's avatar
dengyihao 已提交
63
/*
 * Allocate an SIndex for `path` and return it through `index`.
 *
 * With USE_INVERTED_INDEX the function creates the tfile index, the
 * column-name -> cache hash table, a private copy of `path` and the mutex.
 * Without it no usable index can be built and the call fails.
 *
 * @param opts   open options; not read by this function
 * @param path   NUL-terminated path handed to the backends; copied, not retained
 * @param index  out-parameter: receives the new SIndex on success, NULL on failure
 * @return 0 on success, -1 on invalid arguments, allocation failure, or when
 *         USE_INVERTED_INDEX is not compiled in
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  // pthread_once(&isInit, indexInit);
  if (path == NULL || index == NULL) { return -1; }
  *index = NULL;

  SIndex* sIdx = calloc(1, sizeof(SIndex));
  if (sIdx == NULL) { return -1; }

#ifdef USE_LUCENE
  // Named `lucene` so it does not redeclare the `index` out-parameter.
  index_t* lucene = index_open(path);
  sIdx->index = lucene;
#endif

#ifdef USE_INVERTED_INDEX
  // sIdx->cache = (void*)indexCacheCreate(sIdx);
  size_t pathLen = strlen(path);

  sIdx->tindex = indexTFileCreate(path);
  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  sIdx->cVersion = 1;
  sIdx->path = calloc(1, pathLen + 1);
  if (sIdx->colObj == NULL || sIdx->path == NULL) {
    // NOTE(review): sIdx->tindex is not released here because no destroy
    // routine for it is visible in this file - confirm and add one.
    if (sIdx->colObj != NULL) { taosHashCleanup(sIdx->colObj); }
    free(sIdx->path);
#ifdef USE_LUCENE
    index_close(sIdx->index);
#endif
    free(sIdx);
    return -1;
  }
  memcpy(sIdx->path, path, pathLen);  // calloc already supplied the terminator
  pthread_mutex_init(&sIdx->mtx, NULL);

  *index = sIdx;
  return 0;
#endif

  // Reached only when USE_INVERTED_INDEX is not defined: release what was
  // allocated instead of leaking it.
#ifdef USE_LUCENE
  index_close(sIdx->index);
#endif
  free(sIdx);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91
/*
 * Tear down an index created by indexOpen() and free the SIndex itself.
 *
 * With USE_INVERTED_INDEX this destroys sIdx->cache, drops one reference on
 * every IndexCache stored in the column hash table, cleans up the table,
 * destroys the mutex and frees the copied path.
 *
 * @param sIdx  index to close; NULL is accepted and ignored
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) { return; }

#ifdef USE_LUCENE
  index_close(sIdx->index);
  sIdx->index = NULL;
#endif

#ifdef USE_INVERTED_INDEX
  indexCacheDestroy(sIdx->cache);

  // Each hash entry stores an IndexCache*; the iterator yields a pointer to it.
  void* iter = taosHashIterate(sIdx->colObj, NULL);
  while (iter) {
    IndexCache** pCache = iter;
    if (*pCache) { indexCacheUnRef(*pCache); }
    iter = taosHashIterate(sIdx->colObj, iter);
  }
  taosHashCleanup(sIdx->colObj);
  pthread_mutex_destroy(&sIdx->mtx);

  // The path was heap-copied in indexOpen().
  free(sIdx->path);
  sIdx->path = NULL;
  // NOTE(review): sIdx->tindex is not released here - confirm whether a
  // matching destroy call for indexTFileCreate() is required.
#endif
  free(sIdx);
}
dengyihao's avatar
dengyihao 已提交
111

dengyihao's avatar
dengyihao 已提交
112
/*
 * Index every term in `fVals` for the table identified by `uid`.
 *
 * With USE_INVERTED_INDEX the function first makes sure, under index->mtx,
 * that each term's column has an IndexCache registered in index->colObj, and
 * then writes every term into its column cache with indexCachePut().
 *
 * @param index  target index
 * @param fVals  array of SIndexTerm* to insert
 * @param uid    table id associated with every term
 * @return 0 on success; -1 when a column cache cannot be created or found;
 *         otherwise the first non-zero value returned by indexCachePut()
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
#ifdef USE_LUCENE
  index_document_t* doc = index_document_create();

  // A uint64_t needs up to 20 decimal digits plus the terminator.
  char buf[32] = {0};
  snprintf(buf, sizeof(buf), "%" PRIu64, uid);

  // NOTE(review): key/nKey/val/nVal differ from the colName/colVal fields
  // filled in by indexTermCreate() - confirm this branch still compiles.
  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);
    index_document_add(doc, (const char*)(p->key), p->nKey, (const char*)(p->val), p->nVal, 1);
  }
  index_document_add(doc, NULL, 0, buf, strlen(buf), 0);

  index_put(index->index, doc);
  index_document_destroy(doc);
#endif

#ifdef USE_INVERTED_INDEX

  // TODO(yihao): reduce the lock range
  pthread_mutex_lock(&index->mtx);
  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm*  p = taosArrayGetP(fVals, i);
    IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
    // The lookup result itself may be NULL for a column seen for the first
    // time, so test the slot pointer before looking through it.
    if (cache == NULL || *cache == NULL) {
      IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
      if (pCache == NULL) {
        pthread_mutex_unlock(&index->mtx);
        return -1;
      }
      taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
    }
  }
  pthread_mutex_unlock(&index->mtx);

  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm*  p = taosArrayGetP(fVals, i);
    IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);

    // Registered by the loop above; checked at runtime so builds with
    // NDEBUG do not dereference NULL.
    if (cache == NULL || *cache == NULL) { return -1; }
    int ret = indexCachePut(*cache, p, uid);
    if (ret != 0) { return ret; }
  }

#endif
  return 0;
}
dengyihao's avatar
dengyihao 已提交
155
/*
 * Run every term query in `multiQuerys` and merge the per-term hits into
 * `result` according to multiQuerys->opera.
 *
 * With USE_INVERTED_INDEX each term is searched with indexTermSearch(), the
 * per-term arrays are collected, merged by indexMergeFinalResults() and then
 * released.
 *
 * @param index        index to search
 * @param multiQuerys  operator plus the array of SIndexTermQuery to evaluate
 * @param result       caller-owned array that receives the merged ids
 * @return 1 once the search has run (the value existing callers observe);
 *         -1 when the intermediate result list cannot be allocated
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
#ifdef USE_LUCENE
  EIndexOperatorType opera = multiQuerys->opera;

  int    nQuery = taosArrayGetSize(multiQuerys->query);
  char** fields = malloc(sizeof(char*) * nQuery);
  char** keys = malloc(sizeof(char*) * nQuery);
  int*   types = malloc(sizeof(int) * nQuery);

  // NOTE(review): field_value/type/key/val differ from the term/qType and
  // colName/colVal fields used elsewhere in this file, and the allocations
  // above are unchecked - confirm this branch is still maintained.
  for (int i = 0; i < nQuery; i++) {
    SIndexTermQuery* p = taosArrayGet(multiQuerys->query, i);
    SIndexTerm*      term = p->field_value;

    fields[i] = calloc(1, term->nKey + 1);
    keys[i] = calloc(1, term->nVal + 1);

    memcpy(fields[i], term->key, term->nKey);
    memcpy(keys[i], term->val, term->nVal);
    types[i] = (int)(p->type);
  }
  int* tResult = NULL;
  int  tsz = 0;
  index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);

  for (int i = 0; i < tsz; i++) {
    taosArrayPush(result, &tResult[i]);
  }

  for (int i = 0; i < nQuery; i++) {
    free(fields[i]);
    free(keys[i]);
  }
  free(fields);
  free(keys);
  free(types);
#endif

#ifdef USE_INVERTED_INDEX
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

  SArray* interResults = taosArrayInit(4, POINTER_BYTES);
  if (interResults == NULL) { return -1; }

  int nQuery = taosArrayGetSize(multiQuerys->query);
  for (int i = 0; i < nQuery; i++) {
    SIndexTermQuery* qTerm = taosArrayGet(multiQuerys->query, i);
    SArray*          tResult = NULL;
    // A failed term search may leave tResult NULL or partially filled; it is
    // still recorded so its slot lines up with the query position.
    indexTermSearch(index, qTerm, &tResult);
    taosArrayPush(interResults, (void*)&tResult);
  }
  indexMergeFinalResults(interResults, opera, result);
  indexInterResultsDestroy(interResults);

#endif
  return 1;
}

dengyihao's avatar
dengyihao 已提交
210
/*
 * Placeholder for deleting the entries matched by `query`.
 * Nothing is deleted yet: the USE_INVERTED_INDEX section is empty.
 *
 * @param index  index to delete from (unused)
 * @param query  selection of entries to delete (unused)
 * @return always 1
 */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
#ifdef USE_INVERTED_INDEX
#endif

  return 1;
}
dengyihao's avatar
dengyihao 已提交
216
/*
 * Placeholder for rebuilding an index.
 * No work is done yet: the USE_INVERTED_INDEX section is empty.
 *
 * @param index  index to rebuild (unused)
 * @param opts   rebuild options (unused)
 * @return always 0; the function previously fell off the end without a
 *         return value, which is undefined behaviour if a caller reads it
 */
int indexRebuild(SIndex* index, SIndexOpts* opts) {
#ifdef USE_INVERTED_INDEX
#endif

  return 0;
}
dengyihao's avatar
dengyihao 已提交
221

dengyihao's avatar
dengyihao 已提交
222
/*
 * Placeholder constructor for index options.
 * The USE_LUCENE section is empty, so no options object is built.
 *
 * @return always NULL
 */
SIndexOpts* indexOptsCreate() {
#ifdef USE_LUCENE
#endif
  return NULL;
}
dengyihao's avatar
dengyihao 已提交
227
/*
 * Placeholder destructor for index options.
 * Currently a no-op: the USE_LUCENE section is empty.
 *
 * @param opts  options object (unused)
 */
void indexOptsDestroy(SIndexOpts* opts) {
#ifdef USE_LUCENE
#endif
}

/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
234

dengyihao's avatar
dengyihao 已提交
235 236
/*
 * Allocate an empty multi-term query.
 *
 * @param opera  operator stored in the query's `opera` field
 * @return the new query, or NULL when either the query object or its term
 *         array cannot be allocated; release with indexMultiTermQueryDestroy()
 */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
  SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
  if (p == NULL) { return NULL; }

  p->opera = opera;
  p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  if (p->query == NULL) {
    // Do not hand out a query whose term array is missing.
    free(p);
    return NULL;
  }
  return p;
}
dengyihao's avatar
dengyihao 已提交
242
/*
 * Destroy a multi-term query together with every term added to it.
 *
 * @param pQuery  query to destroy; NULL is accepted and ignored
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) { return; }

  if (pQuery->query != NULL) {
    size_t nTerms = taosArrayGetSize(pQuery->query);
    for (size_t i = 0; i < nTerms; i++) {
      SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
      indexTermDestroy(p->term);
    }
    taosArrayDestroy(pQuery->query);
  }
  free(pQuery);
}
dengyihao's avatar
dengyihao 已提交
250
/*
 * Append one (term, query type) pair to a multi-term query.
 * The term pointer is stored as-is; indexMultiTermQueryDestroy() later
 * destroys it.
 *
 * @param pQuery  query to extend
 * @param term    term to search for
 * @param qType   how the term is matched
 * @return 0 on success, -1 when pQuery is NULL or the entry cannot be appended
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  if (pQuery == NULL) { return -1; }

  SIndexTermQuery q = {.qType = qType, .term = term};
  // Report a failed append instead of claiming success.
  if (taosArrayPush(pQuery->query, &q) == NULL) { return -1; }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
256 257 258 259 260 261 262 263
/*
 * Build a heap-allocated term holding private, NUL-terminated copies of the
 * column name and column value.
 *
 * @param suid      stored in the term's `suid` field
 * @param oper      stored in the term's `operType` field
 * @param colType   stored in the term's `colType` field
 * @param colName   column name bytes (nColName of them are copied)
 * @param nColName  length of colName in bytes, must be >= 0
 * @param colVal    column value bytes (nColVal of them are copied)
 * @param nColVal   length of colVal in bytes, must be >= 0
 * @return the new term, or NULL on a negative length or allocation failure;
 *         release with indexTermDestroy()
 */
SIndexTerm* indexTermCreate(int64_t            suid,
                            SIndexOperOnColumn oper,
                            uint8_t            colType,
                            const char*        colName,
                            int32_t            nColName,
                            const char*        colVal,
                            int32_t            nColVal) {
  if (nColName < 0 || nColVal < 0) { return NULL; }

  SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
  if (t == NULL) { return NULL; }

  t->suid = suid;
  t->operType = oper;
  t->colType = colType;

  // One extra zeroed byte keeps both copies NUL-terminated.
  t->colName = (char*)calloc(1, (size_t)nColName + 1);
  t->colVal = (char*)calloc(1, (size_t)nColVal + 1);
  if (t->colName == NULL || t->colVal == NULL) {
    free(t->colName);
    free(t->colVal);
    free(t);
    return NULL;
  }

  if (nColName > 0) { memcpy(t->colName, colName, (size_t)nColName); }
  t->nColName = nColName;

  if (nColVal > 0) { memcpy(t->colVal, colVal, (size_t)nColVal); }
  t->nColVal = nColVal;
  return t;
}
dengyihao's avatar
dengyihao 已提交
279
/*
 * Free a term created by indexTermCreate(), including its copied column
 * name and value.
 *
 * @param p  term to free; NULL is accepted and ignored
 */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) { return; }

  free(p->colName);
  free(p->colVal);
  free(p);
}

dengyihao's avatar
dengyihao 已提交
285 286 287
/*
 * Create an empty term list: an array of SIndexTerm* with an initial
 * capacity argument of 4.
 *
 * @return whatever taosArrayInit() returns for that request
 */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));

  return terms;
}
288

dengyihao's avatar
dengyihao 已提交
289
/*
 * Append a term pointer to a term list. The pointer itself is stored;
 * indexMultiTermDestroy() later destroys the term.
 *
 * @param terms  list created by indexMultiTermCreate()
 * @param term   term to append
 * @return 0 on success, -1 when terms is NULL or the pointer cannot be appended
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  if (terms == NULL) { return -1; }

  // Report a failed append instead of claiming success.
  if (taosArrayPush(terms, &term) == NULL) { return -1; }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
293
/*
 * Destroy a term list and every term stored in it.
 *
 * @param terms  list to destroy; NULL is accepted and ignored
 */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  if (terms == NULL) { return; }

  size_t nTerms = taosArrayGetSize(terms);
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm* p = taosArrayGetP(terms, i);
    indexTermDestroy(p);
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
300

dengyihao's avatar
dengyihao 已提交
301 302 303
/*
 * Search one term: first the column's in-memory cache, then, unless the
 * cache reports the column as deleted, the tfile index.
 *
 * @param sIdx    index to search
 * @param query   term query; query->term names the column and value
 * @param result  out-parameter: set to a newly allocated array of uint64_t
 *                ids once the column cache has been found. The caller owns
 *                the array, including when -1 is returned after allocation.
 *                Left untouched when the column has no cache.
 * @return 0 on success; -1 when the column has no cache, the result array
 *         cannot be allocated, or either search reports an error
 */
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;
  const char* colName = term->colName;
  int32_t     nColName = term->nColName;

  // Get col info
  pthread_mutex_lock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
  // The lookup result may itself be NULL for an unknown column, so test the
  // slot pointer before looking through it.
  IndexCache* cache = (pCache == NULL) ? NULL : *pCache;
  pthread_mutex_unlock(&sIdx->mtx);
  if (cache == NULL) { return -1; }

  *result = taosArrayInit(4, sizeof(uint64_t));
  if (*result == NULL) { return -1; }

  // TODO: iterator mem and tidex
  STermValueType s;
  if (0 != indexCacheSearch(cache, query, *result, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }

  if (s == kTypeDeletion) {
    indexInfo("col: %s already drop by other opera", term->colName);
    // column already dropped by another operation, no need to query tindex
    return 0;
  }

  if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
337
/*
 * Destroy the list of per-term result arrays built by indexSearch():
 * every element is an SArray* that is destroyed before the list itself.
 *
 * @param results  array of SArray*; NULL is accepted and ignored
 */
static void indexInterResultsDestroy(SArray* results) {
  if (results == NULL) { return; }

  size_t sz = taosArrayGetSize(results);
  for (size_t i = 0; i < sz; i++) {
    SArray* p = taosArrayGetP(results, i);
    taosArrayDestroy(p);
  }
  taosArrayDestroy(results);
}
dengyihao's avatar
dengyihao 已提交
347
/*
 * Merge the per-term result arrays into the final result.
 *
 * Only the first per-term array is used for now: it is sorted, de-duplicated
 * and appended to fResults for MUST, SHOULD and NOT alike. Any other
 * operator appends nothing.
 *
 * @param interResults  array of SArray*, one per term query
 * @param oType         how the per-term results relate to each other
 * @param fResults      caller-owned array receiving the merged ids
 * @return always 0
 */
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
  // Nothing to merge when there were no term queries.
  if (interResults == NULL || taosArrayGetSize(interResults) == 0) { return 0; }

  // refactor, merge interResults into fResults by oType
  SArray* first = taosArrayGetP(interResults, 0);
  // A failed term search can leave a NULL entry in the list.
  if (first == NULL) { return 0; }

  taosArraySort(first, uidCompare);
  taosArrayRemoveDuplicate(first, uidCompare, NULL);

  // just one column index, enhance later:
  //   MUST   - tag1 condition && tag2 condition
  //   SHOULD - tag1 condition || tag2 condition
  //   NOT    - not used currently
  if (oType == MUST || oType == SHOULD || oType == NOT) {
    taosArrayAddAll(fResults, first);
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
367

dengyihao's avatar
dengyihao 已提交
368
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
dengyihao's avatar
dengyihao 已提交
369
  if (sIdx == NULL) { return -1; }
370
  indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
dengyihao's avatar
dengyihao 已提交
371

dengyihao's avatar
dengyihao 已提交
372
  IndexCache*  pCache = (IndexCache*)cache;
dengyihao's avatar
dengyihao 已提交
373
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
dengyihao's avatar
dengyihao 已提交
374

dengyihao's avatar
dengyihao 已提交
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446
  // handle flush
  Iterate* cacheIter = indexCacheIteratorCreate(pCache);
  Iterate* tfileIter = tfileIteratorCreate(pReader);

  SArray* result = taosArrayInit(1024, sizeof(void*));

  bool cn = cacheIter->next(cacheIter);
  bool tn = tfileIter->next(tfileIter);
  while (cn == true && tn == true) {
    IterateValue* cv = cacheIter->getValue(cacheIter);
    IterateValue* tv = tfileIter->getValue(tfileIter);

    // dump value
    int comp = strcmp(cv->colVal, tv->colVal);
    if (comp == 0) {
      TFileValue* tfv = tfileValueCreate(cv->colVal);
      taosArrayAddAll(tfv->tableId, cv->val);
      taosArrayAddAll(tfv->tableId, tv->val);
      taosArrayPush(result, &tfv);

      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
      continue;
    } else if (comp < 0) {
      TFileValue* tfv = tfileValueCreate(cv->colVal);
      taosArrayAddAll(tfv->tableId, cv->val);
      taosArrayPush(result, &tfv);

      // copy to final Result;
      cn = cacheIter->next(cacheIter);
    } else {
      TFileValue* tfv = tfileValueCreate(tv->colVal);
      taosArrayPush(result, &tfv);
      taosArrayAddAll(tfv->tableId, tv->val);
      // copy to final result
      tn = tfileIter->next(tfileIter);
    }
  }
  while (cn == true) {
    IterateValue* cv = cacheIter->getValue(cacheIter);
    TFileValue*   tfv = tfileValueCreate(cv->colVal);
    taosArrayAddAll(tfv->tableId, cv->val);
    taosArrayPush(result, &tfv);
    cn = cacheIter->next(cacheIter);
  }
  while (tn == true) {
    IterateValue* tv = tfileIter->getValue(tfileIter);
    TFileValue*   tfv = tfileValueCreate(tv->colVal);
    taosArrayAddAll(tfv->tableId, tv->val);
    taosArrayPush(result, &tfv);
    tn = tfileIter->next(tfileIter);
  }

  int32_t version = CACHE_VERSION(pCache);
  uint8_t colType = pCache->type;

  TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, pCache->colName, colType);
  if (tw == NULL) {
    indexError("faile to open file to write");
  } else {
    int ret = tfileWriterPut(tw, result);
    if (ret != 0) { indexError("faile to write into tindex "); }
  }
  // not free later, just put int table cache
  SSkipList* timm = (SSkipList*)pCache->imm;
  pCache->imm = NULL;  // or throw int bg thread
  indexCacheDestroySkiplist(timm);

  tfileWriteClose(tw);
  indexCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

dengyihao's avatar
dengyihao 已提交
447 448
  tfileReaderUnRef(pReader);
  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
449 450
  return 0;
}
dengyihao's avatar
dengyihao 已提交
451 452 453 454 455 456 457 458 459
/*
 * Reset an iterator value between uses or tear it down.
 *
 * @param value    value to reset; its colVal string is always freed and set
 *                 to NULL
 * @param destroy  true: the id array value->val is destroyed;
 *                 false: the array is only cleared
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  SArray* ids = value->val;

  if (!destroy) {
    taosArrayClear(ids);
  } else {
    taosArrayDestroy(ids);
  }

  char* text = value->colVal;
  value->colVal = NULL;
  free(text);
}