index.c 18.1 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
#include "index.h"
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
18
#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
19
#include "index_comm.h"
dengyihao's avatar
dengyihao 已提交
20
#include "index_tfile.h"
dengyihao's avatar
dengyihao 已提交
21
#include "index_util.h"
dengyihao's avatar
dengyihao 已提交
22
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
23
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
24

dengyihao's avatar
dengyihao 已提交
25 26
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
27 28
#endif

dengyihao's avatar
dengyihao 已提交
29
/* Number of worker threads handed to the index scheduler in indexInit. */
#define INDEX_NUM_OF_THREADS 4
/* Queue size handed to the index scheduler in indexInit. */
#define INDEX_QUEUE_SIZE 200

/* Scheduler handle: created by indexInit, released by indexCleanUp. */
void* indexQhandle = NULL;

34 35 36 37 38 39 40 41 42 43 44 45
/*
 * Append uid `tgt` to the uint64_t array `dst` unless the same uid is already
 * present in the uint64_t array `src`. `tgt` must be an addressable lvalue,
 * because its address is pushed into `dst`.
 *
 * The helper names carry a trailing underscore so they cannot capture a
 * caller's `i`/`f` used inside the macro arguments. The scan stops at the
 * first match. The expansion is a plain brace block (not do/while(0)) because
 * existing call sites invoke it without a trailing semicolon.
 */
#define INDEX_MERGE_ADD_DEL(src, dst, tgt)              \
  {                                                     \
    bool   found_ = false;                              \
    size_t cnt_ = taosArrayGetSize(src);                \
    for (size_t i_ = 0; i_ < cnt_; i_++) {              \
      if (*(uint64_t*)taosArrayGet(src, i_) == (tgt)) { \
        found_ = true;                                  \
        break;                                          \
      }                                                 \
    }                                                   \
    if (!found_) {                                      \
      taosArrayPush(dst, &(tgt));                       \
    }                                                   \
  }
dengyihao's avatar
dengyihao 已提交
46 47
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
48 49
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
}
dengyihao's avatar
dengyihao 已提交
50 51 52 53
/* Release the shared index scheduler created by indexInit. */
void indexCleanUp() {
  // TODO: refactor later
  void* sched = indexQhandle;
  taosCleanUpScheduler(sched);
}
dengyihao's avatar
dengyihao 已提交
54

dengyihao's avatar
dengyihao 已提交
55
/*
 * qsort-style comparator for uint64_t uids.
 * Returns a negative value, zero, or a positive value when *a is less than,
 * equal to, or greater than *b.
 *
 * The previous `return u1 - u2;` computed a 64-bit unsigned difference and
 * truncated it to int, so for example 0 and 1<<32 compared equal and large
 * gaps could report the wrong order. Compare explicitly instead.
 */
static int uidCompare(const void* a, const void* b) {
  // TODO: add more version compare
  uint64_t u1 = *(const uint64_t*)a;
  uint64_t u2 = *(const uint64_t*)b;
  return (u1 > u2) - (u1 < u2);
}
dengyihao's avatar
dengyihao 已提交
61
/* Per-column bookkeeping record. */
typedef struct SIdxColInfo {
  int colId;     // generated by index internal
  int cVersion;
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
65

dengyihao's avatar
dengyihao 已提交
66
typedef struct SIdxTempResult {
67 68 69
  SArray* total;
  SArray* added;
  SArray* deled;
dengyihao's avatar
dengyihao 已提交
70
} SIdxTempResult;
71

dengyihao's avatar
dengyihao 已提交
72
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
73
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
74
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
75

dengyihao's avatar
dengyihao 已提交
76 77
static void indexInterResultsDestroy(SArray* results);
static int  indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
dengyihao's avatar
dengyihao 已提交
78

dengyihao's avatar
dengyihao 已提交
79 80
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);

dengyihao's avatar
dengyihao 已提交
81
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
82
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper);
dengyihao's avatar
dengyihao 已提交
83

dengyihao's avatar
dengyihao 已提交
84 85
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
86

