index.c 10.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
#include "index.h"
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
18
#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
19
#include "index_tfile.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tdef.h"
dengyihao's avatar
dengyihao 已提交
21
#include "tsched.h"
H
refact  
Hongze Cheng 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
25 26
#endif

dengyihao's avatar
dengyihao 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40
/* Sizing passed to taosInitScheduler() by indexInit(). */
#define INDEX_QUEUE_SIZE     4 /* queue-size argument */
#define INDEX_NUM_OF_THREADS 4 /* thread-count argument */

/* Scheduler handle created by indexInit(); NULL until then. */
void* indexQhandle = NULL;

/*
 * Create the scheduler shared by the index module and store it in indexQhandle.
 * Returns 0 on success, -1 when taosInitScheduler() yields NULL.
 */
int32_t indexInit() {
  indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
  if (indexQhandle == NULL) {
    return -1;
  }
  return 0;
}
/*
 * Release the scheduler created by indexInit().
 * The handle is reset to NULL afterwards so a repeated call (or a later
 * indexInit()) never sees a dangling pointer; calling this before indexInit()
 * is a no-op.
 */
void indexCleanUp() {
  if (indexQhandle == NULL) {
    return;
  }
  taosCleanUpScheduler(indexQhandle);
  indexQhandle = NULL;
}

dengyihao's avatar
dengyihao 已提交
41 42 43
/*
 * qsort-style comparator for uint64_t uids: returns -1, 0 or 1 as *a is less
 * than, equal to, or greater than *b.
 */
static int uidCompare(const void* a, const void* b) {
  uint64_t u1 = *(const uint64_t*)a;
  uint64_t u2 = *(const uint64_t*)b;
  // Compare explicitly instead of returning u1 - u2: the unsigned difference
  // wraps and does not fit in an int.
  if (u1 == u2) {
    return 0;
  }
  return u1 < u2 ? -1 : 1;
}
dengyihao's avatar
dengyihao 已提交
50
/* Per-column bookkeeping record. NOTE(review): not referenced elsewhere in this file. */
typedef struct SIdxColInfo {
  int colId;     // generated by index internal
  int cVersion;  // version number; semantics not visible here — confirm
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
54 55

static pthread_once_t isInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
56
// static void           indexInit();
dengyihao's avatar
dengyihao 已提交
57

dengyihao's avatar
dengyihao 已提交
58
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
dengyihao's avatar
dengyihao 已提交
59

dengyihao's avatar
dengyihao 已提交
60 61
static void indexInterResultsDestroy(SArray* results);
static int  indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
dengyihao's avatar
dengyihao 已提交
62

dengyihao's avatar
dengyihao 已提交
63
/*
 * Open an index rooted at `path`.
 *
 * On success *index receives the new handle and 0 is returned. On failure every
 * partially-created resource is released, *index is set to NULL (when `index`
 * itself is non-NULL) and -1 is returned. `opts` is currently unused.
 * Without USE_INVERTED_INDEX the function always fails.
 */
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
  if (index == NULL) {
    return -1;
  }
  *index = NULL;

  SIndex* sIdx = calloc(1, sizeof(SIndex));
  if (sIdx == NULL) {
    return -1;
  }

#ifdef USE_LUCENE
  // Assigned directly: a local named `index` would redeclare the parameter.
  sIdx->index = index_open(path);
#endif

#ifdef USE_INVERTED_INDEX
  // sIdx->cache stays NULL (from calloc); per-column caches live in colObj.
  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (sIdx->colObj != NULL) {
    sIdx->tindex = indexTFileCreate(path);
    if (sIdx->tindex != NULL) {
      sIdx->cVersion = 1;
      pthread_mutex_init(&sIdx->mtx, NULL);
      *index = sIdx;
      return 0;
    }
    taosHashCleanup(sIdx->colObj);
  }
#endif

  // Failure, or no inverted index compiled in: release what was created.
#ifdef USE_LUCENE
  if (sIdx->index != NULL) {
    index_close(sIdx->index);
  }
#endif
  free(sIdx);
  return -1;
}
dengyihao's avatar
dengyihao 已提交
87

