index.c 18.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "tcoding.h"
#include "tdataformat.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
30 31
#endif

dengyihao's avatar
dengyihao 已提交
32
// Arguments handed to taosInitScheduler() by indexInit().
#define INDEX_NUM_OF_THREADS 5
#define INDEX_QUEUE_SIZE     200

// NULL sentinels for bool / signed integer / timestamp values.
#define INDEX_DATA_BOOL_NULL      0x02
#define INDEX_DATA_TINYINT_NULL   0x80
#define INDEX_DATA_SMALLINT_NULL  0x8000
#define INDEX_DATA_INT_NULL       0x80000000L
#define INDEX_DATA_BIGINT_NULL    0x8000000000000000L
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL

// NULL sentinels for floating point, string and JSON values.
#define INDEX_DATA_FLOAT_NULL    0x7FF00000           // it is an NAN
#define INDEX_DATA_DOUBLE_NULL   0x7FFFFF0000000000L  // an NAN
#define INDEX_DATA_NCHAR_NULL    0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL   0xFF
#define INDEX_DATA_JSON_NULL     0xFFFFFFFF
#define INDEX_DATA_JSON_null     0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01

// NULL sentinels for unsigned integer values.
#define INDEX_DATA_UTINYINT_NULL  0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL      0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL   0xFFFFFFFFFFFFFFFFL

// Textual spellings of NULL (upper and lower case).
#define INDEX_DATA_NULL_STR   "NULL"
#define INDEX_DATA_NULL_STR_L "null"

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
void*   indexQhandle = NULL;
int32_t indexRefMgt;

static void indexDestroy(void* sIdx);

dengyihao's avatar
dengyihao 已提交
63 64
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
65
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
dengyihao's avatar
dengyihao 已提交
66
  indexRefMgt = taosOpenRef(10, indexDestroy);
dengyihao's avatar
dengyihao 已提交
67
}
dengyihao's avatar
dengyihao 已提交
68 69 70 71
/*
 * Module teardown: hands the scheduler handle created by indexInit() to
 * taosCleanUpScheduler().
 */
void indexCleanUp() {
  // refactor later
  void* scheduler = indexQhandle;
  taosCleanUpScheduler(scheduler);
}
dengyihao's avatar
dengyihao 已提交
72

dengyihao's avatar
dengyihao 已提交
73
/* Per-column bookkeeping record: an index-assigned column id plus a version. */
typedef struct SIdxColInfo {
  int colId;     // generated by index internal
  int cVersion;  // version counter for the column
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
77

wafwerar's avatar
wafwerar 已提交
78
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
79
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
80
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82
static void indexInterResultsDestroy(SArray* results);
dengyihao's avatar
dengyihao 已提交
83
static int  indexMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
dengyihao's avatar
dengyihao 已提交
84

dengyihao's avatar
dengyihao 已提交
85 86
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);

dengyihao's avatar
dengyihao 已提交
87
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
88
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90 91
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
92

dengyihao's avatar
dengyihao 已提交
93 94 95 96 97 98 99 100 101
static void indexPost(void* idx) {
  SIndex* pIdx = idx;
  tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
  SIndex* pIdx = idx;
  tsem_wait(&pIdx->sem);
}