dengyihao's avatar
dengyihao 已提交
87
/*
 * Open the index rooted at `path` and return it through `*index`.
 *
 * Returns 0 on success. On failure, returns -1, sets `*index` to NULL and
 * releases everything allocated so far through indexClose. `opts` is
 * currently unused. If neither USE_LUCENE nor USE_INVERTED_INDEX is defined,
 * there is no backend and the call fails, as before.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  pthread_once(&isInit, indexInit);
  SIndex* sIdx = calloc(1, sizeof(SIndex));
  if (sIdx == NULL) {
    return -1;
  }

#ifdef USE_LUCENE
  // Named lIndex: a local called `index` redeclared the function parameter.
  index_t* lIndex = index_open(path);
  sIdx->index = lIndex;
#endif

#ifdef USE_INVERTED_INDEX
  // Initialize the mutex before any failure exit: the failure path goes
  // through indexClose, which unconditionally destroys it.
  pthread_mutex_init(&sIdx->mtx, NULL);
  sIdx->cVersion = 1;

  // sIdx->cache = (void*)indexCacheCreate(sIdx);
  sIdx->tindex = indexTFileCreate(path);
  if (sIdx->tindex == NULL) {
    goto END;
  }

  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (sIdx->colObj == NULL) {
    goto END;
  }
  sIdx->path = tstrdup(path);
  if (sIdx->path == NULL) {
    goto END;
  }
#endif

#if defined(USE_LUCENE) || defined(USE_INVERTED_INDEX)
  // Previously a LUCENE-only build fell through to END and always failed.
  *index = sIdx;
  return 0;
#endif

END:
  if (sIdx != NULL) {
    indexClose(sIdx);
  }
  *index = NULL;
  return -1;
}
dengyihao's avatar
dengyihao 已提交
122

dengyihao's avatar
dengyihao 已提交
123
/*
 * Release an index returned by indexOpen. Steps:
 *   - drop the reference on every per-column IndexCache stored in colObj;
 *   - free the column map, the mutex and the tfile handle;
 *   - free the path and `sIdx` itself.
 * Passing NULL is a no-op.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) {
    return;
  }
#ifdef USE_LUCENE
  // Previously written as `sIdex->index`, an undeclared identifier.
  index_close(sIdx->index);
  sIdx->index = NULL;
#endif

#ifdef USE_INVERTED_INDEX
  void* iter = taosHashIterate(sIdx->colObj, NULL);
  while (iter) {
    IndexCache** pCache = iter;
    if (*pCache) {
      indexCacheUnRef(*pCache);
    }
    iter = taosHashIterate(sIdx->colObj, iter);
  }
  taosHashCleanup(sIdx->colObj);
  pthread_mutex_destroy(&sIdx->mtx);
  indexTFileDestroy(sIdx->tindex);
#endif
  free(sIdx->path);
  free(sIdx);
}
dengyihao's avatar
dengyihao 已提交
146

dengyihao's avatar
dengyihao 已提交
147
/*
 * Index every term in `fVals` under table uid `uid`.
 *
 * Inverted-index path, in two passes:
 *   1. Under index->mtx, make sure a per-column IndexCache is registered in
 *      colObj for each term, creating it when missing.
 *   2. Insert each term into its cache with indexCachePut.
 *
 * Returns 0 on success, -1 if a cache cannot be created or found, or the
 * non-zero code returned by indexCachePut.
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
#ifdef USE_LUCENE
  index_document_t* doc = index_document_create();

  // A uint64_t needs up to 20 digits. The old "%d" into a 16-byte buffer was
  // both a format/type mismatch (UB) and a possible overflow.
  char buf[32] = {0};
  snprintf(buf, sizeof(buf), "%" PRIu64, uid);

  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);
    // SIndexTerm stores colName/nColName/colVal/nColVal (see indexTermCreate).
    index_document_add(doc, (const char*)(p->colName), p->nColName, (const char*)(p->colVal), p->nColVal, 1);
  }
  index_document_add(doc, NULL, 0, buf, strlen(buf), 0);

  index_put(index->index, doc);
  index_document_destroy(doc);
#endif

#ifdef USE_INVERTED_INDEX
  // TODO(yihao): reduce the lock range
  pthread_mutex_lock(&index->mtx);
  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = indexSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL) {
      IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        pthread_mutex_unlock(&index->mtx);
        indexError("failed to create index cache, col: %s", p->colName);
        return -1;
      }
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
    }
  }
  pthread_mutex_unlock(&index->mtx);

  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = indexSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    // The old assert dereferenced `cache` itself, crashing if the lookup (or
    // the earlier taosHashPut) failed. Report an error instead.
    if (cache == NULL || *cache == NULL) {
      indexError("index cache not found, col: %s", p->colName);
      return -1;
    }
    int ret = indexCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
#endif
  return 0;
}
dengyihao's avatar
dengyihao 已提交
201
/*
 * Evaluate `multiQuerys` and append the matching uids to `result`.
 *
 * Inverted-index path: each term is searched on its own (indexTermSearch).
 * indexMergeFinalResults then combines the per-term uid arrays according to
 * multiQuerys->opera.
 *
 * Returns 0 on success, or -1 if a term search fails or memory runs out. On
 * failure, the inverted-index path leaves `result` untouched.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  // Declared once here: each backend used to declare its own `opera` and
  // `nQuery`, which is a redefinition error when both backends are enabled.
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys
  int                nQuery = taosArrayGetSize(multiQuerys->query);

#ifdef USE_LUCENE
  char** fields = malloc(sizeof(char*) * nQuery);
  char** keys = malloc(sizeof(char*) * nQuery);
  int*   types = malloc(sizeof(int) * nQuery);

  for (int i = 0; i < nQuery; i++) {
    // SIndexTermQuery holds {qType, term}; SIndexTerm holds colName/colVal.
    SIndexTermQuery* p = taosArrayGet(multiQuerys->query, i);
    SIndexTerm*      term = p->term;

    fields[i] = calloc(1, term->nColName + 1);
    keys[i] = calloc(1, term->nColVal + 1);

    memcpy(fields[i], term->colName, term->nColName);
    memcpy(keys[i], term->colVal, term->nColVal);
    types[i] = (int)(p->qType);
  }
  int* tResult = NULL;
  int  tsz = 0;
  index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);

  for (int i = 0; i < tsz; i++) {
    taosArrayPush(result, &tResult[i]);
  }

  for (int i = 0; i < nQuery; i++) {
    free(fields[i]);
    free(keys[i]);
  }
  free(fields);
  free(keys);
  free(types);
#endif

#ifdef USE_INVERTED_INDEX
  SArray* interResults = taosArrayInit(4, POINTER_BYTES);
  if (interResults == NULL) {
    return -1;
  }
  int ret = 0;
  for (int i = 0; i < nQuery; i++) {
    SIndexTermQuery* qTerm = taosArrayGet(multiQuerys->query, i);
    SArray*          tResult = NULL;
    int              code = indexTermSearch(index, qTerm, &tResult);
    // Push even on failure so indexInterResultsDestroy frees tResult.
    taosArrayPush(interResults, (void*)&tResult);
    if (code != 0) {
      ret = -1;
      break;
    }
  }
  if (ret == 0) {
    indexMergeFinalResults(interResults, opera, result);
  }
  indexInterResultsDestroy(interResults);
  if (ret != 0) {
    return ret;
  }
#endif
  return 0;
}

dengyihao's avatar
dengyihao 已提交
256
/*
 * Delete entries matching `query`.
 * Not implemented yet: does nothing and always returns 1.
 */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
  int rc = 1;
