index.c 19.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "index.h"
dengyihao's avatar
dengyihao 已提交
17 18
#include "indexCache.h"
#include "indexComm.h"
dengyihao's avatar
dengyihao 已提交
19
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
20 21
#include "indexTfile.h"
#include "indexUtil.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "tcoding.h"
#include "tdataformat.h"
dengyihao's avatar
dengyihao 已提交
24
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
25
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
27

dengyihao's avatar
dengyihao 已提交
28 29
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
30 31
#endif

dengyihao's avatar
dengyihao 已提交
32
/* Sizing of the shared background scheduler created in indexInit(). */
#define INDEX_NUM_OF_THREADS 5
#define INDEX_QUEUE_SIZE     200

/*
 * Per-type NULL sentinels used by the index module.
 * All 64-bit sentinels carry an explicit (U)LL suffix so their type does not depend on the
 * platform's `long` width.
 */
#define INDEX_DATA_BOOL_NULL      0x02
#define INDEX_DATA_TINYINT_NULL   0x80
#define INDEX_DATA_SMALLINT_NULL  0x8000
#define INDEX_DATA_INT_NULL       0x80000000LL
#define INDEX_DATA_BIGINT_NULL    0x8000000000000000LL
// Timestamps share the BIGINT sentinel; reference this file's own macro rather than an external one.
#define INDEX_DATA_TIMESTAMP_NULL INDEX_DATA_BIGINT_NULL

#define INDEX_DATA_FLOAT_NULL    0x7FF00000            // bit pattern of a float NaN
#define INDEX_DATA_DOUBLE_NULL   0x7FFFFF0000000000LL  // bit pattern of a double NaN
#define INDEX_DATA_NCHAR_NULL    0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL   0xFF
#define INDEX_DATA_JSON_NULL     0xFFFFFFFF
#define INDEX_DATA_JSON_null     0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01

#define INDEX_DATA_UTINYINT_NULL  0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL      0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL   0xFFFFFFFFFFFFFFFFULL

/* Textual spellings of NULL. */
#define INDEX_DATA_NULL_STR   "NULL"
#define INDEX_DATA_NULL_STR_L "null"

dengyihao's avatar
dengyihao 已提交
58 59 60 61 62
// Handle of the shared background task scheduler; created in indexInit(), torn down in indexCleanUp().
void*   indexQhandle = NULL;
// Ref-pool id returned by taosOpenRef() in indexInit(); SIndex handles are registered under it.
int32_t indexRefMgt;

// Destructor for SIndex handles, passed to taosOpenRef() in indexInit().
static void indexDestroy(void* sIdx);

dengyihao's avatar
dengyihao 已提交
63 64
void indexInit() {
  // refactor later
dengyihao's avatar
dengyihao 已提交
65
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
dengyihao's avatar
dengyihao 已提交
66
  indexRefMgt = taosOpenRef(1000, indexDestroy);
dengyihao's avatar
dengyihao 已提交
67
}
dengyihao's avatar
dengyihao 已提交
68 69 70 71
/*
 * Module teardown: stops the shared scheduler created by indexInit.
 * TODO: refactor. NOTE(review): the ref pool opened in indexInit is not closed here.
 */
void indexCleanUp() {
  void* sched = indexQhandle;
  taosCleanUpScheduler(sched);
}
dengyihao's avatar
dengyihao 已提交
72