dengyihao's avatar
dengyihao 已提交
102
/*
 * Open (create) an index handle rooted at `path`.
 *
 * @param opts   options; not read by this function
 * @param path   directory passed to indexTFileCreate() and stored in the handle
 * @param index  out: receives the new handle on success, NULL if the tfile
 *               layer could not be created
 * @return 0 on success, -1 on failure
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  taosThreadOnce(&isInit, indexInit);

  SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex));
  if (sIdx == NULL) {
    return -1;
  }

  sIdx->tindex = indexTFileCreate(path);
  if (sIdx->tindex == NULL) {
    goto END;
  }

  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  sIdx->cVersion = 1;
  sIdx->path = tstrdup(path);
  taosThreadMutexInit(&sIdx->mtx, NULL);
  tsem_init(&sIdx->sem, 0, 0);

  // Register the handle in the ref set and take a reference; indexClose() drops both.
  sIdx->refId = indexAddRef(sIdx);
  indexAcquireRef(sIdx->refId);

  *index = sIdx;
  return 0;

END:
  // At this point only the zeroed SIndex allocation exists: it was never added
  // to the ref set and its mutex/semaphore were never initialised, so it must
  // not go through indexClose()/indexDestroy(). Free it directly.
  taosMemoryFree(sIdx);
  *index = NULL;
  return -1;
}
dengyihao's avatar
dengyihao 已提交
134

dengyihao's avatar
dengyihao 已提交
135 136
void indexDestroy(void* handle) {
  SIndex* sIdx = handle;
dengyihao's avatar
dengyihao 已提交
137
  taosThreadMutexDestroy(&sIdx->mtx);
dengyihao's avatar
dengyihao 已提交
138
  tsem_destroy(&sIdx->sem);
dengyihao's avatar
dengyihao 已提交
139 140 141 142 143 144 145
  indexTFileDestroy(sIdx->tindex);
  taosMemoryFree(sIdx->path);
  taosMemoryFree(sIdx);
  return;
}
/*
 * Close an index handle: force every column cache to merge, wait for each
 * merge to signal completion, drop the cache references, then release and
 * remove the handle from the ref set.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx->colObj != NULL) {
    void* iter = taosHashIterate(sIdx->colObj, NULL);
    while (iter) {
      IndexCache** pCache = iter;
      indexCacheForceToMerge((void*)(*pCache));
      indexInfo("%s wait to merge", (*pCache)->colName);
      // Blocks until indexPost() is called; indexFlushCacheToTFile() posts when
      // it runs with quit == true (presumably what the forced merge triggers).
      indexWait((void*)(sIdx));
      indexInfo("%s finish to wait", (*pCache)->colName);
      // Advance the iterator before dropping the reference on the current cache.
      iter = taosHashIterate(sIdx->colObj, iter);
      indexCacheUnRef(*pCache);
    }
    taosHashCleanup(sIdx->colObj);
    sIdx->colObj = NULL;
  }
  // Undo the acquire/add pair performed in indexOpen().
  indexReleaseRef(sIdx->refId);
  indexRemoveRef(sIdx->refId);
}
/* Register `p` in the index ref set; returns the id produced by taosAddRef(). */
int64_t indexAddRef(void* p) {
  int64_t refId = taosAddRef(indexRefMgt, p);
  return refId;
}

/* Remove `ref` from the index ref set; returns taosRemoveRef()'s result. */
int32_t indexRemoveRef(int64_t ref) {
  int32_t code = taosRemoveRef(indexRefMgt, ref);
  return code;
}

/* Acquire `ref` in the index ref set; the value returned by taosAcquireRef() is discarded. */
void indexAcquireRef(int64_t ref) {
  (void)taosAcquireRef(indexRefMgt, ref);
}

/* Release `ref` in the index ref set; the value returned by taosReleaseRef() is discarded. */
void indexReleaseRef(int64_t ref) {
  (void)taosReleaseRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
180

dengyihao's avatar
dengyihao 已提交
181
/*
 * Insert `uid` under every term in `fVals`.
 *
 * Pass 1 (under the index mutex) makes sure each (suid, column) referenced by
 * the terms has an IndexCache registered in index->colObj. Pass 2 writes each
 * term into its cache.
 *
 * @return 0 on success; -1 if a cache could not be created or looked up;
 *         otherwise the first non-zero code returned by indexCachePut().
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
  size_t nTerms = taosArrayGetSize(fVals);

  // TODO(yihao): reduce the lock range
  taosThreadMutexLock(&index->mtx);
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = indexSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL) {
      IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        // Do not store a NULL cache in the hash; pass 2 would dereference it.
        taosThreadMutexUnlock(&index->mtx);
        return -1;
      }
      taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
    }
  }
  taosThreadMutexUnlock(&index->mtx);

  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = indexSerialCacheKey(&key, buf);
    indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    // Check the lookup result itself before dereferencing it.
    if (cache == NULL || *cache == NULL) {
      indexError("cache not found for col: %s", p->colName);
      return -1;
    }
    int ret = indexCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
216
/*
 * Run every term query in `multiQuerys` and merge the per-term uid lists into
 * `result` according to the query's operator.
 *
 * A failure of an individual term search is not propagated: its (possibly
 * empty) result array still takes part in the merge.
 *
 * @return 0 on success, -1 if the intermediate result list cannot be allocated
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

  SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
  if (iRslts == NULL) {
    return -1;
  }

  size_t nQuery = taosArrayGetSize(multiQuerys->query);
  for (size_t i = 0; i < nQuery; i++) {
    SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
    SArray*          trslt = NULL;
    indexTermSearch(index, qterm, &trslt);
    taosArrayPush(iRslts, (void*)&trslt);
  }
  indexMergeFinalResults(iRslts, opera, result);
  indexInterResultsDestroy(iRslts);
  return 0;
}

232 233
/* Placeholder: performs no work and always returns 1. */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
  return 1;
}

