diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index f6ff9bc1392aa4ad2d065c00c3b1942da90d1486..cc740826e9add90f9ac9d5cf28cf6bb0989e2d49 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -37,9 +37,11 @@ struct SIndex { #endif void *cache; void *tindex; - SHashObj *fieldObj; // - uint64_t suid; - int fieldId; + SHashObj *fieldObj;// < field name, field id> + + int64_t suid; // current super table id, -1 is normal table + int fieldId; // field id allocated to cache + int32_t cVersion; // current version allocated to cache pthread_mutex_t mtx; }; @@ -58,6 +60,7 @@ struct SIndexMultiTermQuery { // field and key; typedef struct SIndexTerm { + uint8_t type; // term data type, str/interger/json char *key; int32_t nKey; char *val; diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 27e095ff31676df7a5623b6468e56fa03f179007..ff915a39308f376c342d45b04018542c2f5bf198 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -20,8 +20,8 @@ // ----------------- row structure in skiplist --------------------- /* A data row, the format is like below: - * |<--totalLen-->|<-- fieldId-->|<-- value len--->|<-- value-->|<--version--->|<-- itermType -->| - * + * content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->| + * len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| */ #ifdef __cplusplus @@ -30,7 +30,6 @@ extern "C" { typedef struct IndexCache { T_REF_DECLARE() - int cVersion; // } IndexCache; @@ -39,7 +38,8 @@ IndexCache *indexCacheCreate(); void indexCacheDestroy(IndexCache *cache); -int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldVale, int32_t fvlen, uint64_t uid, int8_t operaType); +int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, + uint32_t version, uint64_t uid, int8_t operType); int indexCacheGet(IndexCache *cache, uint64_t *rst); int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index c01141118988d2b729e902334f2f1ba587de8961..6a2697491d4b570d80842e2ee2e4394193663b23 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -23,7 +23,8 @@ typedef struct SIdxFieldInfo { - int id; // generated by index internal + int fieldId; // generated by index internal + int cVersion; int type; // field type } SIdxFieldInfo; @@ -39,7 +40,7 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) { } SIndex *indexOpen(SIndexOpts *opts, const char *path) { pthread_once(&isInit, indexInit); - SIndex *sIdx = malloc(sizeof(SIndex)); + SIndex *sIdx = calloc(1, sizeof(SIndex)); #ifdef USE_LUCENE index_t *index = index_open(path); @@ -49,6 +50,8 @@ SIndex *indexOpen(SIndexOpts *opts, const char *path) { sIdx->cache = (void*)indexCacheCreate(); sIdx->tindex = NULL; sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + sIdx->fieldId = 1; + sIdx->cVersion = 1; pthread_mutex_init(&sIdx->mtx, NULL); return sIdx; } @@ -65,7 +68,7 @@ void indexClose(SIndex *sIdx) { return; } -int indexPut(SIndex *index, SArray* field_vals, int uid) { +int indexPut(SIndex *index, SArray* fVals, int uid) { #ifdef USE_LUCENE index_document_t *doc = index_document_create(); @@ -73,8 +76,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) { char buf[16] = {0}; sprintf(buf, "%d", uid); - for (int i = 0; i < taosArrayGetSize(field_vals); i++) { - SIndexTerm *p = taosArrayGetP(field_vals, i); + for (int i = 0; i < taosArrayGetSize(fVals); i++) { + SIndexTerm *p = taosArrayGetP(fVals, i); index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1); } index_document_add(doc, NULL, 0, buf, strlen(buf), 0); @@ -82,10 +85,33 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) { index_put(index->index, doc); index_document_destroy(doc); #endif + + //TODO(yihao): reduce the lock range pthread_mutex_lock(&index->mtx); + for (int i = 0; i < taosArrayGetSize(fVals); i++) { + SIndexTerm *p = taosArrayGetP(fVals, i); + SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey); + if (fi == NULL) { + SIdxFieldInfo tfi = {.fieldId = index->fieldId, .type = p->type}; + index->cVersion++; + index->fieldId++; + taosHashPut(index->fieldObj, p->key, p->nKey, &tfi, sizeof(tfi)); + } else { + //TODO, del + } + } + + for (int i = 0; i < taosArrayGetSize(fVals); i++) { + SIndexTerm *p = taosArrayGetP(fVals, i); + SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey); + assert(fi != NULL); + int32_t fieldId = fi->fieldId; + int32_t colType = fi->type; + int32_t version = index->cVersion; + + } pthread_mutex_unlock(&index->mtx); return 1; - } int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) { #ifdef USE_LUCENE @@ -152,7 +178,7 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery *p = (SIndexMultiTermQuery *)malloc(sizeof(SIndexMultiTermQuery)); if (p == NULL) { return NULL; } p->opera = opera; - p->query = taosArrayInit(1, sizeof(SIndexTermQuery)); + p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); return p; } void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) { diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 7c355b0f0a5e712a8ce0143256a9cd76a2fd6632..2ecc823ef9e73b40e8497a0ce2468d2ada67fb01 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -19,7 +19,7 @@ static int32_t compareKey(const void *l, const void *r) { char *lp = (char *)l; char *rp = (char *)r; - // skip total len + // skip total len, not compare int32_t ll, rl; // len memcpy(&ll, lp, sizeof(int32_t)); memcpy(&rl, rp, sizeof(int32_t)); @@ -27,7 +27,7 @@ static int32_t compareKey(const void *l, const void *r) { rp += sizeof(int32_t); // compare field id - int32_t lf, rf; // field id + int16_t lf, rf; // field id memcpy(&lf, lp, sizeof(lf)); memcpy(&rf, rp, sizeof(rf)); if (lf != rf) { @@ -36,14 +36,22 @@ static int32_t compareKey(const void *l, const void *r) { lp += sizeof(lf); rp += sizeof(rf); - // compare field value + // compare field type + int16_t lft, rft; + memcpy(&lft, lp, sizeof(lft)); + memcpy(&rft, rp, sizeof(rft)); + lp += sizeof(lft); + rp += sizeof(rft); + assert(rft == rft); + + // skip value len int32_t lfl, rfl; memcpy(&lfl, lp, sizeof(lfl)); memcpy(&rfl, rp, sizeof(rfl)); lp += sizeof(lfl); rp += sizeof(rfl); - //refator later + // compare value int32_t i, j; for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { if (lp[i] == rp[j]) { continue; } @@ -54,17 +62,24 @@ static int32_t compareKey(const void *l, const void *r) { lp += lfl; rp += rfl; - // compare version + // skip uid + uint64_t lu, ru; + memcpy(&lu, lp, sizeof(lu)); + memcpy(&ru, rp, sizeof(ru)); + lp += sizeof(lu); + rp += sizeof(ru); + + // compare version, desc order int32_t lv, rv; memcpy(&lv, lp, sizeof(lv)); memcpy(&rv, rp, sizeof(rv)); if (lv != rv) { return lv > rv ? -1 : 1; - } + } lp += sizeof(lv); rp += sizeof(rv); + // not care item type - return 0; } @@ -77,35 +92,40 @@ void indexCacheDestroy(IndexCache *cache) { free(cache); } -int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { +int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, + uint32_t version, uint64_t uid, int8_t operType) { if (cache == NULL) { return -1;} - int32_t version = T_REF_INC(cache); - int32_t total = sizeof(int32_t) + sizeof(fieldId) + 4 + fvlen + sizeof(version) + sizeof(uid) + sizeof(operType); + int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType); char *buf = calloc(1, total); char *p = buf; - memcpy(buf, &total, sizeof(total)); - total += total; + memcpy(p, &total, sizeof(total)); + p += sizeof(total); - memcpy(buf, &fieldId, sizeof(fieldId)); - buf += sizeof(fieldId); + memcpy(p, &fieldId, sizeof(fieldId)); + p += sizeof(fieldId); - memcpy(buf, &fvlen, sizeof(fvlen)); - buf += sizeof(fvlen); - memcpy(buf, fieldValue, fvlen); - buf += fvlen; + memcpy(p, &fieldType, sizeof(fieldType)); + p += sizeof(fieldType); + + memcpy(p, &fvLen, sizeof(fvLen)); + p += sizeof(fvLen); + memcpy(p, fieldValue, fvLen); + p += fvLen; - memcpy(buf, &version, sizeof(version)); - buf += sizeof(version); + memcpy(p, &version, sizeof(version)); + p += sizeof(version); - memcpy(buf, &uid, sizeof(uid)); - buf += sizeof(uid); + memcpy(p, &uid, sizeof(uid)); + p += sizeof(uid); - memcpy(buf, &operType, sizeof(operType)); - buf += sizeof(operType); - + memcpy(p, &operType, sizeof(operType)); + p += sizeof(operType); + +} +int indexCacheDel(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { } int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) {