index.c 9.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
#include "index.h"
#include "indexInt.h"
dengyihao's avatar
dengyihao 已提交
18
#include "index_cache.h"
dengyihao's avatar
dengyihao 已提交
19
#include "index_tfile.h"
dengyihao's avatar
dengyihao 已提交
20
#include "tdef.h"
H
refact  
Hongze Cheng 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
H
refact  
Hongze Cheng 已提交
24 25
#endif

dengyihao's avatar
dengyihao 已提交
26 27 28
/*
 * Three-way comparator for two uint64_t uids, in the (const void *, const void *)
 * shape that taosArraySort / taosArrayRemoveDuplicate are called with below.
 *
 * @return -1 when *a < *b, 0 when equal, 1 when *a > *b. The values are compared
 *         rather than subtracted, so large uids cannot wrap the int result.
 */
static int uidCompare(const void *a, const void *b) {
  const uint64_t u1 = *(const uint64_t *)a;
  const uint64_t u2 = *(const uint64_t *)b;
  if (u1 == u2) { return 0; }
  return u1 < u2 ? -1 : 1;
}
dengyihao's avatar
dengyihao 已提交
35
/*
 * Per-column bookkeeping stored by value in SIndex::colObj, keyed by column name.
 */
typedef struct SIdxColInfo {
  int colId;     // generated by index internal
  int cVersion;  // read by indexTermSearch; indexPut only initialises colId, so this starts at 0
} SIdxColInfo;
dengyihao's avatar
dengyihao 已提交
39 40

static pthread_once_t isInit = PTHREAD_ONCE_INIT;
41
static void           indexInit();
dengyihao's avatar
dengyihao 已提交
42 43 44 45 46

static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result);
static int indexMergeCacheIntoTindex(SIndex *sIdx);

static void indexInterResultsDestroy(SArray *results);
47
static int  indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *finalResult);
dengyihao's avatar
dengyihao 已提交
48

dengyihao's avatar
dengyihao 已提交
49
/*
 * Allocate and initialise an index handle.
 *
 * @param opts   index options; not consulted by this function
 * @param path   only used by the USE_LUCENE build, where it is passed to index_open()
 * @param index  out: receives the new handle on success and NULL on failure
 * @return 0 on success, -1 on failure. Success is only reported when the
 *         USE_INVERTED_INDEX backend is compiled in and its cache and column map
 *         were created; every failure path releases what was already acquired.
 */
int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) {
  pthread_once(&isInit, indexInit);

  SIndex *sIdx = calloc(1, sizeof(SIndex));
  if (sIdx == NULL) {
    *index = NULL;
    return -1;
  }

#ifdef USE_LUCENE
  // Assigned straight into the handle: a local named `index` would redeclare the out-parameter.
  sIdx->index = index_open(path);
#endif

#ifdef USE_INVERTED_INDEX
  sIdx->cache = (void *)indexCacheCreate();
  sIdx->tindex = NULL;
  sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (sIdx->cache != NULL && sIdx->colObj != NULL) {
    sIdx->colId = 1;
    sIdx->cVersion = 1;
    pthread_mutex_init(&sIdx->mtx, NULL);

    *index = sIdx;
    return 0;
  }
  // Partial construction: release whichever half was created.
  if (sIdx->cache != NULL) { indexCacheDestroy(sIdx->cache); }
  if (sIdx->colObj != NULL) { taosHashCleanup(sIdx->colObj); }
#endif

  // Reached when the inverted-index backend is not built in, or its setup failed.
#ifdef USE_LUCENE
  if (sIdx->index != NULL) { index_close(sIdx->index); }
#endif
  free(sIdx);
  *index = NULL;
  return -1;
}
dengyihao's avatar
dengyihao 已提交
74

dengyihao's avatar
dengyihao 已提交
75
/*
 * Release an index handle and everything it owns.
 *
 * @param sIdx  handle to destroy; NULL is accepted and ignored. The pointer must
 *              not be used after this call.
 */
void indexClose(SIndex *sIdx) {
  if (sIdx == NULL) { return; }

#ifdef USE_LUCENE
  index_close(sIdx->index);
  sIdx->index = NULL;
#endif

#ifdef USE_INVERTED_INDEX
  indexCacheDestroy(sIdx->cache);
  taosHashCleanup(sIdx->colObj);
  pthread_mutex_destroy(&sIdx->mtx);
#endif
  free(sIdx);
}
dengyihao's avatar
dengyihao 已提交
89