/* Placeholder: performs no work and always returns 0. */
int indexRebuild(SIndex* index, SIndexOpts* opts) {
  return 0;
}
dengyihao's avatar
dengyihao 已提交
234

235 236
/* Placeholder: no options object exists yet, so this always yields NULL. */
SIndexOpts* indexOptsCreate() {
  return NULL;
}

/* Placeholder counterpart of indexOptsCreate(); does nothing. */
void indexOptsDestroy(SIndexOpts* opts) {
  (void)opts;
}
dengyihao's avatar
dengyihao 已提交
237 238 239 240
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
241
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
dengyihao's avatar
dengyihao 已提交
242 243
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
dengyihao's avatar
dengyihao 已提交
244 245
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
246 247 248
  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  return mtq;
dengyihao's avatar
dengyihao 已提交
249
}
dengyihao's avatar
dengyihao 已提交
250
/*
 * Destroy a multi-term query together with every term that was added to it.
 * A NULL query is ignored.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }
  for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  taosMemoryFree(pQuery);
}
dengyihao's avatar
dengyihao 已提交
258
/*
 * Append a (term, query type) pair to the query. The term pointer is stored
 * as-is; indexMultiTermQueryDestroy() later destroys it.
 *
 * @return 0 on success, -1 if the pair could not be appended
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery q = {.qType = qType, .term = term};
  // taosArrayPush() yields NULL when the element could not be stored.
  if (taosArrayPush(pQuery->query, &q) == NULL) {
    return -1;
  }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
264 265
/*
 * Build a term describing one (column, value) pair.
 *
 * The column name is copied (nColName bytes plus a terminating NUL). The value
 * is converted to its string form by indexConvertDataToStr(), and the term
 * takes the buffer and length that call produces; `nColVal` is not consulted.
 *
 * @return the new term (release with indexTermDestroy()), or NULL on
 *         allocation or conversion failure
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
  if (tm == NULL) {
    return NULL;
  }

  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;

  tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
  if (tm->colName == NULL) {
    taosMemoryFree(tm);
    return NULL;
  }
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;

  char*   buf = NULL;
  int32_t len = indexConvertDataToStr((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf);
  if (len == -1) {
    // Checked at run time so a failed conversion is also caught when asserts are compiled out.
    indexError("failed to convert value of col: %s", tm->colName);
    taosMemoryFree(buf);
    taosMemoryFree(tm->colName);
    taosMemoryFree(tm);
    return NULL;
  }

  tm->colVal = buf;
  tm->nColVal = len;

  return tm;
}
dengyihao's avatar
dengyihao 已提交
302
/* Free a term created by indexTermCreate(), including its name and value buffers. NULL is ignored. */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
}

dengyihao's avatar
dengyihao 已提交
308
/* Create an empty term list: an array of SIndexTerm pointers with initial capacity 4. */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));
  return terms;
}
309

