diff --git a/source/dnode/vnode/meta/src/metaIdx.c b/source/dnode/vnode/meta/src/metaIdx.c index 828bd120881fca49fabd2b66ce0e626e8ddc12b2..3da56fc394f382e6235b201ce180ee0abfa674bd 100644 --- a/source/dnode/vnode/meta/src/metaIdx.c +++ b/source/dnode/vnode/meta/src/metaIdx.c @@ -49,9 +49,7 @@ int metaOpenIdx(SMeta *pMeta) { #ifdef USE_INVERTED_INDEX SIndexOpts opts; - if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) { - return -1; - } + if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) { return -1; } #endif return 0; @@ -67,16 +65,14 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */ #ifdef USE_INVERTED_INDEX SIndexOpts opts; - if (indexClose(pMeta->pIdx->pIdx) != 0) { - return -1; - } + if (indexClose(pMeta->pIdx->pIdx) != 0) { return -1; } #endif } int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) { #ifdef USE_INVERTED_INDEX - if (pTbCfgs - type == META_CHILD_TABLE) { + if (pTbCfgs->type == META_CHILD_TABLE) { char buf[8] = {0}; int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId; sprintf(buf, "%d", colId); // colname diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 19e93754914dac64076db779b455fa0b5d8dff26..0c222eae1a0b7a72ef252fce4afea9395ca78bf4 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -59,6 +59,10 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); +// merge cache and tfile by opera type +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); +static void indexMergeSameKey(SArray* result, TFileValue* tv); + int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { pthread_once(&isInit, indexInit); SIndex* sIdx = calloc(1, sizeof(SIndex)); @@ -385,6 +389,27 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { taosArrayPush(result, &tv); } } +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv) { + // opt + char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; + // design merge-algorithm later, too complicated to handle all kind of situation + TFileValue* tfv = tfileValueCreate(colVal); + if (cv != NULL) { + if (cv->type == ADD_VALUE) { + taosArrayAddAll(tfv->tableId, cv->val); + } else if (cv->type == DEL_VALUE) { + } else if (cv->type == UPDATE_VALUE) { + } else { + // do nothing + } + } + if (tv != NULL) { + // opt later + taosArrayAddAll(tfv->tableId, tv->val); + } + + indexMergeSameKey(result, tfv); +} static void indexDestroyTempResult(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; for (size_t i = 0; i < sz; i++) { @@ -411,51 +436,30 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; - while (cn == true && tn == true) { - IterateValue* cv = cacheIter->getValue(cacheIter); - IterateValue* tv = tfileIter->getValue(tfileIter); - - // dump value - int comp = strcmp(cv->colVal, tv->colVal); + while (cn == true || tn == true) { + IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; + IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; + + int comp = 0; + if (cn == true && tn == true) { + comp = strcmp(cv->colVal, tv->colVal); + } else if (cn == true) { + comp = -1; + } else { + comp = 1; + } if (comp == 0) { - TFileValue* tfv = tfileValueCreate(cv->colVal); - taosArrayAddAll(tfv->tableId, cv->val); - taosArrayAddAll(tfv->tableId, tv->val); - indexMergeSameKey(result, tfv); - + indexMergeCacheAndTFile(result, cv, tv); cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); - continue; } else if (comp < 0) { - TFileValue* tfv = tfileValueCreate(cv->colVal); - taosArrayAddAll(tfv->tableId, cv->val); - - indexMergeSameKey(result, tfv); - // copy to final Result; + indexMergeCacheAndTFile(result, cv, NULL); cn = cacheIter->next(cacheIter); } else { - TFileValue* tfv = tfileValueCreate(tv->colVal); - taosArrayAddAll(tfv->tableId, tv->val); - - indexMergeSameKey(result, tfv); - // copy to final result + indexMergeCacheAndTFile(result, NULL, tv); tn = tfileIter->next(tfileIter); } } - while (cn == true) { - IterateValue* cv = cacheIter->getValue(cacheIter); - TFileValue* tfv = tfileValueCreate(cv->colVal); - taosArrayAddAll(tfv->tableId, cv->val); - indexMergeSameKey(result, tfv); - cn = cacheIter->next(cacheIter); - } - while (tn == true) { - IterateValue* tv = tfileIter->getValue(tfileIter); - TFileValue* tfv = tfileValueCreate(tv->colVal); - taosArrayAddAll(tfv->tableId, tv->val); - indexMergeSameKey(result, tfv); - tn = tfileIter->next(tfileIter); - } int ret = indexGenTFile(sIdx, pCache, result); indexDestroyTempResult(result); @@ -503,7 +507,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { tfileWriterClose(tw); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); - if (reader == NULL) { goto END; } + if (reader == NULL) { return -1; } TFileHeader* header = &reader->header; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 294c8192e8f030ad59a8a80b18a31d3b6d2715cf..48566a86744e6d68360fdc2c0c682882e6cf6b2d 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -217,9 +217,9 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { // set value ct->uid = uid; ct->operaType = term->operType; - // ugly code, refactor later int64_t estimate = sizeof(ct) + strlen(ct->colVal); + pthread_mutex_lock(&pCache->mtx); pCache->occupiedMem += estimate; indexCacheMakeRoomForWrite(pCache); @@ -331,7 +331,6 @@ static char* indexCacheTermGet(const void* pData) { static int32_t indexCacheTermCompare(const void* l, const void* r) { CacheTerm* lt = (CacheTerm*)l; CacheTerm* rt = (CacheTerm*)r; - // compare colVal int32_t cmp = strcmp(lt->colVal, rt->colVal); if (cmp == 0) { return rt->version - lt->version; } @@ -359,17 +358,32 @@ static bool indexCacheIteratorNext(Iterate* itera) { IterateValue* iv = &itera->val; iterateValueDestroy(iv, false); + // IterateValue* iv = &itera->val; + // IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))}; + bool next = tSkipListIterNext(iter); if (next) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + // equal func + // if (iv->colVal != NULL && ct->colVal != NULL) { + // if (0 == strcmp(iv->colVal, ct->colVal)) { if (iv->type == ADD_VALUE) } + //} else { + // tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1); + // tIterval.colVal = tstrdup(ct->colVal); + //} iv->type = ct->operaType; - iv->colVal = calloc(1, strlen(ct->colVal) + 1); - memcpy(iv->colVal, ct->colVal, strlen(ct->colVal)); + iv->colVal = tstrdup(ct->colVal); + // iv->colVal = calloc(1, strlen(ct->colVal) + 1); + // memcpy(iv->colVal, ct->colVal, strlen(ct->colVal)); taosArrayPush(iv->val, &ct->uid); } + // IterateValue* iv = &itera->val; + // iterateValueDestroy(iv, true); + //*iv = tIterVal; + return next; } diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 4f782cef26a78c9fce89d5ce94f00d516620d0ce..5299a7dc5f5d71dc0308bd8d5f1ed29d3caadfe6 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -936,6 +936,7 @@ Fst* fstCreate(FstSlice* slice) { len -= sizeof(checkSum); taosDecodeFixedU32(buf + len, &checkSum); if (taosCheckChecksum(buf, len, checkSum)) { + indexError("index file is corrupted"); // verify fst return NULL; } diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 6db5555aa6ad09586196ba9111d889ead62d6f61..b57f639726c5f738d2ba9d69b3aab74cb6f02dbf 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -60,9 +60,10 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off return nRead; } static int writeCtxGetSize(WriterCtx* ctx) { - if (ctx->type == TFile && ctx->file.readOnly) { - // refactor later - return ctx->file.size; + if (ctx->type == TFile) { + struct stat fstat; + stat(ctx->file.buf, &fstat); + return fstat.st_size; } return 0; } @@ -88,7 +89,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int if (readOnly == false) { // ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.fd = tfOpenCreateWriteAppend(path); - + tfFtruncate(ctx->file.fd, 0); struct stat fstat; stat(path, &fstat); ctx->file.size = fstat.st_size; @@ -138,6 +139,11 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { munmap(ctx->file.ptr, ctx->file.size); #endif } + if (ctx->file.readOnly == false) { + struct stat fstat; + stat(ctx->file.buf, &fstat); + // indexError("write file size: %d", (int)(fstat.st_size)); + } if (remove) { unlink(ctx->file.buf); } } free(ctx); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 4b764025605f06d2424a96b7a23338f225d106ce..98fede4f7b142132bbf0838762f488b19bd990ec 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -147,21 +147,22 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { reader->ctx = ctx; if (0 != tfileReaderVerify(reader)) { - tfileReaderDestroy(reader); indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + tfileReaderDestroy(reader); return NULL; } // T_REF_INC(reader); if (0 != tfileReaderLoadHeader(reader)) { - tfileReaderDestroy(reader); indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + tfileReaderDestroy(reader); return NULL; } if (0 != tfileReaderLoadFst(reader)) { + indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", reader->header.suid, + reader->header.colName, errno); tfileReaderDestroy(reader); - indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); return NULL; } @@ -303,6 +304,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { } else { // indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, // (int)taosArrayGetSize(v->tableId)); + + // indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx)); } } fstBuilderFinish(tw->fb); @@ -485,7 +488,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { int32_t fstOffset = offset + sizeof(tw->header.fstOffset); tw->header.fstOffset = fstOffset; + if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } + indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx)); tw->offset += sizeof(fstOffset); return 0; } @@ -495,8 +500,11 @@ static int tfileWriteHeader(TFileWriter* writer) { TFileHeader* header = &writer->header; memcpy(buf, (char*)header, sizeof(buf)); + indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx)); int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); if (sizeof(buf) != nwrite) { return -1; } + + indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx)); writer->offset = nwrite; return 0; } @@ -521,6 +529,8 @@ static int tfileWriteFooter(TFileWriter* write) { void* pBuf = (void*)buf; taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber); int nwrite = write->ctx->write(write->ctx, buf, strlen(buf)); + + indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx)); assert(nwrite == sizeof(tfileMagicNumber)); return nwrite; }