90 91 92 93 94 95
/*
 * Index every term in fVals for the object identified by uid.
 *
 * @param index  open index handle
 * @param fVals  array of SIndexTerm pointers to index
 * @param uid    id associated with all terms in fVals
 * @return 0 on success; the non-zero code from indexCachePut() if a cache insert
 *         fails; -1 if a column could not be registered in the column map.
 */
int indexPut(SIndex *index, SIndexMultiTerm *fVals, uint64_t uid) {
#ifdef USE_LUCENE
  index_document_t *doc = index_document_create();

  // A uint64_t needs up to 20 decimal digits plus the terminator.
  char buf[32] = {0};
  snprintf(buf, sizeof(buf), "%" PRIu64, uid);

  // NOTE(review): this path reads p->key/p->val while the rest of this file uses
  // colName/colVal on SIndexTerm — confirm against the struct before enabling USE_LUCENE.
  for (size_t i = 0; i < taosArrayGetSize(fVals); i++) {
    SIndexTerm *p = taosArrayGetP(fVals, i);
    index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1);
  }
  index_document_add(doc, NULL, 0, buf, strlen(buf), 0);

  index_put(index->index, doc);
  index_document_destroy(doc);
#endif

#ifdef USE_INVERTED_INDEX
  const size_t nTerms = taosArrayGetSize(fVals);

  // Pass 1: make sure every column named in fVals has an id in the column map.
  // TODO(yihao): reduce the lock range
  pthread_mutex_lock(&index->mtx);
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm * p = taosArrayGetP(fVals, i);
    SIdxColInfo *fi = taosHashGet(index->colObj, p->colName, p->nColName);
    if (fi == NULL) {
      SIdxColInfo tfi = {.colId = index->colId};
      index->cVersion++;
      index->colId++;
      taosHashPut(index->colObj, p->colName, p->nColName, &tfi, sizeof(tfi));
    } else {
      // TODO, del
    }
  }
  // Snapshot while still holding the mutex so every cache insert below sees one
  // consistent version, even if another writer bumps cVersion afterwards.
  const int32_t version = index->cVersion;
  pthread_mutex_unlock(&index->mtx);

  // Pass 2: write each term into the cache under its column id.
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm * p = taosArrayGetP(fVals, i);
    SIdxColInfo *fi = taosHashGet(index->colObj, p->colName, p->nColName);
    // Checked explicitly: an assert would vanish under NDEBUG and leave a NULL dereference.
    if (fi == NULL) { return -1; }
    int32_t colId = fi->colId;
    int     ret = indexCachePut(index->cache, p, colId, version, uid);
    if (ret != 0) { return ret; }
  }
#endif

  return 0;
}
dengyihao's avatar
dengyihao 已提交
138
/*
 * Run every term query in multiQuerys and append the merged uids to result.
 *
 * @param index        open index handle
 * @param multiQuerys  term queries plus the operator relating them
 * @param result       caller-owned array that receives the matches
 * @return 1 once the search has run (this function's existing convention),
 *         -1 if a working buffer could not be allocated.
 *
 * A term that fails to resolve contributes an empty match set instead of a NULL
 * entry, so the merge step never receives a NULL array.
 */