dengyihao's avatar
dengyihao 已提交
73
/* Column bookkeeping record: an internally assigned column id plus a column version. */
typedef struct SIdxColInfo {
  int colId;     // assigned by the index module itself
  int cVersion;  // column version counter
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
77

wafwerar's avatar
wafwerar 已提交
78
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
79
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
80
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
81

dengyihao's avatar
dengyihao 已提交
82 83
static void idxInterRsltDestroy(SArray* results);
static int  idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
dengyihao's avatar
dengyihao 已提交
84

dengyihao's avatar
dengyihao 已提交
85
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
dengyihao's avatar
dengyihao 已提交
86

dengyihao's avatar
dengyihao 已提交
87
// merge cache and tfile by opera type
dengyihao's avatar
dengyihao 已提交
88
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
dengyihao's avatar
dengyihao 已提交
89

dengyihao's avatar
dengyihao 已提交
90 91
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t        indexSerialKey(ICacheKey* key, char* buf);
dengyihao's avatar
dengyihao 已提交
92

dengyihao's avatar
dengyihao 已提交
93
static void idxPost(void* idx) {
dengyihao's avatar
dengyihao 已提交
94 95 96 97 98 99 100 101
  SIndex* pIdx = idx;
  tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
  SIndex* pIdx = idx;
  tsem_wait(&pIdx->sem);
}

dengyihao's avatar
dengyihao 已提交
102
/*
 * Open an index rooted at `path`.
 *
 * On success, *index receives a new handle and TSDB_CODE_SUCCESS is returned. The handle is
 * registered in the module ref pool and holds one acquired reference; release it with
 * indexClose().
 * On failure, *index is set to NULL and TSDB_CODE_OUT_OF_MEMORY is returned.
 * `opts` is currently unused.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  taosThreadOnce(&isInit, indexInit);
  *index = NULL;

  SIndex* sIdx = taosMemoryCalloc(1, sizeof(*sIdx));
  if (sIdx == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  sIdx->tindex = idxTFileCreate(path);
  if (sIdx->tindex == NULL) {
    goto END;
  }

  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  sIdx->path = tstrdup(path);
  if (sIdx->colObj == NULL || sIdx->path == NULL) {
    goto END;
  }
  sIdx->cVersion = 1;
  taosThreadMutexInit(&sIdx->mtx, NULL);
  tsem_init(&sIdx->sem, 0, 0);

  sIdx->refId = idxAddRef(sIdx);
  idxAcquireRef(sIdx->refId);

  *index = sIdx;
  return TSDB_CODE_SUCCESS;

END:
  // The handle has not been added to the ref pool yet. indexClose() only releases/removes
  // a ref id, so it would never free this allocation. Undo the partial construction directly.
  if (sIdx->colObj != NULL) {
    taosHashCleanup(sIdx->colObj);
  }
  if (sIdx->path != NULL) {
    taosMemoryFree(sIdx->path);
  }
  if (sIdx->tindex != NULL) {
    idxTFileDestroy(sIdx->tindex);
  }
  taosMemoryFree(sIdx);
  return TSDB_CODE_OUT_OF_MEMORY;
}
dengyihao's avatar
dengyihao 已提交
135

dengyihao's avatar
dengyihao 已提交
136 137
void indexDestroy(void* handle) {
  SIndex* sIdx = handle;
dengyihao's avatar
dengyihao 已提交
138
  taosThreadMutexDestroy(&sIdx->mtx);
dengyihao's avatar
dengyihao 已提交
139
  tsem_destroy(&sIdx->sem);
dengyihao's avatar
dengyihao 已提交
140
  idxTFileDestroy(sIdx->tindex);
dengyihao's avatar
dengyihao 已提交
141 142 143 144 145 146
  taosMemoryFree(sIdx->path);
  taosMemoryFree(sIdx);
  return;
}
/*
 * Close an index handle obtained from indexOpen().
 *
 * For every column cache in the handle:
 *   - force a merge of its in-memory data;
 *   - block on the index semaphore until idxPost is called;
 *   - drop the hash's reference to the cache.
 * Then clean up the column hash, release the handle's ref and remove it from the ref pool.
 * Presumably the pool then frees the handle via indexDestroy once no refs remain.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx->colObj != NULL) {
    void* iter = taosHashIterate(sIdx->colObj, NULL);
    while (iter != NULL) {
      // Copy the cache pointer out before advancing. `iter` points into hash-owned storage
      // that must not be read after the iterator has moved on.
      IndexCache* pCache = *(IndexCache**)iter;

      idxCacheForceToMerge((void*)pCache);
      indexInfo("%s wait to merge", pCache->colName);
      indexWait((void*)sIdx);
      indexInfo("%s finish to wait", pCache->colName);

      iter = taosHashIterate(sIdx->colObj, iter);
      idxCacheUnRef(pCache);
    }
    taosHashCleanup(sIdx->colObj);
    sIdx->colObj = NULL;
  }

  idxReleaseRef(sIdx->refId);
  idxRemoveRef(sIdx->refId);
}
dengyihao's avatar
dengyihao 已提交
164
int64_t idxAddRef(void* p) {
dengyihao's avatar
dengyihao 已提交
165 166 167
  // impl
  return taosAddRef(indexRefMgt, p);
}
dengyihao's avatar
dengyihao 已提交
168
int32_t idxRemoveRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
169 170 171 172
  // impl later
  return taosRemoveRef(indexRefMgt, ref);
}

dengyihao's avatar
dengyihao 已提交
173
void idxAcquireRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
174 175 176
  // impl
  taosAcquireRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
177
void idxReleaseRef(int64_t ref) {
dengyihao's avatar
dengyihao 已提交
178 179 180
  // impl
  taosReleaseRef(indexRefMgt, ref);
}
dengyihao's avatar
dengyihao 已提交
181

dengyihao's avatar
dengyihao 已提交
182
/*
 * Add every term in `fVals` for table `uid` to the per-column caches.
 *
 * Pass 1 runs under index->mtx. It makes sure a cache exists for each term's
 * (suid, column, type) key, creating and registering missing ones.
 * Pass 2 runs without the lock and puts each term into its cache.
 *
 * Returns:
 *   - 0 on success;
 *   - TSDB_CODE_OUT_OF_MEMORY if a cache could not be created or registered;
 *   - -1 if a cache is unexpectedly missing in pass 2;
 *   - otherwise the first non-zero code from idxCachePut. Terms after a failing one are not
 *     inserted.
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
  // TODO(yihao): reduce the lock range
  taosThreadMutexLock(&index->mtx);
  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    if (cache == NULL) {
      IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
      if (pCache == NULL) {
        taosThreadMutexUnlock(&index->mtx);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      if (taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)) != 0) {
        // Not registered: drop the creator's reference (indexClose releases registered caches
        // the same way).
        idxCacheUnRef(pCache);
        taosThreadMutexUnlock(&index->mtx);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }
  taosThreadMutexUnlock(&index->mtx);

  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);

    char      buf[128] = {0};
    ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
    int32_t   sz = idxSerialCacheKey(&key, buf);
    indexDebug("w suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);

    IndexCache** cache = taosHashGet(index->colObj, buf, sz);
    // An assert would be compiled out in release builds and a NULL here would then be dereferenced.
    if (cache == NULL || *cache == NULL) {
      indexError("cache not found, suid: %" PRIu64 ", colName: %s", key.suid, key.colName);
      return -1;
    }
    int ret = idxCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
217
/*
 * Run every term query in `multiQuerys` and combine the per-term uid lists into `result`
 * using the multi-query operator (see idxMergeFinalResults).
 * Per-term search failures are not propagated; this always returns 0.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
  EIndexOperatorType relation = multiQuerys->opera;

  SArray* perTerm = taosArrayInit(4, POINTER_BYTES);
  int     nQuery = taosArrayGetSize(multiQuerys->query);
  for (size_t qi = 0; qi < nQuery; ++qi) {
    SIndexTermQuery* tq = taosArrayGet(multiQuerys->query, qi);
    SArray*          hits = NULL;

    idxTermSearch(index, tq, &hits);
    taosArrayPush(perTerm, (void*)&hits);
  }

  idxMergeFinalResults(perTerm, relation, result);
  idxInterRsltDestroy(perTerm);
  return 0;
}

233
/* Deletion by query is not implemented: this is a no-op that always returns 1. */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
  (void)index;
  (void)query;
  return 1;
}
dengyihao's avatar
dengyihao 已提交
234
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
dengyihao's avatar
dengyihao 已提交
235