dengyihao's avatar
dengyihao 已提交
88
/*
 * Close an index opened by indexOpen() and free the handle.
 * Passing NULL is a no-op.
 */
void indexClose(SIndex* sIdx) {
  if (sIdx == NULL) {
    return;
  }

#ifdef USE_LUCENE
  index_close(sIdx->index);
  sIdx->index = NULL;
#endif

#ifdef USE_INVERTED_INDEX
  indexCacheDestroy(sIdx->cache);

  // Call indexCacheUnRef on every non-NULL per-column cache before the map goes away.
  void* iter = taosHashIterate(sIdx->colObj, NULL);
  while (iter) {
    IndexCache** pCache = iter;
    if (*pCache) {
      indexCacheUnRef(*pCache);
    }
    iter = taosHashIterate(sIdx->colObj, iter);
  }
  taosHashCleanup(sIdx->colObj);
  pthread_mutex_destroy(&sIdx->mtx);
#endif

  free(sIdx);
}
dengyihao's avatar
dengyihao 已提交
108

dengyihao's avatar
dengyihao 已提交
109
/*
 * Add the terms in `fVals` (an array of SIndexTerm*) for document `uid`.
 *
 * With USE_INVERTED_INDEX a cache is first created, under index->mtx, for every
 * column named in `fVals` that does not have one yet; each term is then written
 * to its column's cache.
 * Returns 0 on success, -1 if a column cache cannot be created or found, or the
 * first non-zero code returned by indexCachePut().
 */
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
#ifdef USE_LUCENE
  index_document_t* doc = index_document_create();

  // UINT64_MAX has 20 decimal digits, plus the terminating NUL.
  char buf[24] = {0};
  snprintf(buf, sizeof(buf), "%" PRIu64, uid);

  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm* p = taosArrayGetP(fVals, i);
    index_document_add(doc, (const char*)(p->colName), p->nColName, (const char*)(p->colVal), p->nColVal, 1);
  }
  index_document_add(doc, NULL, 0, buf, strlen(buf), 0);

  index_put(index->index, doc);
  index_document_destroy(doc);
#endif

#ifdef USE_INVERTED_INDEX
  size_t nTerms = taosArrayGetSize(fVals);

  // TODO(yihao): reduce the lock range
  // taosHashGet() may return NULL for a column that has no entry yet, so the
  // returned slot is checked before it is dereferenced.
  pthread_mutex_lock(&index->mtx);
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm*  p = taosArrayGetP(fVals, i);
    IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
    if (cache != NULL && *cache != NULL) {
      continue;
    }
    IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
    if (pCache == NULL) {
      pthread_mutex_unlock(&index->mtx);
      return -1;
    }
    taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
  }
  pthread_mutex_unlock(&index->mtx);

  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm*  p = taosArrayGetP(fVals, i);
    IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
    if (cache == NULL || *cache == NULL) {
      // Registration above did not take effect; fail instead of crashing.
      return -1;
    }
    int ret = indexCachePut(*cache, p, uid);
    if (ret != 0) {
      return ret;
    }
  }