int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) {
#ifdef USE_LUCENE
  {  // own scope: both backends declare `opera` and `nQuery`
    // NOTE(review): this path reads p->field_value, term->key/val and p->type while the
    // rest of this file uses term, colName/colVal and qType — confirm before enabling USE_LUCENE.
    EIndexOperatorType opera = multiQuerys->opera;

    int    nQuery = taosArrayGetSize(multiQuerys->query);
    size_t nSlots = nQuery > 0 ? (size_t)nQuery : 1;  // never request a zero-sized allocation
    char **fields = calloc(nSlots, sizeof(char *));
    char **keys = calloc(nSlots, sizeof(char *));
    int *  types = calloc(nSlots, sizeof(int));
    int    ready = (fields != NULL && keys != NULL && types != NULL);

    for (int i = 0; ready && i < nQuery; i++) {
      SIndexTermQuery *p = taosArrayGet(multiQuerys->query, i);
      SIndexTerm *     term = p->field_value;

      fields[i] = calloc(1, term->nKey + 1);
      keys[i] = calloc(1, term->nVal + 1);
      if (fields[i] == NULL || keys[i] == NULL) {
        ready = 0;
        break;
      }

      memcpy(fields[i], term->key, term->nKey);
      memcpy(keys[i], term->val, term->nVal);
      types[i] = (int)(p->type);
    }

    if (ready) {
      int *tResult = NULL;
      int  tsz = 0;
      index_multi_search(index->index, (const char **)fields, (const char **)keys, types, nQuery, opera, &tResult, &tsz);

      for (int i = 0; i < tsz; i++) { taosArrayPush(result, &tResult[i]); }
      // NOTE(review): tResult is not released here — confirm who owns it.
    }

    // The arrays were zero-filled, so unfilled slots are NULL and safe to free.
    for (int i = 0; i < nQuery; i++) {
      if (fields != NULL) { free(fields[i]); }
      if (keys != NULL) { free(keys[i]); }
    }
    free(fields);
    free(keys);
    free(types);
    if (!ready) { return -1; }
  }
#endif

#ifdef USE_INVERTED_INDEX
  {
    EIndexOperatorType opera = multiQuerys->opera;  // relation of querys

    SArray *interResults = taosArrayInit(4, POINTER_BYTES);
    if (interResults == NULL) { return -1; }

    size_t nQuery = taosArrayGetSize(multiQuerys->query);
    for (size_t i = 0; i < nQuery; i++) {
      SIndexTermQuery *qTerm = taosArrayGet(multiQuerys->query, i);
      SArray *         tResult = NULL;
      indexTermSearch(index, qTerm, &tResult);
      if (tResult == NULL) {
        // The term search bailed out before creating its result (e.g. unknown column):
        // substitute an empty match set so slot i stays aligned with query i.
        tResult = taosArrayInit(4, sizeof(uint64_t));
        if (tResult == NULL) {
          indexInterResultsDestroy(interResults);
          return -1;
        }
      }
      taosArrayPush(interResults, (void *)&tResult);
    }
    // The merge reads the first intermediate result, so skip it when there are no queries.
    if (nQuery > 0) { indexMergeFinalResults(interResults, opera, result); }
    indexInterResultsDestroy(interResults);
  }
#endif
  return 1;
}

dengyihao's avatar
dengyihao 已提交
191
/*
 * Delete the entries matched by query. Not implemented: the USE_INVERTED_INDEX
 * section is an empty placeholder and neither argument is read.
 *
 * @return always 1
 */
int indexDelete(SIndex *index, SIndexMultiTermQuery *query) {
#ifdef USE_INVERTED_INDEX
  // placeholder: nothing to do yet
#endif

  return 1;
}
197
/*
 * Rebuild the index. Not implemented: the USE_INVERTED_INDEX section is an empty
 * placeholder and neither argument is read.
 *
 * @return always 0. The function previously fell off its end without returning,
 *         which is undefined behaviour as soon as a caller reads the result.
 */
int indexRebuild(SIndex *index, SIndexOpts *opts) {
#ifdef USE_INVERTED_INDEX
  // placeholder: nothing to do yet
#endif

  return 0;
}
dengyihao's avatar
dengyihao 已提交
202 203

/*
 * Create an options object. Not implemented: the USE_LUCENE section is an empty
 * placeholder.
 *
 * @return always NULL
 */
SIndexOpts *indexOptsCreate() {
#ifdef USE_LUCENE
  // placeholder: nothing to do yet
#endif
  return NULL;
}
208 209
/*
 * Destroy an options object. Not implemented: the USE_LUCENE section is an empty
 * placeholder and opts is not read.
 */
void indexOptsDestroy(SIndexOpts *opts) {
#ifdef USE_LUCENE
  // placeholder: nothing to do yet
#endif
}

/*
 * @param: oper
 *
 */
dengyihao's avatar
dengyihao 已提交
215

dengyihao's avatar
dengyihao 已提交
216 217
/*
 * Create an empty multi-term query.
 *
 * @param opera  operator relating the term queries added later
 * @return a new query owned by the caller (release it with
 *         indexMultiTermQueryDestroy), or NULL if either allocation fails.
 */
SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
  SIndexMultiTermQuery *p = malloc(sizeof(*p));
  if (p == NULL) { return NULL; }

  p->opera = opera;
  p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
  if (p->query == NULL) {
    // Never hand back a query whose term list is missing.
    free(p);
    return NULL;
  }
  return p;
}
/*
 * Destroy a multi-term query together with every term that was added to it.
 *
 * @param pQuery  query to destroy; NULL is accepted and ignored
 */
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) {
  if (pQuery == NULL) { return; }

  const size_t nQuery = taosArrayGetSize(pQuery->query);
  for (size_t i = 0; i < nQuery; i++) {
    SIndexTermQuery *p = (SIndexTermQuery *)taosArrayGet(pQuery->query, i);
    indexTermDestroy(p->term);
  }
  taosArrayDestroy(pQuery->query);
  free(pQuery);
}
231 232
/*
 * Append one term query to pQuery.
 *
 * @param pQuery  query to extend
 * @param term    term to search for; indexMultiTermQueryDestroy later destroys it
 * @param qType   how the term is matched
 * @return 0 on success, -1 if the entry could not be appended (the caller then
 *         still holds term).
 */
int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType qType) {
  SIndexTermQuery q = {.qType = qType, .term = term};
  // taosArrayPush copies q into the array and yields NULL when it cannot grow.
  if (taosArrayPush(pQuery->query, &q) == NULL) { return -1; }
  return 0;
}

237 238 239
/*
 * Build a term with private, NUL-terminated copies of the column name and value.
 *
 * @param suid      stored in the term's suid field
 * @param oper      stored in the term's operType field
 * @param colType   stored in the term's colType field
 * @param colName   column name bytes (need not be NUL-terminated)
 * @param nColName  number of bytes in colName
 * @param colVal    column value bytes (need not be NUL-terminated)
 * @param nColVal   number of bytes in colVal
 * @return a new term owned by the caller (release it with indexTermDestroy), or
 *         NULL when a length is negative or an allocation fails.
 */
SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName,
    int32_t nColName, const char *colVal, int32_t nColVal) {
  if (nColName < 0 || nColVal < 0) { return NULL; }

  SIndexTerm *t = calloc(1, sizeof(*t));
  if (t == NULL) { return NULL; }

  t->suid = suid;
  t->operType = oper;
  t->colType = colType;

  // One extra zeroed byte keeps each copy NUL-terminated.
  t->colName = calloc(1, (size_t)nColName + 1);
  t->colVal = calloc(1, (size_t)nColVal + 1);
  if (t->colName == NULL || t->colVal == NULL) {
    free(t->colName);
    free(t->colVal);
    free(t);
    return NULL;
  }

  // memcpy must not be handed a NULL source, not even for zero bytes.
  if (colName != NULL && nColName > 0) { memcpy(t->colName, colName, (size_t)nColName); }
  t->nColName = nColName;

  if (colVal != NULL && nColVal > 0) { memcpy(t->colVal, colVal, (size_t)nColVal); }
  t->nColVal = nColVal;
  return t;
}
dengyihao's avatar
dengyihao 已提交
255
/*
 * Free a term and the name/value buffers it owns.
 *
 * @param p  term to free; NULL is accepted and ignored, mirroring free()
 */
void indexTermDestroy(SIndexTerm *p) {
  if (p == NULL) { return; }
  free(p->colName);
  free(p->colVal);
  free(p);
}

261 262
/*
 * Create an empty term list whose elements are SIndexTerm pointers.
 *
 * @return whatever taosArrayInit() yields for an initial capacity of 4
 */
SIndexMultiTerm *indexMultiTermCreate() {
  return taosArrayInit(4, sizeof(SIndexTerm *));
}

dengyihao's avatar
dengyihao 已提交
263
/*
 * Append a term pointer to a term list.
 *
 * @param terms  list to extend
 * @param term   term to append; indexMultiTermDestroy later destroys it
 * @return 0 on success, -1 if the pointer could not be appended (the caller then
 *         still holds term).
 */
int indexMultiTermAdd(SIndexMultiTerm *terms, SIndexTerm *term) {
  // The array stores the pointer itself, hence &term.
  if (taosArrayPush(terms, &term) == NULL) { return -1; }
  return 0;
}
dengyihao's avatar
dengyihao 已提交
267 268 269
/*
 * Destroy a term list together with every term stored in it.
 *
 * @param terms  list to destroy; NULL is accepted and ignored
 */
void indexMultiTermDestroy(SIndexMultiTerm *terms) {
  if (terms == NULL) { return; }

  const size_t nTerms = taosArrayGetSize(terms);
  for (size_t i = 0; i < nTerms; i++) {
    SIndexTerm *p = taosArrayGetP(terms, i);
    indexTermDestroy(p);
  }
  taosArrayDestroy(terms);
}
dengyihao's avatar
dengyihao 已提交
274