dengyihao's avatar
dengyihao 已提交
310
/*
 * Append a term pointer to the list; indexMultiTermDestroy() later destroys it.
 *
 * @return 0 on success, -1 if the pointer could not be appended
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  // taosArrayPush() yields NULL when the element could not be stored.
  if (taosArrayPush(terms, &term) == NULL) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
314
/* Destroy every term held in the list, then the list itself. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  for (int32_t i = 0; i < taosArrayGetSize(terms); i++) {
    SIndexTerm* p = taosArrayGetP(terms, i);
    indexTermDestroy(p);
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
321

dengyihao's avatar
dengyihao 已提交
322 323 324
/*
 * Evaluate one term query.
 *
 * Looks up the column cache for the term's (suid, column), searches the cache
 * and then, unless the cache reports the column as deleted, the tfile. The
 * merged uid list is written to a freshly allocated array stored in *result;
 * the caller owns it on every return path.
 *
 * @return 0 on success (including the "column dropped" case, where *result is
 *         left empty), -1 if the cache or tfile search fails
 */
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  // Get col info
  IndexCache* cache = NULL;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
  int32_t sz = indexSerialCacheKey(&key, buf);

  taosThreadMutexLock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  cache = (pCache == NULL) ? NULL : *pCache;
  taosThreadMutexUnlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  // TODO: iterator mem and tidex
  STermValueType s = kTypeValue;

  int        code = -1;
  int64_t    st = taosGetTimestampUs();
  SIdxTRslt* tr = idxTRsltCreate();

  if (0 != indexCacheSearch(cache, query, tr, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }

  if (s == kTypeDeletion) {
    indexInfo("col: %s already drop by", term->colName);
    // column already dropped by another operation, no need to query tindex;
    // leave through END so that `tr` is released
    code = 0;
    goto END;
  }

  {
    // Timed separately so that the overall "search cost" below still covers the cache search.
    int64_t tfSt = taosGetTimestampUs();
    if (0 != indexTFileSearch(sIdx->tindex, query, tr)) {
      indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
      goto END;
    }
    int64_t tfCost = taosGetTimestampUs() - tfSt;
    indexInfo("tfile search cost: %" PRIu64 "us", tfCost);
  }

  {
    int64_t cost = taosGetTimestampUs() - st;
    indexInfo("search cost: %" PRIu64 "us", cost);
  }

  idxTRsltMergeTo(tr, *result);
  code = 0;

END:
  idxTRsltDestroy(tr);
  return code;
}
dengyihao's avatar
dengyihao 已提交
377
/*
 * Destroy the list of per-term result arrays built by indexSearch(): each
 * stored SArray pointer is destroyed, then the list itself. NULL is ignored.
 */
static void indexInterResultsDestroy(SArray* results) {
  if (results == NULL) {
    return;
  }

  size_t sz = taosArrayGetSize(results);
  for (size_t i = 0; i < sz; i++) {
    SArray* p = taosArrayGetP(results, i);
    taosArrayDestroy(p);
  }
  taosArrayDestroy(results);
}
dengyihao's avatar
dengyihao 已提交
389

dengyihao's avatar
dengyihao 已提交
390
/*
 * Merge the per-term uid arrays in `in` into `out`.
 *
 * Every input array is first sorted and deduplicated with uidCompare. MUST
 * then takes the intersection, SHOULD the union; NOT is currently a no-op.
 *
 * @return always 0
 */