236 237
/* Index options are not implemented yet: creation yields NULL. */
SIndexOpts* indexOptsCreate() {
  return NULL;
}
/* Index options are not implemented yet: destruction is a no-op. */
void indexOptsDestroy(SIndexOpts* opts) {
  (void)opts;
}
dengyihao's avatar
dengyihao 已提交
238 239 240 241
/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
242
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
dengyihao's avatar
dengyihao 已提交
243 244
  SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
  if (mtq == NULL) {
dengyihao's avatar
dengyihao 已提交
245 246
    return NULL;
  }
dengyihao's avatar
dengyihao 已提交
247 248 249
  mtq->opera = opera;
  mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  return mtq;
dengyihao's avatar
dengyihao 已提交
250
}
dengyihao's avatar
dengyihao 已提交
251
/*
 * Destroy a query created by indexMultiTermQueryCreate, including every term added to it
 * (each term is released with indexTermDestroy). NULL is accepted and ignored.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }
  for (size_t i = 0; i < taosArrayGetSize(pQuery->query); i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  taosMemoryFree(pQuery);
}
dengyihao's avatar
dengyihao 已提交
259
/*
 * Append `term` with query type `qType` to `pQuery`.
 * On success the query owns `term` (indexMultiTermQueryDestroy releases it).
 * Returns 0 on success, or -1 if the term could not be appended; ownership then stays with the
 * caller.
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  SIndexTermQuery q = {.qType = qType, .term = term};
  if (taosArrayPush(pQuery->query, &q) == NULL) {
    return -1;
  }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
265 266
/*
 * Build a heap-allocated index term.
 *
 * `colName` (nColName bytes) is copied into a NUL-terminated buffer. `colVal` is converted to
 * its string form by idxConvertDataToStr according to the column type; `nColVal` is not used
 * by this function.
 * Returns NULL if the term or its column-name buffer cannot be allocated.
 * Free the result with indexTermDestroy.
 */
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
                            int32_t nColName, const char* colVal, int32_t nColVal) {
  SIndexTerm* tm = taosMemoryCalloc(1, sizeof(*tm));
  if (tm == NULL) {
    return NULL;
  }

  tm->suid = suid;
  tm->operType = oper;
  tm->colType = colType;

  tm->colName = taosMemoryCalloc(1, nColName + 1);
  if (tm->colName == NULL) {
    // Copying into a failed allocation would write through NULL.
    taosMemoryFree(tm);
    return NULL;
  }
  memcpy(tm->colName, colName, nColName);
  tm->nColName = nColName;

  char*   buf = NULL;
  int32_t len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
  assert(len != -1);

  tm->colVal = buf;
  tm->nColVal = len;

  return tm;
}
dengyihao's avatar
dengyihao 已提交
289