#endif
  return 0;
}
dengyihao's avatar
dengyihao 已提交
152
/*
 * Evaluate `multiQuerys` and append the matching uids to `result`.
 * `result` is expected to hold uint64_t elements, as produced by the
 * inverted-index path.
 *
 * With USE_INVERTED_INDEX each term query is run separately, and the per-term
 * uid lists are merged according to multiQuerys->opera. A term whose search
 * produced no list (e.g. an unknown column) contributes an empty list.
 * Returns 1 (legacy success value), or -1 if intermediate storage cannot be
 * allocated.
 */
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
#ifdef USE_LUCENE
  EIndexOperatorType opera = multiQuerys->opera;

  int    nQuery = taosArrayGetSize(multiQuerys->query);
  char** fields = malloc(sizeof(char*) * nQuery);
  char** keys = malloc(sizeof(char*) * nQuery);
  int*   types = malloc(sizeof(int) * nQuery);

  for (int i = 0; i < nQuery; i++) {
    SIndexTermQuery* p = taosArrayGet(multiQuerys->query, i);
    SIndexTerm*      term = p->term;

    fields[i] = calloc(1, term->nColName + 1);
    keys[i] = calloc(1, term->nColVal + 1);

    memcpy(fields[i], term->colName, term->nColName);
    memcpy(keys[i], term->colVal, term->nColVal);
    types[i] = (int)(p->qType);
  }
  int* tResult = NULL;
  int  tsz = 0;
  index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);

  // Widen each int hit to uint64_t; pushing &tResult[i] would read 8 bytes
  // out of a 4-byte slot.
  for (int i = 0; i < tsz; i++) {
    uint64_t uid = (uint64_t)tResult[i];
    taosArrayPush(result, &uid);
  }
  // NOTE(review): ownership/allocator of tResult is not visible here — confirm
  // whether it must be released.

  for (int i = 0; i < nQuery; i++) {
    free(fields[i]);
    free(keys[i]);
  }
  free(fields);
  free(keys);
  free(types);
#endif

#ifdef USE_INVERTED_INDEX
  EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

  SArray* interResults = taosArrayInit(4, POINTER_BYTES);
  if (interResults == NULL) {
    return -1;
  }
  size_t nQuery = taosArrayGetSize(multiQuerys->query);
  for (size_t i = 0; i < nQuery; i++) {
    SIndexTermQuery* qTerm = taosArrayGet(multiQuerys->query, i);
    SArray*          tResult = NULL;
    indexTermSearch(index, qTerm, &tResult);
    // A failed term search can leave tResult NULL; substitute an empty list so
    // the merge never sees a NULL entry.
    if (tResult == NULL) {
      tResult = taosArrayInit(4, sizeof(uint64_t));
    }
    if (tResult == NULL || taosArrayPush(interResults, (void*)&tResult) == NULL) {
      if (tResult != NULL) {
        taosArrayDestroy(tResult);
      }
      indexInterResultsDestroy(interResults);
      return -1;
    }
  }
  // The merge reads element 0, so only run it when there is at least one list.
  if (nQuery > 0) {
    indexMergeFinalResults(interResults, opera, result);
  }
  indexInterResultsDestroy(interResults);
#endif
  return 1;
}

dengyihao's avatar
dengyihao 已提交
207
/*
 * Delete the documents matched by `query`.
 * Not implemented: neither argument is touched and 1 is always returned.
 */
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
#ifdef USE_INVERTED_INDEX
  // TODO: deletion for the inverted index.
#endif
  return 1;
}
dengyihao's avatar
dengyihao 已提交
213
/*
 * Rebuild the index.
 * Not implemented yet: nothing is done and 0 (success) is returned explicitly,
 * so callers never read an indeterminate value.
 */
int indexRebuild(SIndex* index, SIndexOpts* opts) {
#ifdef USE_INVERTED_INDEX
  // TODO: rebuild for the inverted index.
#endif
  return 0;
}
dengyihao's avatar
dengyihao 已提交
218

dengyihao's avatar
dengyihao 已提交
219
/* Create index options. Not implemented: always returns NULL. */
SIndexOpts* indexOptsCreate() {
  SIndexOpts* opts = NULL;
#ifdef USE_LUCENE
  // No Lucene-specific options yet.
#endif
  return opts;
}
dengyihao's avatar
dengyihao 已提交
224
/* Destroy options returned by indexOptsCreate(). Currently a no-op. */
void indexOptsDestroy(SIndexOpts* opts) {
#ifdef USE_LUCENE
  // No Lucene-specific options to release yet.
#endif
  (void)opts;
}
dengyihao's avatar
dengyihao 已提交
231