#ifdef USE_INVERTED_INDEX
  // TODO: remove matching entries from the inverted index
#endif
  return rc;
}
dengyihao's avatar
dengyihao 已提交
262
/*
 * Rebuild the index using `opts`. Not implemented yet; returns 0.
 * The previous body had no return statement, so any caller that used the
 * result read an indeterminate value (undefined behavior).
 */
int indexRebuild(SIndex* index, SIndexOpts* opts) {
#ifdef USE_INVERTED_INDEX
  // TODO: rebuild the inverted index
#endif
  return 0;
}
dengyihao's avatar
dengyihao 已提交
267

dengyihao's avatar
dengyihao 已提交
268
/* Create index options. No options exist yet, so this always returns NULL. */
SIndexOpts* indexOptsCreate() {
  SIndexOpts* opts = NULL;
#ifdef USE_LUCENE
#endif
  return opts;
}
dengyihao's avatar
dengyihao 已提交
273
/* Destroy options from indexOptsCreate. Nothing is allocated yet, so this is a no-op. */
void indexOptsDestroy(SIndexOpts* opts) {
#ifdef USE_LUCENE
#endif
  (void)opts;
}
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
282 283
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
  SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
dengyihao's avatar
dengyihao 已提交
284 285 286
  if (p == NULL) {
    return NULL;
  }