static int indexMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
  // refactor, merge interResults into fResults by oType
  size_t nIn = taosArrayGetSize(in);
  // Walk forward over every input array, not just the first one.
  for (size_t i = 0; i < nIn; i++) {
    SArray* t = taosArrayGetP(in, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(in, out);
  } else if (oType == SHOULD) {
    iUnion(in, out);
  } else if (oType == NOT) {
    // just one column index, enhance later
    // not use currently
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
409

dengyihao's avatar
dengyihao 已提交
410
/*
 * Fold the temporary add/del/total sets in `tr` into the last entry of
 * `result` when the column value changes, and start a new entry for `tfv`.
 *
 *  - result empty:               push tfv as the first entry
 *  - tfv differs from last entry: flush tr into the last entry, clear tr, push tfv
 *  - tfv == NULL (end of input): flush tr into the last entry
 *  - tfv equals the last entry:  tfv is redundant and destroyed; tr keeps accumulating
 */
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
  int32_t sz = taosArrayGetSize(result);
  if (sz > 0) {
    TFileValue* lv = taosArrayGetP(result, sz - 1);
    if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
      idxTRsltMergeTo(tr, lv->tableId);
      idxTRsltClear(tr);

      taosArrayPush(result, &tfv);
    } else if (tfv == NULL) {
      // handle last iterator
      idxTRsltMergeTo(tr, lv->tableId);
    } else {
      // temp result saved in help
      tfileValueDestroy(tfv);
    }
  } else {
    // NOTE(review): a NULL tfv is pushed as-is when result is empty — confirm consumers tolerate that.
    taosArrayPush(result, &tfv);
  }
}
dengyihao's avatar
dengyihao 已提交
430
/*
 * Merge one step of the cache/tfile iteration into `result` and `tr`.
 *
 * At least one of `cv` / `tv` must be non-NULL; both carry the same column
 * value when both are given. The value's entry in `result` is created or
 * reused via indexMayMergeTempToFinalResult(); the first uid of the cache
 * value is then recorded in tr->add or tr->del, and the tfile uids are
 * appended to tr->total.
 */
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
  char*       colVal = (cv != NULL) ? cv->colVal : tv->colVal;
  TFileValue* tfv = tfileValueCreate(colVal);

  indexMayMergeTempToFinalResult(result, tfv, tr);

  if (cv != NULL) {
    uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, id)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
    }
  }
  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
449
/* Destroy every TFileValue held in `result`, then the array itself. `result` may be NULL. */
static void indexDestroyFinalResult(SArray* result) {
  int32_t sz = result ? taosArrayGetSize(result) : 0;
  for (size_t i = 0; i < sz; i++) {
    TFileValue* tv = taosArrayGetP(result, i);
    tfileValueDestroy(tv);
  }
  taosArrayDestroy(result);
}
457

dengyihao's avatar
dengyihao 已提交
458
/*
 * Merge the immutable part of a column cache with the column's current tfile
 * and write the result out as a new tfile.
 *
 * Both sources are walked in column-value order (strcmp); equal values are
 * merged, the others are emitted from whichever side is smaller.
 *
 * @param sIdx   owning index; one reference on sIdx->refId is released before returning
 * @param cache  the IndexCache to flush
 * @param quit   when true, wait for a running merge to finish first and signal
 *               the index semaphore when done (indexClose() waits on it)
 * @return 0 on success or when there is nothing to merge, -1 if sIdx is NULL
 *         or the tfile could not be generated
 */
int indexFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
  if (sIdx == NULL) {
    return -1;
  }
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);

  int64_t st = taosGetTimestampUs();

  IndexCache* pCache = (IndexCache*)cache;

  // On shutdown, spin until the merging flag of this cache has been cleared.
  while (quit && atomic_load_32(&pCache->merging) == 1) {
  }
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }
  // handle flush
  Iterate* cacheIter = indexCacheIteratorCreate(pCache);
  if (cacheIter == NULL) {
    indexError("%p immtable is empty, ignore merge opera", pCache);
    indexCacheDestroyImm(pCache);
    tfileReaderUnRef(pReader);
    atomic_store_32(&pCache->merging, 0);
    if (quit) {
      indexPost(sIdx);
    }
    // NOTE(review): unlike the normal path, this path does not call indexCacheUnRef() — confirm intended.
    indexReleaseRef(sIdx->refId);
    return 0;
  }

  Iterate* tfileIter = tfileIteratorCreate(pReader);
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }

  SArray* result = taosArrayInit(1024, sizeof(void*));

  bool cn = cacheIter->next(cacheIter);
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;

  SIdxTRslt* tr = idxTRsltCreate();
  while (cn || tn) {
    IterateValue* cv = cn ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = tn ? tfileIter->getValue(tfileIter) : NULL;

    // comp < 0: only the cache side is consumed; comp > 0: only the tfile side.
    int comp = 0;
    if (cn && tn) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn) {
      comp = -1;
    } else {
      comp = 1;
    }
    if (comp == 0) {
      indexMergeCacheAndTFile(result, cv, tv, tr);
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
      indexMergeCacheAndTFile(result, cv, NULL, tr);
      cn = cacheIter->next(cacheIter);
    } else {
      indexMergeCacheAndTFile(result, NULL, tv, tr);
      tn = tfileIter->next(tfileIter);
    }
  }
  // Flush whatever is still pending in tr into the last entry.
  indexMayMergeTempToFinalResult(result, NULL, tr);
  idxTRsltDestroy(tr);

  int ret = indexGenTFile(sIdx, pCache, result);
  indexDestroyFinalResult(result);

  indexCacheDestroyImm(pCache);

  indexCacheIteratorDestroy(cacheIter);
  tfileIteratorDestroy(tfileIter);

  tfileReaderUnRef(pReader);

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }

  atomic_store_32(&pCache->merging, 0);
  // Drop the cache reference only after the last access to pCache, so the
  // merging flag is never written through a pointer that may have been freed.
  indexCacheUnRef(pCache);
  if (quit) {
    indexPost(sIdx);
  }
  indexReleaseRef(sIdx->refId);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