dengyihao's avatar
dengyihao 已提交
232 233
/*
 * Create an empty multi-term query whose terms are combined with `opera`.
 * Returns NULL if the query or its term list cannot be allocated; nothing is
 * leaked in that case.
 */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
  SIndexMultiTermQuery* p = malloc(sizeof(*p));
  if (p == NULL) {
    return NULL;
  }
  p->opera = opera;
  p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  if (p->query == NULL) {
    free(p);
    return NULL;
  }
  return p;
}
dengyihao's avatar
dengyihao 已提交
239
/*
 * Destroy a query created by indexMultiTermQueryCreate(), including every term
 * added with indexMultiTermQueryAdd() (each one is passed to indexTermDestroy).
 * Passing NULL is a no-op.
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
  if (pQuery == NULL) {
    return;
  }
  size_t sz = taosArrayGetSize(pQuery->query);
  for (size_t i = 0; i < sz; i++) {
    SIndexTermQuery* p = (SIndexTermQuery*)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  free(pQuery);
}
dengyihao's avatar
dengyihao 已提交
247
/*
 * Append `term` with query type `qType` to `pQuery`.
 * On success the query takes ownership of `term`: indexMultiTermQueryDestroy()
 * frees it. Returns 0 on success, -1 if the term could not be appended, in
 * which case the caller still owns `term`.
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
  if (pQuery == NULL) {
    return -1;
  }
  SIndexTermQuery q = {.qType = qType, .term = term};
  // Treat a NULL return from taosArrayPush as "not appended".
  if (taosArrayPush(pQuery->query, &q) == NULL) {
    return -1;
  }
  return 0;
}

dengyihao's avatar
dengyihao 已提交
253 254 255 256 257 258 259 260
/*
 * Create a term for column `colName` (nColName bytes) with value `colVal`
 * (nColVal bytes). Both buffers are copied into NUL-terminated heap strings
 * owned by the term; release it with indexTermDestroy().
 * Returns NULL if any allocation fails (nothing is leaked).
 */
SIndexTerm* indexTermCreate(int64_t            suid,
                            SIndexOperOnColumn oper,
                            uint8_t            colType,
                            const char*        colName,
                            int32_t            nColName,
                            const char*        colVal,
                            int32_t            nColVal) {
  SIndexTerm* t = calloc(1, sizeof(*t));
  if (t == NULL) {
    return NULL;
  }

  t->suid = suid;
  t->operType = oper;
  t->colType = colType;

  // The +1 keeps the copies NUL-terminated (they are logged with %s elsewhere).
  t->colName = calloc(1, nColName + 1);
  t->colVal = calloc(1, nColVal + 1);
  if (t->colName == NULL || t->colVal == NULL) {
    free(t->colName);
    free(t->colVal);
    free(t);
    return NULL;
  }

  memcpy(t->colName, colName, nColName);
  t->nColName = nColName;

  memcpy(t->colVal, colVal, nColVal);
  t->nColVal = nColVal;
  return t;
}
dengyihao's avatar
dengyihao 已提交
276
/* Free a term created by indexTermCreate() and its copied strings. NULL is ignored. */
void indexTermDestroy(SIndexTerm* p) {
  if (p == NULL) {
    return;
  }
  free(p->colName);
  free(p->colVal);
  free(p);
}

dengyihao's avatar
dengyihao 已提交
282 283 284
/* Create an empty list of SIndexTerm pointers (the input type of indexPut()). */
SIndexMultiTerm* indexMultiTermCreate() {
  SIndexMultiTerm* terms = taosArrayInit(4, sizeof(SIndexTerm*));
  return terms;
}
285

