提交 47ff0e34 编写于 作者: dengyihao's avatar dengyihao

refactor idx code

上级 8d0767a9
......@@ -40,26 +40,31 @@ extern "C" {
#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on
extern void* indexQhandle;
typedef enum { LT, LE, GT, GE, CONTAINS, EQ } RangeType;
typedef enum { kTypeValue, kTypeDeletion } STermValueType;
typedef enum { kRebuild, kFinished } SIdxStatus;
typedef struct SIndexStat {
int32_t totalAdded; //
int32_t totalDeled; //
int32_t totalUpdated; //
int32_t totalTerms; //
int32_t distinctCol; // distinct column
int32_t total;
int32_t add; //
int32_t del; //
int32_t update; //
int32_t terms; //
int32_t distCol; // distinct column
} SIndexStat;
struct SIndex {
SIndexOpts opts;
int64_t refId;
void* cache;
void* tindex;
SHashObj* colObj; // < field name, field id>
int64_t suid; // current super table id, -1 is normal table
int32_t cVersion; // current version allocated to cache
int64_t suid; // current super table id, -1 is normal table
int32_t version; // current version allocated to cache
SLRUCache* lru;
char* path;
......@@ -68,7 +73,6 @@ struct SIndex {
TdThreadMutex mtx;
tsem_t sem;
bool quit;
SIndexOpts opts;
};
struct SIndexMultiTermQuery {
......@@ -111,14 +115,15 @@ typedef struct Iterate {
void iterateValueDestroy(IterateValue* iv, bool destroy);
extern void* indexQhandle;
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
char* colName;
int32_t nColName;
} ICacheKey;
int32_t idxSerialCacheKey(ICacheKey* key, char* buf);
int idxFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
int64_t idxAddRef(void* p);
......@@ -126,10 +131,6 @@ int32_t idxRemoveRef(int64_t ref);
void idxAcquireRef(int64_t ref);
void idxReleaseRef(int64_t ref);
int32_t idxSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
#define IDX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define IDX_TYPE_GET_TYPE(ty) (ty & 0x0F)
......
......@@ -25,10 +25,6 @@
#include "tref.h"
#include "tsched.h"
#ifdef USE_LUCENE
#include "lucene++/Lucene_c.h"
#endif
#define INDEX_NUM_OF_THREADS 5
#define INDEX_QUEUE_SIZE 200
......@@ -74,7 +70,7 @@ void indexCleanup() {
typedef struct SIdxColInfo {
int colId; // generated by index internal
int cVersion;
int version;
} SIdxColInfo;
static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
......@@ -123,7 +119,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
}
idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
idx->cVersion = 1;
idx->version = 1;
idx->path = tstrdup(path);
taosThreadMutexInit(&idx->mtx, NULL);
tsem_init(&idx->sem, 0, 0);
......
......@@ -566,7 +566,6 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
taosThreadMutexUnlock(&pCache->mtx);
idxCacheUnRef(pCache);
return 0;
// encode end
}
void idxCacheForceToMerge(void* cache) {
IndexCache* pCache = cache;
......@@ -602,10 +601,10 @@ static int32_t idxQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr,
}
}
int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) {
int64_t st = taosGetTimestampUs();
if (cache == NULL) {
return 0;
}
IndexCache* pCache = cache;
MemTable *mem = NULL, *imm = NULL;
......@@ -616,6 +615,8 @@ int idxCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STerm
idxMemRef(imm);
taosThreadMutexUnlock(&pCache->mtx);
int64_t st = taosGetTimestampUs();
int ret = (mem && mem->mem) ? idxQueryMem(mem, query, result, s) : 0;
if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm
......
......@@ -178,9 +178,9 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
// optime later
int32_t ret = func(a, b);
switch (comparType) {
case QUERY_LESS_THAN: {
case QUERY_LESS_THAN:
if (ret < 0) return MATCH;
} break;
break;
case QUERY_LESS_EQUAL: {
if (ret <= 0) return MATCH;
break;
......
......@@ -26,6 +26,44 @@
#define SIF_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SIF_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SIF_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
typedef union {
uint8_t u8;
uint16_t u16;
uint32_t u32;
uint64_t u64;
int8_t i8;
int16_t i16;
int32_t i32;
int64_t i64;
double d;
float f;
} SDataTypeBuf;
#define SIF_DATA_CONVERT(type, val, dst) \
do { \
if (type == TSDB_DATA_TYPE_DOUBLE) \
dst = GET_DOUBLE_VAL(val); \
else if (type == TSDB_DATA_TYPE_BIGINT) \
dst = *(int64_t *)val; \
else if (type == TSDB_DATA_TYPE_INT) \
dst = *(int32_t *)val; \
else if (type == TSDB_DATA_TYPE_SMALLINT) \
dst = *(int16_t *)val; \
else if (type == TSDB_DATA_TYPE_TINYINT) \
dst = *(int8_t *)val; \
else if (type == TSDB_DATA_TYPE_UTINYINT) \
dst = *(uint8_t *)val; \
else if (type == TSDB_DATA_TYPE_USMALLINT) \
dst = *(uint16_t *)val; \
else if (type == TSDB_DATA_TYPE_UINT) \
dst = *(uint32_t *)val; \
else if (type == TSDB_DATA_TYPE_UBIGINT) \
dst = *(uint64_t *)val; \
} while (0);
// clang-format on
typedef struct SIFParam {
SHashObj *pFilter;
......@@ -48,7 +86,6 @@ typedef struct SIFCtx {
SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
SIndexMetaArg arg;
// SIdxFltStatus st;
} SIFCtx;
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
......@@ -75,11 +112,6 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
static sif_func_t sifNullFunc = NULL;
// typedef struct SIFWalkParm
// construct tag filter operator later
// static void destroyTagFilterOperatorInfo(void *param) {
// STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
//}
static void sifFreeParam(SIFParam *param) {
if (param == NULL) return;
......@@ -365,42 +397,6 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
}
return NULL;
}
typedef union {
uint8_t u8;
uint16_t u16;
uint32_t u32;
uint64_t u64;
int8_t i8;
int16_t i16;
int32_t i32;
int64_t i64;
double d;
float f;
} SDataTypeBuf;
#define SIF_DATA_CONVERT(type, val, dst) \
do { \
if (type == TSDB_DATA_TYPE_DOUBLE) \
dst = GET_DOUBLE_VAL(val); \
else if (type == TSDB_DATA_TYPE_BIGINT) \
dst = *(int64_t *)val; \
else if (type == TSDB_DATA_TYPE_INT) \
dst = *(int32_t *)val; \
else if (type == TSDB_DATA_TYPE_SMALLINT) \
dst = *(int16_t *)val; \
else if (type == TSDB_DATA_TYPE_TINYINT) \
dst = *(int8_t *)val; \
else if (type == TSDB_DATA_TYPE_UTINYINT) \
dst = *(uint8_t *)val; \
else if (type == TSDB_DATA_TYPE_USMALLINT) \
dst = *(uint16_t *)val; \
else if (type == TSDB_DATA_TYPE_UINT) \
dst = *(uint32_t *)val; \
else if (type == TSDB_DATA_TYPE_UBIGINT) \
dst = *(uint64_t *)val; \
} while (0);
static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) {
int8_t ltype = left->colValType, rtype = right->colValType;
......@@ -693,11 +689,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
for (int32_t m = 0; m < node->pParameterList->length; m++) {
if (node->condType == LOGIC_COND_TYPE_AND) {
taosArrayAddAll(output->result, params[m].result);
// taosArrayDestroy(params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_OR) {
taosArrayAddAll(output->result, params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_NOT) {
// taosArrayAddAll(output->result, params[m].result);
}
......
......@@ -211,9 +211,7 @@ IdxFstFile* idxFileCreate(void* wrt) {
return cw;
}
void idxFileDestroy(IdxFstFile* cw) {
// free wrt object: close fd or free mem
idxFileFlush(cw);
// idxFileCtxDestroy((IFileCtx *)(cw->wrt));
taosMemoryFree(cw);
}
......@@ -222,10 +220,8 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
return 0;
}
// update checksum
// write data to file/socket or mem
IFileCtx* ctx = write->wrt;
int nWrite = ctx->write(ctx, buf, len);
int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len);
write->count += len;
......
......@@ -183,13 +183,14 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) {
return NULL;
}
reader->ctx = ctx;
reader->remove = false;
if (0 != tfileReaderVerify(reader)) {
indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName);
tfileReaderDestroy(reader);
return NULL;
}
// T_REF_INC(reader);
if (0 != tfileReaderLoadHeader(reader)) {
indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid,
reader->header.colName);
......@@ -203,7 +204,6 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) {
tfileReaderDestroy(reader);
return NULL;
}
reader->remove = false;
return reader;
}
......@@ -211,7 +211,6 @@ void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) {
return;
}
// T_REF_INC(reader);
fstDestroy(reader->fst);
if (reader->remove) {
indexInfo("%s is removed", reader->ctx->file.buf);
......@@ -222,6 +221,7 @@ void tfileReaderDestroy(TFileReader* reader) {
taosMemoryFree(reader);
}
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
int ret = 0;
char* p = tem->colVal;
......@@ -494,7 +494,6 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) {
return NULL;
......@@ -503,8 +502,8 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
TFileHeader tfh = {0};
tfh.suid = suid;
tfh.version = version;
memcpy(tfh.colName, colName, strlen(colName));
tfh.colType = colType;
memcpy(tfh.colName, colName, strlen(colName));
return tfileWriterCreate(wcx, &tfh);
}
......@@ -706,7 +705,6 @@ static bool tfileIteratorNext(Iterate* iiter) {
iv->type = ADD_VALUE; // value in tfile always ADD_VALUE
iv->colVal = colVal;
return true;
// std::string key(ch, sz);
}
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
......
......@@ -1304,11 +1304,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pTransInst->cfp(pTransInst->parent, pResp, NULL);
return 0;
}
/*
* no retry
* 1. query conn
* 2. rpc thread already receive quit msg
*/
STransConnCtx* pCtx = pMsg->ctx;
int32_t code = pResp->code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册