287 288
  p->opera = opera;
  p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
dengyihao's avatar
dengyihao 已提交
289 290
  return p;
}
dengyihao's avatar
dengyihao 已提交
291
/*
 * Destroy a query created by indexMultiTermQueryCreate, including every term
 * added to it (each is released with indexTermDestroy). NULL is a no-op.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }
  size_t n = taosArrayGetSize(pQuery->query);
  for (size_t i = 0; i < n; i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  free(pQuery);
}
dengyihao's avatar
dengyihao 已提交
299
/*
 * Append `term` with query type `qType` to `pQuery`. Once stored, the term is
 * released by indexMultiTermQueryDestroy.
 * Returns 0 on success, or -1 if the entry could not be stored.
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery q = {.qType = qType, .term = term};
  if (taosArrayPush(pQuery->query, &q) == NULL) {
    return -1;
  }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
305 306
/*
 * Create a term for column `colName` (nColName bytes) with value `colVal`
 * (nColVal bytes). Both buffers are copied and NUL-terminated.
 * Returns NULL if any allocation fails. Release the term with indexTermDestroy.
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* t = calloc(1, sizeof(*t));
  if (t == NULL) {
    return NULL;
  }
  t->suid = suid;
  t->operType = oper;
  t->colType = colType;

  t->colName = calloc(1, nColName + 1);
  t->colVal = calloc(1, nColVal + 1);
  if (t->colName == NULL || t->colVal == NULL) {
    free(t->colName);
    free(t->colVal);
    free(t);
    return NULL;
  }

  // memcpy with a NULL source is undefined even when the length is 0.
  if (nColName > 0) {
    memcpy(t->colName, colName, nColName);
  }
  t->nColName = nColName;

  if (nColVal > 0) {
    memcpy(t->colVal, colVal, nColVal);
  }
  t->nColVal = nColVal;
  return t;
}
dengyihao's avatar
dengyihao 已提交
325
/* Free a term from indexTermCreate together with its name and value buffers. NULL is a no-op. */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  free(p->colName);
  free(p->colVal);
  free(p);
}

dengyihao's avatar
dengyihao 已提交
331
/* Create an empty array of SIndexTerm pointers; fill it with indexMultiTermAdd. */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));
  return terms;
}
332