dengyihao's avatar
dengyihao 已提交
275
/*
 * One-time module initialisation, run through pthread_once() from indexOpen().
 * Currently a no-op. Spelled `static ... (void)` so the definition matches the
 * file-local forward declaration and the void (*)(void) shape pthread_once expects.
 */
static void indexInit(void) {
  // do nothing
}
dengyihao's avatar
dengyihao 已提交
278
/*
 * Resolve one term query: look the column up in the column map, search the cache,
 * then the tfile index unless the cache reports the column as deleted.
 *
 * @param sIdx    open index handle
 * @param query   term query to run
 * @param result  out: set to NULL when the column is unknown or the result array
 *                cannot be allocated; otherwise a new array of uint64_t that the
 *                caller must destroy, even when -1 is returned
 * @return 0 on success, -1 on failure
 */
static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result) {
  SIndexTerm *term = query->term;
  const char *colName = term->colName;
  int32_t     nColName = term->nColName;

  *result = NULL;  // well-defined on every early return

  pthread_mutex_lock(&sIdx->mtx);
  SIdxColInfo *colInfo = taosHashGet(sIdx->colObj, colName, nColName);
  if (colInfo == NULL) {
    pthread_mutex_unlock(&sIdx->mtx);
    return -1;
  }
  // int32_t like in indexPut: a 16-bit local would truncate large column ids.
  int32_t colId = colInfo->colId;
  // NOTE(review): indexPut passes index->cVersion to the cache but stores colInfo with
  // cVersion left at 0, so this value and the one used on insert can differ — confirm.
  int32_t version = colInfo->cVersion;
  pthread_mutex_unlock(&sIdx->mtx);

  *result = taosArrayInit(4, sizeof(uint64_t));
  if (*result == NULL) { return -1; }

  // TODO: iterator mem and tidex
  STermValueType s;
  if (0 != indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) {
    indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }

  if (s == kTypeDeletion) {
    indexInfo("col: %s already drop by other opera", term->colName);
    // column already dropped by another operation, no need to query tindex
    return 0;
  }

  if (0 != indexTFileSearch(sIdx->tindex, query, *result)) {
    indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal);
    return -1;
  }
  return 0;
}
/*
 * Destroy the list of per-term result arrays built by indexSearch: first every
 * inner array, then the outer array. A NULL list is ignored.
 */
static void indexInterResultsDestroy(SArray *results) {
  if (results != NULL) {
    const size_t total = taosArrayGetSize(results);
    size_t       pos = 0;
    while (pos < total) {
      SArray *inner = taosArrayGetP(results, pos);
      taosArrayDestroy(inner);
      pos++;
    }
    taosArrayDestroy(results);
  }
}
/*
 * Merge the per-term result arrays into fResults according to oType.
 *
 * Only the first intermediate result is used for now: it is sorted, de-duplicated
 * in place and appended to fResults for MUST, SHOULD and NOT alike.
 *
 * @param interResults  array of SArray pointers, one per term query
 * @param oType         operator relating the term queries
 * @param fResults      caller-owned array that receives the merged uids
 * @return always 0; an empty or NULL input merges to nothing
 */
static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) {
  // Element 0 is read below, so an empty list must be turned away first.
  if (interResults == NULL || taosArrayGetSize(interResults) == 0) { return 0; }

  // refactor, merge interResults into fResults by oType
  SArray *first = taosArrayGetP(interResults, 0);
  if (first == NULL) { return 0; }  // that term produced no result array

  taosArraySort(first, uidCompare);
  taosArrayRemoveDuplicate(first, uidCompare, NULL);

  if (oType == MUST) {
    // just one column index, enhance later
    taosArrayAddAll(fResults, first);
  } else if (oType == SHOULD) {
    // just one column index, enhance later
    // tag1 condition || tag2 condition
    taosArrayAddAll(fResults, first);
  } else if (oType == NOT) {
    // just one column index, enhance later
    // not used currently
    taosArrayAddAll(fResults, first);
  }
  return 0;
}
/*
 * Placeholder for merging the cache into the tfile index: at present it only logs
 * a warning carrying the handle's suid.
 *
 * @return 0 for a non-NULL handle, -1 when sIdx is NULL
 */
static int indexMergeCacheIntoTindex(SIndex *sIdx) {
  if (sIdx != NULL) {
    indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
    return 0;
  }
  return -1;
}