diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 065f4acb576263d1f7d5cbe8238273dc325ccb09..9605528ad6ae150fd88f512cdf5344b81d486a99 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -40,26 +40,31 @@ extern "C" { #define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0) // clang-format on +extern void* indexQhandle; + typedef enum { LT, LE, GT, GE, CONTAINS, EQ } RangeType; typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef enum { kRebuild, kFinished } SIdxStatus; typedef struct SIndexStat { - int32_t totalAdded; // - int32_t totalDeled; // - int32_t totalUpdated; // - int32_t totalTerms; // - int32_t distinctCol; // distinct column + int32_t total; + int32_t add; // + int32_t del; // + int32_t update; // + int32_t terms; // + int32_t distCol; // distinct column } SIndexStat; struct SIndex { + SIndexOpts opts; + int64_t refId; void* cache; void* tindex; SHashObj* colObj; // < field name, field id> - int64_t suid; // current super table id, -1 is normal table - int32_t cVersion; // current version allocated to cache + int64_t suid; // current super table id, -1 is normal table + int32_t version; // current version allocated to cache SLRUCache* lru; char* path; @@ -68,7 +73,6 @@ struct SIndex { TdThreadMutex mtx; tsem_t sem; bool quit; - SIndexOpts opts; }; struct SIndexMultiTermQuery { @@ -111,14 +115,15 @@ typedef struct Iterate { void iterateValueDestroy(IterateValue* iv, bool destroy); -extern void* indexQhandle; - typedef struct TFileCacheKey { uint64_t suid; uint8_t colType; char* colName; int32_t nColName; } ICacheKey; + +int32_t idxSerialCacheKey(ICacheKey* key, char* buf); + int idxFlushCacheToTFile(SIndex* sIdx, void*, bool quit); int64_t idxAddRef(void* p); @@ -126,10 +131,6 @@ int32_t idxRemoveRef(int64_t ref); void idxAcquireRef(int64_t ref); void idxReleaseRef(int64_t ref); -int32_t idxSerialCacheKey(ICacheKey* key, char* buf); -// int32_t indexSerialKey(ICacheKey* key, char* buf); -// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); - #define IDX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) #define IDX_TYPE_GET_TYPE(ty) (ty & 0x0F) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index be64a8b44d28a76a0b04a78b3940bcb0c86101da..d9a6b80f3d9762aa778d1167833a76255eafb932 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -25,10 +25,6 @@ #include "tref.h" #include "tsched.h" -#ifdef USE_LUCENE -#include "lucene++/Lucene_c.h" -#endif - #define INDEX_NUM_OF_THREADS 5 #define INDEX_QUEUE_SIZE 200 @@ -74,7 +70,7 @@ void indexCleanup() { typedef struct SIdxColInfo { int colId; // generated by index internal - int cVersion; + int version; } SIdxColInfo; static TdThreadOnce isInit = PTHREAD_ONCE_INIT; @@ -123,7 +119,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { } idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - idx->cVersion = 1; + idx->version = 1; idx->path = tstrdup(path); taosThreadMutexInit(&idx->mtx, NULL); tsem_init(&idx->sem, 0, 0); diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 794b85d244f875b6874855ac6a36a7500114afd5..7e867db755c658abad341e8fa59cd6687ef9b959 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -566,7 +566,6 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) { taosThreadMutexUnlock(&pCache->mtx); idxCacheUnRef(pCache); return 0; - // encode end } void idxCacheForceToMerge(void* cache) { IndexCache* pCache = cache; @@ -602,10 +601,10 @@ static int32_t idxQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr, } } int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) { - int64_t st = taosGetTimestampUs(); if (cache == NULL) { return 0; } + IndexCache* pCache = cache; MemTable *mem = NULL, *imm = NULL; @@ -616,6 +615,8 @@ int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STerm idxMemRef(imm); taosThreadMutexUnlock(&pCache->mtx); + int64_t st = taosGetTimestampUs(); + int ret = (mem && mem->mem) ? idxQueryMem(mem, query, result, s) : 0; if (ret == 0 && *s != kTypeDeletion) { // continue search in imm diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index cd52d122f781e3210448904af7b9ac0d3b4f9046..691eb6771cfb0de12fe0328129dfa153b3eece8c 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -178,9 +178,9 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { // optime later int32_t ret = func(a, b); switch (comparType) { - case QUERY_LESS_THAN: { + case QUERY_LESS_THAN: if (ret < 0) return MATCH; - } break; + break; case QUERY_LESS_EQUAL: { if (ret <= 0) return MATCH; break; diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 75844ce76f1cb50d6847709309dae1ed3f77bf70..e7f221de3d16ed0186f630ebdfe412f77d440dc3 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -27,6 +27,44 @@ #define SIF_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SIF_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) // clang-format on + +typedef union { + uint8_t u8; + uint16_t u16; + uint32_t u32; + uint64_t u64; + + int8_t i8; + int16_t i16; + int32_t i32; + int64_t i64; + + double d; + float f; +} SDataTypeBuf; + +#define SIF_DATA_CONVERT(type, val, dst) \ + do { \ + if (type == TSDB_DATA_TYPE_DOUBLE) \ + dst = GET_DOUBLE_VAL(val); \ + else if (type == TSDB_DATA_TYPE_BIGINT) \ + dst = *(int64_t *)val; \ + else if (type == TSDB_DATA_TYPE_INT) \ + dst = *(int32_t *)val; \ + else if (type == TSDB_DATA_TYPE_SMALLINT) \ + dst = *(int16_t *)val; \ + else if (type == TSDB_DATA_TYPE_TINYINT) \ + dst = *(int8_t *)val; \ + else if (type == TSDB_DATA_TYPE_UTINYINT) \ + dst = *(uint8_t *)val; \ + else if (type == TSDB_DATA_TYPE_USMALLINT) \ + dst = *(uint16_t *)val; \ + else if (type == TSDB_DATA_TYPE_UINT) \ + dst = *(uint32_t *)val; \ + else if (type == TSDB_DATA_TYPE_UBIGINT) \ + dst = *(uint64_t *)val; \ + } while (0); + typedef struct SIFParam { SHashObj *pFilter; @@ -48,7 +86,6 @@ typedef struct SIFCtx { SHashObj *pRes; /* element is SIFParam */ bool noExec; // true: just iterate condition tree, and add hint to executor plan SIndexMetaArg arg; - // SIdxFltStatus st; } SIFCtx; static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { @@ -75,11 +112,6 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output); static sif_func_t sifNullFunc = NULL; -// typedef struct SIFWalkParm -// construct tag filter operator later -// static void destroyTagFilterOperatorInfo(void *param) { -// STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param; -//} static void sifFreeParam(SIFParam *param) { if (param == NULL) return; @@ -365,42 +397,6 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) { } return NULL; } -typedef union { - uint8_t u8; - uint16_t u16; - uint32_t u32; - uint64_t u64; - - int8_t i8; - int16_t i16; - int32_t i32; - int64_t i64; - - double d; - float f; -} SDataTypeBuf; - -#define SIF_DATA_CONVERT(type, val, dst) \ - do { \ - if (type == TSDB_DATA_TYPE_DOUBLE) \ - dst = GET_DOUBLE_VAL(val); \ - else if (type == TSDB_DATA_TYPE_BIGINT) \ - dst = *(int64_t *)val; \ - else if (type == TSDB_DATA_TYPE_INT) \ - dst = *(int32_t *)val; \ - else if (type == TSDB_DATA_TYPE_SMALLINT) \ - dst = *(int16_t *)val; \ - else if (type == TSDB_DATA_TYPE_TINYINT) \ - dst = *(int8_t *)val; \ - else if (type == TSDB_DATA_TYPE_UTINYINT) \ - dst = *(uint8_t *)val; \ - else if (type == TSDB_DATA_TYPE_USMALLINT) \ - dst = *(uint16_t *)val; \ - else if (type == TSDB_DATA_TYPE_UINT) \ - dst = *(uint32_t *)val; \ - else if (type == TSDB_DATA_TYPE_UBIGINT) \ - dst = *(uint64_t *)val; \ - } while (0); static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) { int8_t ltype = left->colValType, rtype = right->colValType; @@ -693,11 +689,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou for (int32_t m = 0; m < node->pParameterList->length; m++) { if (node->condType == LOGIC_COND_TYPE_AND) { taosArrayAddAll(output->result, params[m].result); - // taosArrayDestroy(params[m].result); - // params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_OR) { taosArrayAddAll(output->result, params[m].result); - // params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_NOT) { // taosArrayAddAll(output->result, params[m].result); } diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 1900e50973ade5278482162df05d1cb528365238..e6d1edfeda1972821c3c1d1341c65a0ebdd6bee8 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -211,9 +211,7 @@ IdxFstFile* idxFileCreate(void* wrt) { return cw; } void idxFileDestroy(IdxFstFile* cw) { - // free wrt object: close fd or free mem idxFileFlush(cw); - // idxFileCtxDestroy((IFileCtx *)(cw->wrt)); taosMemoryFree(cw); } @@ -222,10 +220,8 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) { return 0; } // update checksum - // write data to file/socket or mem IFileCtx* ctx = write->wrt; - - int nWrite = ctx->write(ctx, buf, len); + int nWrite = ctx->write(ctx, buf, len); assert(nWrite == len); write->count += len; diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 0a47fc0f167a359b35952f0c1e88af03d544c95d..48a0c631cf1af64f9a4676567a827accc4261226 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -183,13 +183,14 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) { return NULL; } reader->ctx = ctx; + reader->remove = false; if (0 != tfileReaderVerify(reader)) { indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName); tfileReaderDestroy(reader); return NULL; } - // T_REF_INC(reader); + if (0 != tfileReaderLoadHeader(reader)) { indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName); @@ -203,7 +204,6 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) { tfileReaderDestroy(reader); return NULL; } - reader->remove = false; return reader; } @@ -211,7 +211,6 @@ void tfileReaderDestroy(TFileReader* reader) { if (reader == NULL) { return; } - // T_REF_INC(reader); fstDestroy(reader->fst); if (reader->remove) { indexInfo("%s is removed", reader->ctx->file.buf); @@ -222,6 +221,7 @@ void tfileReaderDestroy(TFileReader* reader) { taosMemoryFree(reader); } + static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { int ret = 0; char* p = tem->colVal; @@ -494,7 +494,6 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) { char fullname[256] = {0}; tfileGenFileFullName(fullname, path, suid, colName, version); - // indexInfo("open write file name %s", fullname); IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64); if (wcx == NULL) { return NULL; @@ -503,8 +502,8 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c TFileHeader tfh = {0}; tfh.suid = suid; tfh.version = version; - memcpy(tfh.colName, colName, strlen(colName)); tfh.colType = colType; + memcpy(tfh.colName, colName, strlen(colName)); return tfileWriterCreate(wcx, &tfh); } @@ -706,7 +705,6 @@ static bool tfileIteratorNext(Iterate* iiter) { iv->type = ADD_VALUE; // value in tfile always ADD_VALUE iv->colVal = colVal; return true; - // std::string key(ch, sz); } static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 41688c733079f12fbd04683183dd80db3b65606d..4b5441f738d303fd8286e4cf856d1feacfee3eb5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1304,11 +1304,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { pTransInst->cfp(pTransInst->parent, pResp, NULL); return 0; } - /* - * no retry - * 1. query conn - * 2. rpc thread already receive quit msg - */ + STransConnCtx* pCtx = pMsg->ctx; int32_t code = pResp->code;