dengyihao's avatar
dengyihao 已提交
333
/*
 * Append the pointer `term` to `terms`. Once stored, the term is released by
 * indexMultiTermDestroy.
 * Returns 0 on success, or -1 if the pointer could not be stored.
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  if (taosArrayPush(terms, &term) == NULL) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
337
/* Destroy a term array from indexMultiTermCreate, including every term it holds. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  size_t cnt = taosArrayGetSize(terms);
  for (size_t k = 0; k < cnt; k++) {
    indexTermDestroy((SIndexTerm*)taosArrayGetP(terms, k));
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
344

dengyihao's avatar
dengyihao 已提交
345 346 347
/*
 * Search a single term. Steps:
 *   - look up the per-column cache (under sIdx->mtx) and search it;
 *   - unless the cache reports the column as deleted, also search the tfile.
 *
 * Matching uids are appended to a newly created uint64_t array stored in
 * `*result`. The caller must release that array even when this returns -1.
 * Returns 0 on success, or -1 on a cache/tfile search error or allocation
 * failure.
 */
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  int32_t sz = indexSerialCacheKey(&key, buf);

  // Get col info
  pthread_mutex_lock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  IndexCache*  cache = (pCache == NULL) ? NULL : *pCache;
  pthread_mutex_unlock(&sIdx->mtx);
  // NOTE(review): `cache` is used after the lock is released and no reference
  // is taken here. Confirm it cannot be released concurrently.

  *result = taosArrayInit(4, sizeof(uint64_t));
  if (*result == NULL) {
    return -1;
  }

  // TODO: iterator mem and tidex
  STermValueType s = kTypeValue;
  if (0 != indexCacheSearch(cache, query, *result, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  if (s == kTypeDeletion) {
    indexInfo("col: %s already drop by", term->colName);
    // column already dropped by another operation, no need to query tindex
    return 0;
  }
  if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
383
/* Destroy an array of per-term uid arrays, including each inner array. NULL is a no-op. */
static void indexInterResultsDestroy(SArray* results) {
  if (results != NULL) {
    size_t cnt = taosArrayGetSize(results);
    for (size_t idx = 0; idx < cnt; idx++) {
      taosArrayDestroy((SArray*)taosArrayGetP(results, idx));
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
395

dengyihao's avatar
dengyihao 已提交
396
/*
 * Combine the per-term uid arrays in `interResults` into `fResults`:
 *   MUST   -> iIntersection
 *   SHOULD -> iUnion
 *   NOT    -> copies the uids of the first term only (single-column case)
 * Each per-term array is first sorted and de-duplicated in place.
 * Always returns 0.
 */
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
  // refactor, merge interResults into fResults by oType
  size_t n = taosArrayGetSize(interResults);
  // Previously the loop decremented (`i--`): after the first pass `i` became -1,
  // so only the first array was ever sorted/de-duplicated.
  for (size_t i = 0; i < n; i++) {
    SArray* t = taosArrayGetP(interResults, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(interResults, fResults);
  } else if (oType == SHOULD) {
    iUnion(interResults, fResults);
  } else if (oType == NOT) {
    // just one column index, enhance later; not used currently.
    // Copy the uids of that single term. The old code appended the SArray*
    // pointer values held by interResults into the uid array.
    if (n > 0) {
      taosArrayAddAll(fResults, taosArrayGetP(interResults, 0));
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
415

dengyihao's avatar
dengyihao 已提交
416 417 418 419 420 421
/*
 * Allocate an SIdxTempResult with three empty uint64_t uid arrays.
 * Returns NULL if any allocation fails. Release it with sIdxTempResultDestroy.
 */
SIdxTempResult* sIdxTempResultCreate() {
  SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult));
  if (tr == NULL) {
    return NULL;
  }
  tr->total = taosArrayInit(4, sizeof(uint64_t));
  tr->added = taosArrayInit(4, sizeof(uint64_t));
  tr->deled = taosArrayInit(4, sizeof(uint64_t));
  if (tr->total == NULL || tr->added == NULL || tr->deled == NULL) {
    taosArrayDestroy(tr->total);
    taosArrayDestroy(tr->added);
    taosArrayDestroy(tr->deled);
    free(tr);
    return NULL;
  }
  return tr;
}
dengyihao's avatar
dengyihao 已提交
423 424
/* Empty the total/added/deled uid arrays of `tr`. NULL is a no-op. */
void sIdxTempResultClear(SIdxTempResult* tr) {
  if (tr != NULL) {
    SArray* arrs[] = {tr->total, tr->added, tr->deled};
    for (size_t k = 0; k < sizeof(arrs) / sizeof(arrs[0]); k++) {
      taosArrayClear(arrs[k]);
    }
  }
}
dengyihao's avatar
dengyihao 已提交
431 432
/*
 * Destroy an SIdxTempResult from sIdxTempResultCreate: its three uid arrays
 * and the struct itself. Previously the struct was never freed, so it leaked.
 * NULL is a no-op.
 */
void sIdxTempResultDestroy(SIdxTempResult* tr) {
  if (tr == NULL) {
    return;
  }
  taosArrayDestroy(tr->total);
  taosArrayDestroy(tr->added);
  taosArrayDestroy(tr->deled);
  free(tr);
}
dengyihao's avatar
dengyihao 已提交
439 440 441 442
/*
 * Fold `tr` into the uid array `result`:
 *   1. sort all three arrays of `tr` with uidCompare;
 *   2. merge tr->total and tr->added into `result` with iUnion;
 *   3. apply iExcept with tr->deled (presumably removing the cache-deleted
 *      uids — confirm against iExcept).
 */
static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) {
  SArray* toSort[] = {tr->total, tr->added, tr->deled};
  for (size_t k = 0; k < sizeof(toSort) / sizeof(toSort[0]); k++) {
    taosArraySort(toSort[k], uidCompare);
  }

  SArray* sources = taosArrayInit(2, sizeof(void*));
  taosArrayPush(sources, &tr->total);
  taosArrayPush(sources, &tr->added);
  iUnion(sources, result);
  taosArrayDestroy(sources);

  iExcept(result, tr->deled);
}
dengyihao's avatar
dengyihao 已提交
453
/*
 * Called once per merged column value `tfv`, and once with NULL after the
 * iteration ends. `result` holds TFileValue* entries; its last entry is the
 * value whose uids are currently accumulated in `tr`. Takes ownership of `tfv`.
 *   - result empty:             `tfv` becomes the first entry.
 *   - tfv == NULL:              flush `tr` into the last entry's uid list.
 *   - colVal differs from last: flush `tr` into the last entry, reset `tr`,
 *                               and append `tfv`.
 *   - same colVal as last:      `tr` keeps accumulating; `tfv` is destroyed.
 */
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
  int32_t sz = taosArrayGetSize(result);
  if (sz == 0) {
    // With nothing iterated, the final NULL call used to push a NULL entry.
    // indexGenTFile and indexDestroyFinalResult then received it.
    if (tfv != NULL) {
      taosArrayPush(result, &tfv);
    }
    return;
  }

  TFileValue* lv = taosArrayGetP(result, sz - 1);
  if (tfv == NULL) {
    // handle last iterator
    sIdxTempResultMergeTo(lv->tableId, tr);
  } else if (strcmp(lv->colVal, tfv->colVal) != 0) {
    sIdxTempResultMergeTo(lv->tableId, tr);
    sIdxTempResultClear(tr);
    taosArrayPush(result, &tfv);
  } else {
    // temp result saved in help
    tfileValueDestroy(tfv);
  }
}
dengyihao's avatar
dengyihao 已提交
473
/*
 * Merge one cache value `cv` and/or one tfile value `tv` for the same column
 * value. Either may be NULL, but the flush loop never passes both as NULL.
 *   - A TFileValue for the column value goes to indexMayMergeTempToFinalResult.
 *   - A cache ADD puts its uid in tr->added unless it is already in tr->deled.
 *   - A cache DEL puts its uid in tr->deled unless it is already in tr->added.
 *   - Tfile uids are appended to tr->total.
 */
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) {
  char* colVal = (cv == NULL) ? tv->colVal : cv->colVal;
  indexMayMergeTempToFinalResult(result, tfileValueCreate(colVal), tr);

  if (cv != NULL) {
    uint64_t uid = *(uint64_t*)taosArrayGet(cv->val, 0);
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->deled, tr->added, uid)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->added, tr->deled, uid)
    }
  }

  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
491
/* Destroy every TFileValue in `result`, then the array itself. The element loop is skipped for NULL. */
static void indexDestroyFinalResult(SArray* result) {
  if (result != NULL) {
    size_t cnt = taosArrayGetSize(result);
    for (size_t k = 0; k < cnt; k++) {
      tfileValueDestroy((TFileValue*)taosArrayGetP(result, k));
    }
  }
  taosArrayDestroy(result);
}
499

dengyihao's avatar
dengyihao 已提交
500
int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
dengyihao's avatar
dengyihao 已提交
501 502 503
  if (sIdx == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
504
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
dengyihao's avatar
dengyihao 已提交
505

dengyihao's avatar
dengyihao 已提交
506 507
  int64_t st = taosGetTimestampUs();

dengyihao's avatar
dengyihao 已提交
508
  IndexCache*  pCache = (IndexCache*)cache;
dengyihao's avatar
dengyihao 已提交
509
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
dengyihao's avatar
dengyihao 已提交
510 511 512
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }
dengyihao's avatar
dengyihao 已提交
513 514 515
  // handle flush
  Iterate* cacheIter = indexCacheIteratorCreate(pCache);
  Iterate* tfileIter = tfileIteratorCreate(pReader);
dengyihao's avatar
dengyihao 已提交
516 517 518
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }
dengyihao's avatar
dengyihao 已提交
519 520 521

  SArray* result = taosArrayInit(1024, sizeof(void*));