dengyihao's avatar
dengyihao 已提交
290
/* Free a term created by indexTermCreate (its column name, value string and itself). NULL is ignored. */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  taosMemoryFree(p->colName);
  taosMemoryFree(p->colVal);
  taosMemoryFree(p);
}

dengyihao's avatar
dengyihao 已提交
296
/* Create an empty list of SIndexTerm pointers (filled by indexMultiTermAdd). */
SIndexMultiTerm* indexMultiTermCreate() {
  const size_t initialCap = 4;
  return taosArrayInit(initialCap, sizeof(SIndexTerm*));
}
297

dengyihao's avatar
dengyihao 已提交
298
/*
 * Append the term pointer `term` to `terms`. On success the list owns it
 * (indexMultiTermDestroy releases it).
 * Returns 0 on success, or -1 if the pointer could not be appended; ownership then stays with
 * the caller.
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  if (taosArrayPush(terms, &term) == NULL) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
302
/* Destroy every term in `terms` with indexTermDestroy, then the list itself. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  int32_t n = taosArrayGetSize(terms);
  for (int32_t k = 0; k < n; ++k) {
    indexTermDestroy(taosArrayGetP(terms, k));
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
309

dengyihao's avatar
dengyihao 已提交
310 311 312 313 314
/*
 * Background task body scheduled by indexRebuild.
 * No rebuild work is performed yet: it only marks the index as finished and drops the reference
 * taken when the task was scheduled.
 */
static void idxSchedRebuildIdx(SSchedMsg* msg) {
  // TODO, no need rebuild index
  SIndex*      target = (SIndex*)msg->ahandle;
  const int8_t finished = kFinished;

  atomic_store_8(&target->status, finished);
  idxReleaseRef(target->refId);
}
/*
 * Mark `idx` as rebuilding and hand the rebuild to the shared background scheduler.
 * A ref on the index is acquired here; the scheduled task (idxSchedRebuildIdx) releases it.
 * `iter` is currently unused.
 */