dengyihao's avatar
dengyihao 已提交
286
/*
 * Append the pointer `term` to `terms`. On success the list owns the term:
 * indexMultiTermDestroy() frees it. Returns 0 on success, -1 if it could not be
 * appended (the caller keeps ownership).
 */
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
  if (terms == NULL) {
    return -1;
  }
  // Treat a NULL return from taosArrayPush as "not appended".
  if (taosArrayPush(terms, &term) == NULL) {
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
290
/* Destroy every term in `terms` (via indexTermDestroy) and then the list. NULL is ignored. */
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
  if (terms == NULL) {
    return;
  }
  size_t sz = taosArrayGetSize(terms);
  for (size_t i = 0; i < sz; i++) {
    SIndexTerm* p = taosArrayGetP(terms, i);
    indexTermDestroy(p);
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
297

dengyihao's avatar
dengyihao 已提交
298 299 300
/*
 * Run one term query against its column's cache and, unless the cache reports
 * the column as deleted, against the TFile index.
 *
 * *result is set to NULL when the column has no cache or the result array
 * cannot be allocated; otherwise it is a caller-owned array of uint64_t
 * (possibly partially filled when an error is returned).
 * Returns 0 on success, -1 on failure.
 */
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
  SIndexTerm* term = query->term;
  const char* colName = term->colName;
  int32_t     nColName = term->nColName;

  *result = NULL;

  // Get col info. taosHashGet() may return NULL for a column with no entry,
  // so the slot is checked before it is dereferenced.
  IndexCache* cache = NULL;
  pthread_mutex_lock(&sIdx->mtx);
  IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
  if (pCache != NULL) {
    cache = *pCache;
  }
  pthread_mutex_unlock(&sIdx->mtx);
  if (cache == NULL) {
    return -1;
  }
  // NOTE(review): `cache` is used after the lock is released without taking a
  // reference; confirm nothing (e.g. indexFlushCacheTFile) can release it meanwhile.

  *result = taosArrayInit(4, sizeof(uint64_t));
  if (*result == NULL) {
    return -1;
  }

  // TODO: iterator mem and tidex
  STermValueType s;
  if (0 != indexCacheSearch(cache, query, *result, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  if (s == kTypeDeletion) {
    // Column already dropped by another operation: no need to query tindex.
    indexInfo("col: %s already drop by other opera", term->colName);
    return 0;
  }
  if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
334
/*
 * Destroy each per-term uid list held (by pointer) in `results`, then `results`
 * itself. A NULL `results` is ignored.
 */
static void indexInterResultsDestroy(SArray* results) {
  if (results != NULL) {
    size_t total = taosArrayGetSize(results);
    size_t idx = 0;
    while (idx < total) {
      taosArrayDestroy(taosArrayGetP(results, idx));
      idx++;
    }
    taosArrayDestroy(results);
  }
}
dengyihao's avatar
dengyihao 已提交
344
/*
 * Merge the per-term uid lists in `interResults` into `fResults` by `oType`.
 *
 * Only single-column queries are supported so far. The first list is sorted
 * and de-duplicated in place; for MUST, SHOULD and NOT it is then appended to
 * `fResults` unchanged, and any other operator appends nothing. An empty or
 * NULL `interResults`, or a NULL first list, means "no matches".
 * Always returns 0.
 */
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
  // refactor, merge interResults into fResults by oType
  if (interResults == NULL || taosArrayGetSize(interResults) == 0) {
    return 0;
  }
  SArray* first = taosArrayGetP(interResults, 0);
  if (first == NULL) {
    return 0;
  }
  taosArraySort(first, uidCompare);
  taosArrayRemoveDuplicate(first, uidCompare, NULL);

  // SHOULD is "tag1 condition || tag2 condition"; NOT is not used currently.
  // Until multi-column merging exists, all three behave identically.
  if (oType == MUST || oType == SHOULD || oType == NOT) {
    // just one column index, enhance later
    taosArrayAddAll(fResults, first);
  }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
364
/*
 * Merge the column cache `cache` (an IndexCache*) into the TFile index of `sIdx`.
 *
 * The merge itself is not implemented yet. The function looks up the TFile
 * reader for the cache's column, passes it to tfileReaderUnRef if one was found,
 * and passes `cache` to indexCacheUnRef.
 * Returns 0, or -1 (touching nothing) if `sIdx` or `cache` is NULL.
 */
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
  if (sIdx == NULL || cache == NULL) {
    return -1;
  }
  indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);

  IndexCache*  pCache = (IndexCache*)cache;
  TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
  // NOTE(review): a column may have no reader yet; only release one we received.
  if (pReader != NULL) {
    tfileReaderUnRef(pReader);
  }
  indexCacheUnRef(pCache);
  return 0;
}