dengyihao's avatar
dengyihao 已提交
522 523
  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
524

dengyihao's avatar
dengyihao 已提交
525
  SIdxTempResult* tr = sIdxTempResultCreate();
dengyihao's avatar
dengyihao 已提交
526 527 528 529 530 531 532 533 534 535 536 537
  while (cn == true || tn == true) {
    IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;

    int comp = 0;
    if (cn == true && tn == true) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn == true) {
      comp = -1;
    } else {
      comp = 1;
    }
dengyihao's avatar
dengyihao 已提交
538
    if (comp == 0) {
dengyihao's avatar
dengyihao 已提交
539
      indexMergeCacheAndTFile(result, cv, tv, tr);
dengyihao's avatar
dengyihao 已提交
540 541 542
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
dengyihao's avatar
dengyihao 已提交
543
      indexMergeCacheAndTFile(result, cv, NULL, tr);
dengyihao's avatar
dengyihao 已提交
544 545
      cn = cacheIter->next(cacheIter);
    } else {
dengyihao's avatar
dengyihao 已提交
546
      indexMergeCacheAndTFile(result, NULL, tv, tr);
dengyihao's avatar
dengyihao 已提交
547 548 549
      tn = tfileIter->next(tfileIter);
    }
  }
dengyihao's avatar
dengyihao 已提交
550 551
  indexMayMergeTempToFinalResult(result, NULL, tr);
  sIdxTempResultDestroy(tr);
