提交 10a7547b 编写于 作者: dengyihao's avatar dengyihao

update index cache

上级 7daca7ff
...@@ -37,9 +37,11 @@ struct SIndex { ...@@ -37,9 +37,11 @@ struct SIndex {
#endif #endif
void *cache; void *cache;
void *tindex; void *tindex;
SHashObj *fieldObj; // <field name, field id> SHashObj *fieldObj;// < field name, field id>
uint64_t suid;
int fieldId; int64_t suid; // current super table id, -1 is normal table
int fieldId; // field id allocated to cache
int32_t cVersion; // current version allocated to cache
pthread_mutex_t mtx; pthread_mutex_t mtx;
}; };
...@@ -58,6 +60,7 @@ struct SIndexMultiTermQuery { ...@@ -58,6 +60,7 @@ struct SIndexMultiTermQuery {
// field and key; // field and key;
typedef struct SIndexTerm { typedef struct SIndexTerm {
uint8_t type; // term data type, str/interger/json
char *key; char *key;
int32_t nKey; int32_t nKey;
char *val; char *val;
......
...@@ -20,8 +20,8 @@ ...@@ -20,8 +20,8 @@
// ----------------- row structure in skiplist --------------------- // ----------------- row structure in skiplist ---------------------
/* A data row, the format is like below: /* A data row, the format is like below:
* |<--totalLen-->|<-- fieldId-->|<-- value len--->|<-- value-->|<--version--->|<-- itermType -->| * content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->|
* * len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->|
*/ */
#ifdef __cplusplus #ifdef __cplusplus
...@@ -30,7 +30,6 @@ extern "C" { ...@@ -30,7 +30,6 @@ extern "C" {
typedef struct IndexCache { typedef struct IndexCache {
T_REF_DECLARE() T_REF_DECLARE()
int cVersion; //
} IndexCache; } IndexCache;
...@@ -39,7 +38,8 @@ IndexCache *indexCacheCreate(); ...@@ -39,7 +38,8 @@ IndexCache *indexCacheCreate();
void indexCacheDestroy(IndexCache *cache); void indexCacheDestroy(IndexCache *cache);
int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldVale, int32_t fvlen, uint64_t uid, int8_t operaType); int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen,
uint32_t version, uint64_t uid, int8_t operType);
int indexCacheGet(IndexCache *cache, uint64_t *rst); int indexCacheGet(IndexCache *cache, uint64_t *rst);
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result); int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result);
......
...@@ -23,7 +23,8 @@ ...@@ -23,7 +23,8 @@
typedef struct SIdxFieldInfo { typedef struct SIdxFieldInfo {
int id; // generated by index internal int fieldId; // generated by index internal
int cVersion;
int type; // field type int type; // field type
} SIdxFieldInfo; } SIdxFieldInfo;
...@@ -39,7 +40,7 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) { ...@@ -39,7 +40,7 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
} }
SIndex *indexOpen(SIndexOpts *opts, const char *path) { SIndex *indexOpen(SIndexOpts *opts, const char *path) {
pthread_once(&isInit, indexInit); pthread_once(&isInit, indexInit);
SIndex *sIdx = malloc(sizeof(SIndex)); SIndex *sIdx = calloc(1, sizeof(SIndex));
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_t *index = index_open(path); index_t *index = index_open(path);
...@@ -49,6 +50,8 @@ SIndex *indexOpen(SIndexOpts *opts, const char *path) { ...@@ -49,6 +50,8 @@ SIndex *indexOpen(SIndexOpts *opts, const char *path) {
sIdx->cache = (void*)indexCacheCreate(); sIdx->cache = (void*)indexCacheCreate();
sIdx->tindex = NULL; sIdx->tindex = NULL;
sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); sIdx->fieldObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->fieldId = 1;
sIdx->cVersion = 1;
pthread_mutex_init(&sIdx->mtx, NULL); pthread_mutex_init(&sIdx->mtx, NULL);
return sIdx; return sIdx;
} }
...@@ -65,7 +68,7 @@ void indexClose(SIndex *sIdx) { ...@@ -65,7 +68,7 @@ void indexClose(SIndex *sIdx) {
return; return;
} }
int indexPut(SIndex *index, SArray* field_vals, int uid) { int indexPut(SIndex *index, SArray* fVals, int uid) {
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_document_t *doc = index_document_create(); index_document_t *doc = index_document_create();
...@@ -73,8 +76,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) { ...@@ -73,8 +76,8 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
char buf[16] = {0}; char buf[16] = {0};
sprintf(buf, "%d", uid); sprintf(buf, "%d", uid);
for (int i = 0; i < taosArrayGetSize(field_vals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm *p = taosArrayGetP(field_vals, i); SIndexTerm *p = taosArrayGetP(fVals, i);
index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1); index_document_add(doc, (const char *)(p->key), p->nKey, (const char *)(p->val), p->nVal, 1);
} }
index_document_add(doc, NULL, 0, buf, strlen(buf), 0); index_document_add(doc, NULL, 0, buf, strlen(buf), 0);
...@@ -82,10 +85,33 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) { ...@@ -82,10 +85,33 @@ int indexPut(SIndex *index, SArray* field_vals, int uid) {
index_put(index->index, doc); index_put(index->index, doc);
index_document_destroy(doc); index_document_destroy(doc);
#endif #endif
//TODO(yihao): reduce the lock range
pthread_mutex_lock(&index->mtx); pthread_mutex_lock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm *p = taosArrayGetP(fVals, i);
SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey);
if (fi == NULL) {
SIdxFieldInfo tfi = {.fieldId = index->fieldId, .type = p->type};
index->cVersion++;
index->fieldId++;
taosHashPut(index->fieldObj, p->key, p->nKey, &tfi, sizeof(tfi));
} else {
//TODO, del
}
}
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm *p = taosArrayGetP(fVals, i);
SIdxFieldInfo *fi = taosHashGet(index->fieldObj, p->key, p->nKey);
assert(fi != NULL);
int32_t fieldId = fi->fieldId;
int32_t colType = fi->type;
int32_t version = index->cVersion;
}
pthread_mutex_unlock(&index->mtx); pthread_mutex_unlock(&index->mtx);
return 1; return 1;
} }
int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) { int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result) {
#ifdef USE_LUCENE #ifdef USE_LUCENE
...@@ -152,7 +178,7 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) { ...@@ -152,7 +178,7 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery *p = (SIndexMultiTermQuery *)malloc(sizeof(SIndexMultiTermQuery)); SIndexMultiTermQuery *p = (SIndexMultiTermQuery *)malloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) { return NULL; } if (p == NULL) { return NULL; }
p->opera = opera; p->opera = opera;
p->query = taosArrayInit(1, sizeof(SIndexTermQuery)); p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p; return p;
} }
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) { void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery) {
......
...@@ -19,7 +19,7 @@ static int32_t compareKey(const void *l, const void *r) { ...@@ -19,7 +19,7 @@ static int32_t compareKey(const void *l, const void *r) {
char *lp = (char *)l; char *lp = (char *)l;
char *rp = (char *)r; char *rp = (char *)r;
// skip total len // skip total len, not compare
int32_t ll, rl; // len int32_t ll, rl; // len
memcpy(&ll, lp, sizeof(int32_t)); memcpy(&ll, lp, sizeof(int32_t));
memcpy(&rl, rp, sizeof(int32_t)); memcpy(&rl, rp, sizeof(int32_t));
...@@ -27,7 +27,7 @@ static int32_t compareKey(const void *l, const void *r) { ...@@ -27,7 +27,7 @@ static int32_t compareKey(const void *l, const void *r) {
rp += sizeof(int32_t); rp += sizeof(int32_t);
// compare field id // compare field id
int32_t lf, rf; // field id int16_t lf, rf; // field id
memcpy(&lf, lp, sizeof(lf)); memcpy(&lf, lp, sizeof(lf));
memcpy(&rf, rp, sizeof(rf)); memcpy(&rf, rp, sizeof(rf));
if (lf != rf) { if (lf != rf) {
...@@ -36,14 +36,22 @@ static int32_t compareKey(const void *l, const void *r) { ...@@ -36,14 +36,22 @@ static int32_t compareKey(const void *l, const void *r) {
lp += sizeof(lf); lp += sizeof(lf);
rp += sizeof(rf); rp += sizeof(rf);
// compare field value // compare field type
int16_t lft, rft;
memcpy(&lft, lp, sizeof(lft));
memcpy(&rft, rp, sizeof(rft));
lp += sizeof(lft);
rp += sizeof(rft);
assert(rft == rft);
// skip value len
int32_t lfl, rfl; int32_t lfl, rfl;
memcpy(&lfl, lp, sizeof(lfl)); memcpy(&lfl, lp, sizeof(lfl));
memcpy(&rfl, rp, sizeof(rfl)); memcpy(&rfl, rp, sizeof(rfl));
lp += sizeof(lfl); lp += sizeof(lfl);
rp += sizeof(rfl); rp += sizeof(rfl);
//refator later // compare value
int32_t i, j; int32_t i, j;
for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) {
if (lp[i] == rp[j]) { continue; } if (lp[i] == rp[j]) { continue; }
...@@ -54,17 +62,24 @@ static int32_t compareKey(const void *l, const void *r) { ...@@ -54,17 +62,24 @@ static int32_t compareKey(const void *l, const void *r) {
lp += lfl; lp += lfl;
rp += rfl; rp += rfl;
// compare version // skip uid
uint64_t lu, ru;
memcpy(&lu, lp, sizeof(lu));
memcpy(&ru, rp, sizeof(ru));
lp += sizeof(lu);
rp += sizeof(ru);
// compare version, desc order
int32_t lv, rv; int32_t lv, rv;
memcpy(&lv, lp, sizeof(lv)); memcpy(&lv, lp, sizeof(lv));
memcpy(&rv, rp, sizeof(rv)); memcpy(&rv, rp, sizeof(rv));
if (lv != rv) { if (lv != rv) {
return lv > rv ? -1 : 1; return lv > rv ? -1 : 1;
} }
lp += sizeof(lv); lp += sizeof(lv);
rp += sizeof(rv); rp += sizeof(rv);
// not care item type
return 0; return 0;
} }
...@@ -77,35 +92,40 @@ void indexCacheDestroy(IndexCache *cache) { ...@@ -77,35 +92,40 @@ void indexCacheDestroy(IndexCache *cache) {
free(cache); free(cache);
} }
int indexCachePut(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen,
uint32_t version, uint64_t uid, int8_t operType) {
if (cache == NULL) { return -1;} if (cache == NULL) { return -1;}
int32_t version = T_REF_INC(cache);
int32_t total = sizeof(int32_t) + sizeof(fieldId) + 4 + fvlen + sizeof(version) + sizeof(uid) + sizeof(operType); int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType);
char *buf = calloc(1, total); char *buf = calloc(1, total);
char *p = buf; char *p = buf;
memcpy(buf, &total, sizeof(total)); memcpy(p, &total, sizeof(total));
total += total; p += sizeof(total);
memcpy(buf, &fieldId, sizeof(fieldId)); memcpy(p, &fieldId, sizeof(fieldId));
buf += sizeof(fieldId); p += sizeof(fieldId);
memcpy(buf, &fvlen, sizeof(fvlen)); memcpy(p, &fieldType, sizeof(fieldType));
buf += sizeof(fvlen); p += sizeof(fieldType);
memcpy(buf, fieldValue, fvlen);
buf += fvlen; memcpy(p, &fvLen, sizeof(fvLen));
p += sizeof(fvLen);
memcpy(p, fieldValue, fvLen);
p += fvLen;
memcpy(buf, &version, sizeof(version)); memcpy(p, &version, sizeof(version));
buf += sizeof(version); p += sizeof(version);
memcpy(buf, &uid, sizeof(uid)); memcpy(p, &uid, sizeof(uid));
buf += sizeof(uid); p += sizeof(uid);
memcpy(buf, &operType, sizeof(operType)); memcpy(p, &operType, sizeof(operType));
buf += sizeof(operType); p += sizeof(operType);
}
int indexCacheDel(IndexCache *cache, int32_t fieldId, const char *fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
} }
int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) { int indexCacheSearch(IndexCache *cache, SIndexMultiTermQuery *query, SArray *result) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册