551 552 553
/*
 * Reset an iterator value between steps or tear it down.
 *
 * @param destroy  true: destroy the uid array and set it to NULL;
 *                 false: keep the array but clear its contents (if present)
 * In both cases the column value string is freed and set to NULL.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (destroy) {
    taosArrayDestroy(value->val);
    value->val = NULL;
  } else {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  }
  taosMemoryFree(value->colVal);
  value->colVal = NULL;
}
563 564 565 566

/*
 * Pick the version number for the next tfile of this column.
 *
 * Starts from CACHE_VERSION(cache). If a tfile reader is already cached for
 * the column, the result is max(cache version, reader header version) + 1.
 */
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
  ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
  int64_t   ver = CACHE_VERSION(cache);

  IndexTFile* tf = (IndexTFile*)(sIdx->tindex);

  taosThreadMutexLock(&tf->mtx);
  TFileReader* rd = tfileCacheGet(tf->cache, &key);
  taosThreadMutexUnlock(&tf->mtx);

  if (rd != NULL) {
    ver = (ver > rd->header.version ? ver : rd->header.version) + 1;
    indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver);
  }
  // Called unconditionally, i.e. also when rd is NULL.
  tfileReaderUnRef(rd);
  return ver;
}
dengyihao's avatar
dengyihao 已提交
581
/*
 * Write `batch` to a new tfile for the cache's column, then reopen the file
 * for reading and register the reader in the tfile cache.
 *
 * @return 0 on success, -1 if the writer cannot be opened, the write fails,
 *         or the new file cannot be reopened
 */
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int64_t version = indexGetAvaialbleVer(sIdx, cache);
  indexInfo("file name version: %" PRId64 "", version);
  uint8_t colType = cache->type;

  TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (tw == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(tw, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    goto END;
  }
  tfileWriterClose(tw);

  TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }
  indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);

  IndexTFile* tf = (IndexTFile*)sIdx->tindex;

  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};

  taosThreadMutexLock(&tf->mtx);
  tfileCachePut(tf->cache, &key, reader);
  taosThreadMutexUnlock(&tf->mtx);

  return ret;
END:
  // Reached only after a failed write: tear the writer down by hand instead of closing it.
  if (tw != NULL) {
    writerCtxDestroy(tw->ctx, true);
    taosMemoryFree(tw);
  }
  return -1;
}
dengyihao's avatar
dengyihao 已提交
622

dengyihao's avatar
dengyihao 已提交
623 624
/*
 * Serialise a cache key into `buf` as "<suid>_<column>".
 *
 * For keys whose column type carries the JSON flag, the fixed JSON_COLUMN
 * name is written instead of key->colName.
 *
 * @param buf  destination; the caller must supply enough room (callers in
 *             this file pass a 128-byte buffer)
 * @return number of bytes written
 */
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
  bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);

  // Remember the start: the length is computed as `buf - p` after the SERIALIZE_* calls.
  char* p = buf;
  char  tbuf[65] = {0};
  indexInt2str((int64_t)key->suid, tbuf, 0);

  SERIALIZE_STR_VAR_TO_BUF(buf, tbuf, strlen(tbuf));
  SERIALIZE_VAR_TO_BUF(buf, '_', char);
  if (hasJson) {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  } else {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  }
  return buf - p;
}