552

dengyihao's avatar
dengyihao 已提交
553
  int ret = indexGenTFile(sIdx, pCache, result);
dengyihao's avatar
dengyihao 已提交
554
  indexDestroyFinalResult(result);
dengyihao's avatar
dengyihao 已提交
555

dengyihao's avatar
dengyihao 已提交
556
  indexCacheDestroyImm(pCache);
dengyihao's avatar
dengyihao 已提交
557 558 559 560

  indexCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

dengyihao's avatar
dengyihao 已提交
561 562
  tfileReaderUnRef(pReader);
  indexCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
563 564 565 566 567 568 569 570

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }
  return ret;
dengyihao's avatar
dengyihao 已提交
571
}
dengyihao's avatar
dengyihao 已提交
572 573 574
/*
 * Release what an IterateValue owns.
 *   - destroy == true:  the uid array is destroyed and set to NULL.
 *   - destroy == false: the uid array is only cleared, if present.
 * colVal is freed and set to NULL in both cases.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }
  free(value->colVal);
  value->colVal = NULL;
}
dengyihao's avatar
dengyihao 已提交
584 585 586 587
/*
 * Write `batch` for `cache`'s column into a new tfile tagged with the cache's
 * version. Then open a reader on that file and register the reader in the
 * index's tfile cache under sIdx->mtx.
 * Returns 0 on success, -1 on failure.
 */
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int32_t version = CACHE_VERSION(cache);
  uint8_t colType = cache->type;

  TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (tw == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(tw, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    // Discard the writer. The `true` flag presumably also removes the
    // partially written file — confirm against writerCtxDestroy.
    writerCtxDestroy(tw->ctx, true);
    free(tw);
    return -1;
  }
  tfileWriterClose(tw);

  TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }

  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};

  pthread_mutex_lock(&sIdx->mtx);
  IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
  tfileCachePut(ifile->cache, &key, reader);
  pthread_mutex_unlock(&sIdx->mtx);
  return ret;
}
dengyihao's avatar
dengyihao 已提交
621

dengyihao's avatar
dengyihao 已提交
622 623
/*
 * Serialize `key` into `buf` in this order:
 *   - the suid (SERIALIZE_MEM_TO_BUF);
 *   - a '_' separator;
 *   - the column name, or JSON_COLUMN when colType marks a JSON column.
 * Returns the number of bytes written. No bounds are checked here, so the
 * caller must supply a large enough buffer.
 */
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
  char* start = buf;
  bool  isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);

  SERIALIZE_MEM_TO_BUF(buf, key, suid);
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  if (!isJson) {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  } else {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  }
  return buf - start;
}