未验证 提交 2bfecf6f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16586 from taosdata/feature/idxRefactor

refactor idx code
...@@ -40,26 +40,31 @@ extern "C" { ...@@ -40,26 +40,31 @@ extern "C" {
#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0) #define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on // clang-format on
extern void* indexQhandle;
typedef enum { LT, LE, GT, GE, CONTAINS, EQ } RangeType; typedef enum { LT, LE, GT, GE, CONTAINS, EQ } RangeType;
typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef enum { kTypeValue, kTypeDeletion } STermValueType;
typedef enum { kRebuild, kFinished } SIdxStatus; typedef enum { kRebuild, kFinished } SIdxStatus;
typedef struct SIndexStat { typedef struct SIndexStat {
int32_t totalAdded; // int32_t total;
int32_t totalDeled; // int32_t add; //
int32_t totalUpdated; // int32_t del; //
int32_t totalTerms; // int32_t update; //
int32_t distinctCol; // distinct column int32_t terms; //
int32_t distCol; // distinct column
} SIndexStat; } SIndexStat;
struct SIndex { struct SIndex {
SIndexOpts opts;
int64_t refId; int64_t refId;
void* cache; void* cache;
void* tindex; void* tindex;
SHashObj* colObj; // < field name, field id> SHashObj* colObj; // < field name, field id>
int64_t suid; // current super table id, -1 is normal table int64_t suid; // current super table id, -1 is normal table
int32_t cVersion; // current version allocated to cache int32_t version; // current version allocated to cache
SLRUCache* lru; SLRUCache* lru;
char* path; char* path;
...@@ -68,7 +73,6 @@ struct SIndex { ...@@ -68,7 +73,6 @@ struct SIndex {
TdThreadMutex mtx; TdThreadMutex mtx;
tsem_t sem; tsem_t sem;
bool quit; bool quit;
SIndexOpts opts;
}; };
struct SIndexMultiTermQuery { struct SIndexMultiTermQuery {
...@@ -111,14 +115,15 @@ typedef struct Iterate { ...@@ -111,14 +115,15 @@ typedef struct Iterate {
void iterateValueDestroy(IterateValue* iv, bool destroy); void iterateValueDestroy(IterateValue* iv, bool destroy);
extern void* indexQhandle;
typedef struct TFileCacheKey { typedef struct TFileCacheKey {
uint64_t suid; uint64_t suid;
uint8_t colType; uint8_t colType;
char* colName; char* colName;
int32_t nColName; int32_t nColName;
} ICacheKey; } ICacheKey;
int32_t idxSerialCacheKey(ICacheKey* key, char* buf);
int idxFlushCacheToTFile(SIndex* sIdx, void*, bool quit); int idxFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
int64_t idxAddRef(void* p); int64_t idxAddRef(void* p);
...@@ -126,10 +131,6 @@ int32_t idxRemoveRef(int64_t ref); ...@@ -126,10 +131,6 @@ int32_t idxRemoveRef(int64_t ref);
void idxAcquireRef(int64_t ref); void idxAcquireRef(int64_t ref);
void idxReleaseRef(int64_t ref); void idxReleaseRef(int64_t ref);
int32_t idxSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
#define IDX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) #define IDX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define IDX_TYPE_GET_TYPE(ty) (ty & 0x0F) #define IDX_TYPE_GET_TYPE(ty) (ty & 0x0F)
......
...@@ -25,10 +25,6 @@ ...@@ -25,10 +25,6 @@
#include "tref.h" #include "tref.h"
#include "tsched.h" #include "tsched.h"
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
#endif
#define INDEX_NUM_OF_THREADS 5 #define INDEX_NUM_OF_THREADS 5
#define INDEX_QUEUE_SIZE 200 #define INDEX_QUEUE_SIZE 200
...@@ -74,7 +70,7 @@ void indexCleanup() { ...@@ -74,7 +70,7 @@ void indexCleanup() {
typedef struct SIdxColInfo { typedef struct SIdxColInfo {
int colId; // generated by index internal int colId; // generated by index internal
int cVersion; int version;
} SIdxColInfo; } SIdxColInfo;
static TdThreadOnce isInit = PTHREAD_ONCE_INIT; static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
...@@ -123,7 +119,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -123,7 +119,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
} }
idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
idx->cVersion = 1; idx->version = 1;
idx->path = tstrdup(path); idx->path = tstrdup(path);
taosThreadMutexInit(&idx->mtx, NULL); taosThreadMutexInit(&idx->mtx, NULL);
tsem_init(&idx->sem, 0, 0); tsem_init(&idx->sem, 0, 0);
......
...@@ -566,7 +566,6 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) { ...@@ -566,7 +566,6 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
idxCacheUnRef(pCache); idxCacheUnRef(pCache);
return 0; return 0;
// encode end
} }
void idxCacheForceToMerge(void* cache) { void idxCacheForceToMerge(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
...@@ -602,10 +601,10 @@ static int32_t idxQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr, ...@@ -602,10 +601,10 @@ static int32_t idxQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr,
} }
} }
int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) { int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) {
int64_t st = taosGetTimestampUs();
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
IndexCache* pCache = cache; IndexCache* pCache = cache;
MemTable *mem = NULL, *imm = NULL; MemTable *mem = NULL, *imm = NULL;
...@@ -616,6 +615,8 @@ int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STerm ...@@ -616,6 +615,8 @@ int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STerm
idxMemRef(imm); idxMemRef(imm);
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
int64_t st = taosGetTimestampUs();
int ret = (mem && mem->mem) ? idxQueryMem(mem, query, result, s) : 0; int ret = (mem && mem->mem) ? idxQueryMem(mem, query, result, s) : 0;
if (ret == 0 && *s != kTypeDeletion) { if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm // continue search in imm
......
...@@ -178,9 +178,9 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { ...@@ -178,9 +178,9 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
// optime later // optime later
int32_t ret = func(a, b); int32_t ret = func(a, b);
switch (comparType) { switch (comparType) {
case QUERY_LESS_THAN: { case QUERY_LESS_THAN:
if (ret < 0) return MATCH; if (ret < 0) return MATCH;
} break; break;
case QUERY_LESS_EQUAL: { case QUERY_LESS_EQUAL: {
if (ret <= 0) return MATCH; if (ret <= 0) return MATCH;
break; break;
......
...@@ -27,6 +27,44 @@ ...@@ -27,6 +27,44 @@
#define SIF_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SIF_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SIF_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define SIF_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
// clang-format on // clang-format on
typedef union {
uint8_t u8;
uint16_t u16;
uint32_t u32;
uint64_t u64;
int8_t i8;
int16_t i16;
int32_t i32;
int64_t i64;
double d;
float f;
} SDataTypeBuf;
#define SIF_DATA_CONVERT(type, val, dst) \
do { \
if (type == TSDB_DATA_TYPE_DOUBLE) \
dst = GET_DOUBLE_VAL(val); \
else if (type == TSDB_DATA_TYPE_BIGINT) \
dst = *(int64_t *)val; \
else if (type == TSDB_DATA_TYPE_INT) \
dst = *(int32_t *)val; \
else if (type == TSDB_DATA_TYPE_SMALLINT) \
dst = *(int16_t *)val; \
else if (type == TSDB_DATA_TYPE_TINYINT) \
dst = *(int8_t *)val; \
else if (type == TSDB_DATA_TYPE_UTINYINT) \
dst = *(uint8_t *)val; \
else if (type == TSDB_DATA_TYPE_USMALLINT) \
dst = *(uint16_t *)val; \
else if (type == TSDB_DATA_TYPE_UINT) \
dst = *(uint32_t *)val; \
else if (type == TSDB_DATA_TYPE_UBIGINT) \
dst = *(uint64_t *)val; \
} while (0);
typedef struct SIFParam { typedef struct SIFParam {
SHashObj *pFilter; SHashObj *pFilter;
...@@ -48,7 +86,6 @@ typedef struct SIFCtx { ...@@ -48,7 +86,6 @@ typedef struct SIFCtx {
SHashObj *pRes; /* element is SIFParam */ SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan bool noExec; // true: just iterate condition tree, and add hint to executor plan
SIndexMetaArg arg; SIndexMetaArg arg;
// SIdxFltStatus st;
} SIFCtx; } SIFCtx;
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
...@@ -75,11 +112,6 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { ...@@ -75,11 +112,6 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output); typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
static sif_func_t sifNullFunc = NULL; static sif_func_t sifNullFunc = NULL;
// typedef struct SIFWalkParm
// construct tag filter operator later
// static void destroyTagFilterOperatorInfo(void *param) {
// STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
//}
static void sifFreeParam(SIFParam *param) { static void sifFreeParam(SIFParam *param) {
if (param == NULL) return; if (param == NULL) return;
...@@ -365,42 +397,6 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) { ...@@ -365,42 +397,6 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
} }
return NULL; return NULL;
} }
typedef union {
uint8_t u8;
uint16_t u16;
uint32_t u32;
uint64_t u64;
int8_t i8;
int16_t i16;
int32_t i32;
int64_t i64;
double d;
float f;
} SDataTypeBuf;
#define SIF_DATA_CONVERT(type, val, dst) \
do { \
if (type == TSDB_DATA_TYPE_DOUBLE) \
dst = GET_DOUBLE_VAL(val); \
else if (type == TSDB_DATA_TYPE_BIGINT) \
dst = *(int64_t *)val; \
else if (type == TSDB_DATA_TYPE_INT) \
dst = *(int32_t *)val; \
else if (type == TSDB_DATA_TYPE_SMALLINT) \
dst = *(int16_t *)val; \
else if (type == TSDB_DATA_TYPE_TINYINT) \
dst = *(int8_t *)val; \
else if (type == TSDB_DATA_TYPE_UTINYINT) \
dst = *(uint8_t *)val; \
else if (type == TSDB_DATA_TYPE_USMALLINT) \
dst = *(uint16_t *)val; \
else if (type == TSDB_DATA_TYPE_UINT) \
dst = *(uint32_t *)val; \
else if (type == TSDB_DATA_TYPE_UBIGINT) \
dst = *(uint64_t *)val; \
} while (0);
static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) { static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) {
int8_t ltype = left->colValType, rtype = right->colValType; int8_t ltype = left->colValType, rtype = right->colValType;
...@@ -693,11 +689,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou ...@@ -693,11 +689,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
for (int32_t m = 0; m < node->pParameterList->length; m++) { for (int32_t m = 0; m < node->pParameterList->length; m++) {
if (node->condType == LOGIC_COND_TYPE_AND) { if (node->condType == LOGIC_COND_TYPE_AND) {
taosArrayAddAll(output->result, params[m].result); taosArrayAddAll(output->result, params[m].result);
// taosArrayDestroy(params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_OR) { } else if (node->condType == LOGIC_COND_TYPE_OR) {
taosArrayAddAll(output->result, params[m].result); taosArrayAddAll(output->result, params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_NOT) { } else if (node->condType == LOGIC_COND_TYPE_NOT) {
// taosArrayAddAll(output->result, params[m].result); // taosArrayAddAll(output->result, params[m].result);
} }
......
...@@ -211,9 +211,7 @@ IdxFstFile* idxFileCreate(void* wrt) { ...@@ -211,9 +211,7 @@ IdxFstFile* idxFileCreate(void* wrt) {
return cw; return cw;
} }
void idxFileDestroy(IdxFstFile* cw) { void idxFileDestroy(IdxFstFile* cw) {
// free wrt object: close fd or free mem
idxFileFlush(cw); idxFileFlush(cw);
// idxFileCtxDestroy((IFileCtx *)(cw->wrt));
taosMemoryFree(cw); taosMemoryFree(cw);
} }
...@@ -222,9 +220,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) { ...@@ -222,9 +220,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
return 0; return 0;
} }
// update checksum // update checksum
// write data to file/socket or mem
IFileCtx* ctx = write->wrt; IFileCtx* ctx = write->wrt;
int nWrite = ctx->write(ctx, buf, len); int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len); assert(nWrite == len);
write->count += len; write->count += len;
......
...@@ -183,13 +183,14 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) { ...@@ -183,13 +183,14 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) {
return NULL; return NULL;
} }
reader->ctx = ctx; reader->ctx = ctx;
reader->remove = false;
if (0 != tfileReaderVerify(reader)) { if (0 != tfileReaderVerify(reader)) {
indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName); indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName);
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
return NULL; return NULL;
} }
// T_REF_INC(reader);
if (0 != tfileReaderLoadHeader(reader)) { if (0 != tfileReaderLoadHeader(reader)) {
indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid, indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid,
reader->header.colName); reader->header.colName);
...@@ -203,7 +204,6 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) { ...@@ -203,7 +204,6 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) {
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
return NULL; return NULL;
} }
reader->remove = false;
return reader; return reader;
} }
...@@ -211,7 +211,6 @@ void tfileReaderDestroy(TFileReader* reader) { ...@@ -211,7 +211,6 @@ void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) { if (reader == NULL) {
return; return;
} }
// T_REF_INC(reader);
fstDestroy(reader->fst); fstDestroy(reader->fst);
if (reader->remove) { if (reader->remove) {
indexInfo("%s is removed", reader->ctx->file.buf); indexInfo("%s is removed", reader->ctx->file.buf);
...@@ -222,6 +221,7 @@ void tfileReaderDestroy(TFileReader* reader) { ...@@ -222,6 +221,7 @@ void tfileReaderDestroy(TFileReader* reader) {
taosMemoryFree(reader); taosMemoryFree(reader);
} }
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
int ret = 0; int ret = 0;
char* p = tem->colVal; char* p = tem->colVal;
...@@ -494,7 +494,6 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr ...@@ -494,7 +494,6 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) { TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
char fullname[256] = {0}; char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version); tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64); IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) { if (wcx == NULL) {
return NULL; return NULL;
...@@ -503,8 +502,8 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c ...@@ -503,8 +502,8 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
TFileHeader tfh = {0}; TFileHeader tfh = {0};
tfh.suid = suid; tfh.suid = suid;
tfh.version = version; tfh.version = version;
memcpy(tfh.colName, colName, strlen(colName));
tfh.colType = colType; tfh.colType = colType;
memcpy(tfh.colName, colName, strlen(colName));
return tfileWriterCreate(wcx, &tfh); return tfileWriterCreate(wcx, &tfh);
} }
...@@ -706,7 +705,6 @@ static bool tfileIteratorNext(Iterate* iiter) { ...@@ -706,7 +705,6 @@ static bool tfileIteratorNext(Iterate* iiter) {
iv->type = ADD_VALUE; // value in tfile always ADD_VALUE iv->type = ADD_VALUE; // value in tfile always ADD_VALUE
iv->colVal = colVal; iv->colVal = colVal;
return true; return true;
// std::string key(ch, sz);
} }
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
......
...@@ -1304,11 +1304,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1304,11 +1304,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pTransInst->cfp(pTransInst->parent, pResp, NULL); pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0; return 0;
} }
/*
* no retry
* 1. query conn
* 2. rpc thread already receive quit msg
*/
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code; int32_t code = pResp->code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册