void indexRebuild(SIndexJson* idx, void* iter) {
  const int8_t rebuilding = kRebuild;
  atomic_store_8(&idx->status, rebuilding);

  SSchedMsg task = {.fp = idxSchedRebuildIdx, .ahandle = idx};
  idxAcquireRef(idx->refId);
  taosScheduleTask(indexQhandle, &task);
}

/*
 * Report whether `idx` is still in the rebuild state set by indexRebuild.
 */
bool indexIsRebuild(SIndex* idx) {
  SIdxStatus cur = (SIdxStatus)atomic_load_8(&idx->status);
  return cur == kRebuild;
}
/*
 * JSON-index entry point for a rebuild; simply forwards to indexRebuild.
 */
void indexJsonRebuild(SIndexJson* idx, void* iter) {
  indexRebuild(idx, iter);
}

/*
 * Report whether the JSON index `idx` is still in the rebuild state.
 */
bool indexJsonIsRebuild(SIndexJson* idx) {
  SIdxStatus cur = (SIdxStatus)atomic_load_8(&idx->status);
  return cur == kRebuild;
}

dengyihao's avatar
dengyihao 已提交
358
/*
 * Search a single term.
 *
 * Looks up the column cache for (suid, column, type) and queries it. Unless the cache reports
 * the column as deleted, it then queries the tfile index. The matching uids are merged into a
 * freshly allocated array stored in *result.
 *
 * *result is allocated (possibly empty) before any return, so the caller always owns it.
 * Returns 0 on success, including when the column is marked deleted (the tfile is then not
 * consulted). Returns -1 if the cache or tfile search fails.
 */
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;

  char      buf[128] = {0};
  ICacheKey key = {
      .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
  indexDebug("r suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType);
  int32_t sz = idxSerialCacheKey(&key, buf);

  taosThreadMutexLock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
  IndexCache*  cache = (pCache == NULL) ? NULL : *pCache;
  taosThreadMutexUnlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  // TODO: iterator mem and tidex
  STermValueType s = kTypeValue;

  SIdxTRslt* tr = idxTRsltCreate();
  if (0 != idxCacheSearch(cache, query, tr, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }
  if (s == kTypeDeletion) {
    indexInfo("col: %s already drop by", term->colName);
    // Column already dropped by another operation; no need to query tindex.
    // `tr` must still be released here.
    idxTRsltDestroy(tr);
    return 0;
  }

  int64_t st = taosGetTimestampUs();
  if (0 != idxTFileSearch(sIdx->tindex, query, tr)) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    goto END;
  }
  int64_t tfCost = taosGetTimestampUs() - st;
  indexInfo("tfile search cost: %" PRId64 "us", tfCost);

  int64_t cost = taosGetTimestampUs() - st;
  indexInfo("search cost: %" PRId64 "us", cost);

  idxTRsltMergeTo(tr, *result);
  idxTRsltDestroy(tr);
  return 0;

END:
  idxTRsltDestroy(tr);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
413
/* Free each per-term uid array (SArray* elements) in `results`, then `results` itself. NULL is ignored. */
static void idxInterRsltDestroy(SArray* results) {
  if (results != NULL) {
    size_t n = taosArrayGetSize(results);
    for (size_t k = 0; k < n; ++k) {
      taosArrayDestroy((SArray*)taosArrayGetP(results, k));
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
425

dengyihao's avatar
dengyihao 已提交
426
/*
 * Combine the per-term uid arrays in `in` into `out` according to `oType`.
 *
 * Each input array is first sorted and de-duplicated in place; the set operations below
 * presumably require sorted, unique input.
 *   - MUST   -> intersection of all inputs;
 *   - SHOULD -> union of all inputs;
 *   - NOT    -> not implemented, `out` is left untouched.
 * Always returns 0.
 */
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
  // Must count upwards: decrementing the index never terminates and reads negative positions.
  size_t nIn = taosArrayGetSize(in);
  for (size_t i = 0; i < nIn; i++) {
    SArray* t = taosArrayGetP(in, i);
    taosArraySort(t, uidCompare);
    taosArrayRemoveDuplicate(t, uidCompare, NULL);
  }

  if (oType == MUST) {
    iIntersection(in, out);
  } else if (oType == SHOULD) {
    iUnion(in, out);
  } else if (oType == NOT) {
    // just one column index, enhance later; not used currently
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
445

dengyihao's avatar
dengyihao 已提交
446
/*
 * Fold the pending uids in `tr` into the flush result as values change.
 *
 * - `result` empty: `tfv` becomes the first entry.
 * - `tfv` NULL (end of iteration): pending uids go to the last entry.
 * - `tfv` has the same colVal as the last entry: `tfv` is destroyed and uids keep accumulating
 *   in `tr`.
 * - Otherwise: pending uids go to the last entry, `tr` is cleared and `tfv` is appended.
 */
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
  int32_t n = taosArrayGetSize(result);
  if (n <= 0) {
    taosArrayPush(result, &tfv);
    return;
  }

  TFileValue* last = taosArrayGetP(result, n - 1);
  if (tfv == NULL) {
    // handle last iterator
    idxTRsltMergeTo(tr, last->tableId);
    return;
  }
  if (strcmp(last->colVal, tfv->colVal) == 0) {
    // temp result saved in help
    tfileValueDestroy(tfv);
    return;
  }

  idxTRsltMergeTo(tr, last->tableId);
  idxTRsltClear(tr);
  taosArrayPush(result, &tfv);
}
dengyihao's avatar
dengyihao 已提交
466
/*
 * Merge one cache value `cv` and/or one tfile value `tv` (either may be NULL, not both) into
 * the flush state.
 *
 * A TFileValue for the current colVal is handed to idxMayMergeTempToFinalRslt.
 * The cache entry's first uid is recorded in tr->add (ADD_VALUE) or tr->del (DEL_VALUE) via
 * INDEX_MERGE_ADD_DEL. All tfile uids are appended to tr->total.
 */
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
  char*       colVal = (cv != NULL) ? cv->colVal : tv->colVal;
  TFileValue* tfv = tfileValueCreate(colVal);

  idxMayMergeTempToFinalRslt(result, tfv, tr);

  if (cv != NULL) {
    uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
    if (cv->type == ADD_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->del, tr->add, id)
    } else if (cv->type == DEL_VALUE) {
      INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
    }
  }
  if (tv != NULL) {
    taosArrayAddAll(tr->total, tv->val);
  }
}
dengyihao's avatar
dengyihao 已提交
485
/* Destroy every TFileValue* stored in `result`, then the array itself. A NULL array has no elements to destroy. */
static void idxDestroyFinalRslt(SArray* result) {
  size_t n = (result != NULL) ? taosArrayGetSize(result) : 0;
  for (size_t k = 0; k < n; ++k) {
    tfileValueDestroy(taosArrayGetP(result, k));
  }
  taosArrayDestroy(result);
}
493

dengyihao's avatar
dengyihao 已提交
494
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
dengyihao's avatar
dengyihao 已提交
495 496 497
  if (sIdx == NULL) {
    return -1;
  }
dengyihao's avatar
dengyihao 已提交
498
  indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
dengyihao's avatar
dengyihao 已提交
499

dengyihao's avatar
dengyihao 已提交
500 501
  int64_t st = taosGetTimestampUs();

dengyihao's avatar
dengyihao 已提交
502 503
  IndexCache* pCache = (IndexCache*)cache;

dengyihao's avatar
dengyihao 已提交
504 505
  while (quit && atomic_load_32(&pCache->merging) == 1)
    ;
dengyihao's avatar
dengyihao 已提交
506
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
dengyihao's avatar
dengyihao 已提交
507 508 509
  if (pReader == NULL) {
    indexWarn("empty tfile reader found");
  }
dengyihao's avatar
dengyihao 已提交
510
  // handle flush
dengyihao's avatar
dengyihao 已提交
511
  Iterate* cacheIter = idxCacheIteratorCreate(pCache);
dengyihao's avatar
dengyihao 已提交
512 513
  if (cacheIter == NULL) {
    indexError("%p immtable is empty, ignore merge opera", pCache);
dengyihao's avatar
dengyihao 已提交
514
    idxCacheDestroyImm(pCache);
dengyihao's avatar
dengyihao 已提交
515
    tfileReaderUnRef(pReader);
dengyihao's avatar
dengyihao 已提交
516 517
    atomic_store_32(&pCache->merging, 0);
    if (quit) {
dengyihao's avatar
dengyihao 已提交
518
      idxPost(sIdx);
dengyihao's avatar
dengyihao 已提交
519
    }
dengyihao's avatar
dengyihao 已提交
520
    idxReleaseRef(sIdx->refId);
dengyihao's avatar
dengyihao 已提交
521 522 523
    return 0;
  }

dengyihao's avatar
dengyihao 已提交
524
  Iterate* tfileIter = tfileIteratorCreate(pReader);
dengyihao's avatar
dengyihao 已提交
525 526 527
  if (tfileIter == NULL) {
    indexWarn("empty tfile reader iterator");
  }
dengyihao's avatar
dengyihao 已提交
528 529 530

  SArray* result = taosArrayInit(1024, sizeof(void*));

dengyihao's avatar
dengyihao 已提交
531 532
  bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
  bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
533

dengyihao's avatar
dengyihao 已提交
534
  SIdxTRslt* tr = idxTRsltCreate();
dengyihao's avatar
dengyihao 已提交
535 536 537 538 539 540 541 542 543 544 545 546
  while (cn == true || tn == true) {
    IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
    IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;

    int comp = 0;
    if (cn == true && tn == true) {
      comp = strcmp(cv->colVal, tv->colVal);
    } else if (cn == true) {
      comp = -1;
    } else {
      comp = 1;
    }
dengyihao's avatar
dengyihao 已提交
547
    if (comp == 0) {
dengyihao's avatar
dengyihao 已提交
548
      idxMergeCacheAndTFile(result, cv, tv, tr);
dengyihao's avatar
dengyihao 已提交
549 550 551
      cn = cacheIter->next(cacheIter);
      tn = tfileIter->next(tfileIter);
    } else if (comp < 0) {
dengyihao's avatar
dengyihao 已提交
552
      idxMergeCacheAndTFile(result, cv, NULL, tr);
dengyihao's avatar
dengyihao 已提交
553 554
      cn = cacheIter->next(cacheIter);
    } else {
dengyihao's avatar
dengyihao 已提交
555
      idxMergeCacheAndTFile(result, NULL, tv, tr);
dengyihao's avatar
dengyihao 已提交
556 557 558
      tn = tfileIter->next(tfileIter);
    }
  }
dengyihao's avatar
dengyihao 已提交
559
  idxMayMergeTempToFinalRslt(result, NULL, tr);
dengyihao's avatar
dengyihao 已提交
560
  idxTRsltDestroy(tr);
561

dengyihao's avatar
dengyihao 已提交
562 563
  int ret = idxGenTFile(sIdx, pCache, result);
  idxDestroyFinalRslt(result);
dengyihao's avatar
dengyihao 已提交
564

dengyihao's avatar
dengyihao 已提交
565
  idxCacheDestroyImm(pCache);
dengyihao's avatar
dengyihao 已提交
566

dengyihao's avatar
dengyihao 已提交
567
  idxCacheIteratorDestroy(cacheIter);
dengyihao's avatar
dengyihao 已提交
568 569
  tfileIteratorDestroy(tfileIter);

dengyihao's avatar
dengyihao 已提交
570
  tfileReaderUnRef(pReader);
dengyihao's avatar
dengyihao 已提交
571
  idxCacheUnRef(pCache);
dengyihao's avatar
dengyihao 已提交
572 573 574 575 576 577 578

  int64_t cost = taosGetTimestampUs() - st;
  if (ret != 0) {
    indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
  } else {
    indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
  }
dengyihao's avatar
dengyihao 已提交
579 580
  atomic_store_32(&pCache->merging, 0);
  if (quit) {
dengyihao's avatar
dengyihao 已提交
581
    idxPost(sIdx);
dengyihao's avatar
dengyihao 已提交
582
  }
dengyihao's avatar
dengyihao 已提交
583
  idxReleaseRef(sIdx->refId);
dengyihao's avatar
dengyihao 已提交
584

dengyihao's avatar
dengyihao 已提交
585
  return ret;
dengyihao's avatar
dengyihao 已提交
586
}
dengyihao's avatar
dengyihao 已提交
587 588 589
/*
 * Release the contents of an iterator value (not `value` itself).
 * With `destroy`, the `val` array is freed and reset to NULL; otherwise it is only emptied
 * (when present) so it can be reused. `colVal` is always freed and reset to NULL.
 */
void iterateValueDestroy(IterateValue* value, bool destroy) {
  if (!destroy) {
    if (value->val != NULL) {
      taosArrayClear(value->val);
    }
  } else {
    taosArrayDestroy(value->val);
    value->val = NULL;
  }

  taosMemoryFree(value->colVal);
  value->colVal = NULL;
}
599

dengyihao's avatar
dengyihao 已提交
600
/*
 * Pick the version for the next tfile of `cache`'s column.
 * This is the cache version, unless a tfile reader for (suid, column) is cached; then it is one
 * past the larger of the cache version and that reader's header version.
 */
static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
  IndexTFile* tf = (IndexTFile*)(sIdx->tindex);
  ICacheKey   key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
  int64_t     ver = CACHE_VERSION(cache);

  taosThreadMutexLock(&tf->mtx);
  TFileReader* rd = tfileCacheGet(tf->cache, &key);
  taosThreadMutexUnlock(&tf->mtx);

  if (rd != NULL) {
    ver = (ver > rd->header.version ? ver : rd->header.version) + 1;
    indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver);
  }
  tfileReaderUnRef(rd);

  return ver;
}
dengyihao's avatar
dengyihao 已提交
617
/*
 * Write the merged `batch` for `cache`'s column into a new tfile, then reopen that file and
 * publish its reader in the tfile cache under (suid, column).
 * Returns 0 on success, or -1 if the file cannot be opened, written, or reopened.
 */
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
  int64_t version = idxGetAvailableVer(sIdx, cache);
  indexInfo("file name version: %" PRId64 "", version);

  uint8_t      colType = cache->type;
  TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
  if (tw == NULL) {
    indexError("failed to open file to write");
    return -1;
  }

  int ret = tfileWriterPut(tw, batch, true);
  if (ret != 0) {
    indexError("failed to write into tindex ");
    // Drop the writer's file context and the writer itself.
    // NOTE(review): the `true` flag presumably also removes the partial file — confirm.
    idxFileCtxDestroy(tw->ctx, true);
    taosMemoryFree(tw);
    return -1;
  }
  tfileWriterClose(tw);

  TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
  if (reader == NULL) {
    return -1;
  }
  indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);

  TFileHeader* header = &reader->header;
  ICacheKey    key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};

  IndexTFile* tf = (IndexTFile*)sIdx->tindex;
  taosThreadMutexLock(&tf->mtx);
  tfileCachePut(tf->cache, &key, reader);
  taosThreadMutexUnlock(&tf->mtx);

  return ret;
}
dengyihao's avatar
dengyihao 已提交
658

dengyihao's avatar
dengyihao 已提交
659
/*
 * Serialise `key` into `buf` as "<suid>_<column>" and return the number of bytes written.
 *
 * The suid is rendered by idxInt2str. Columns whose type contains the JSON extern type all
 * serialise under the JSON_COLUMN name rather than their own column name.
 * Callers in this file pass a zeroed 128-byte buffer.
 */
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
  char* start = buf;

  char suidStr[65] = {0};
  idxInt2str((int64_t)key->suid, suidStr, 0);
  SERIALIZE_STR_VAR_TO_BUF(buf, suidStr, strlen(suidStr));
  SERIALIZE_VAR_TO_BUF(buf, '_', char);

  if (IDX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON)) {
    SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
  } else {
    SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
  }
  return buf - start;
}