未验证 提交 b276ec5d 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #12807 from taosdata/enh/refactorIdx

Enh/refactor idx
......@@ -146,5 +146,6 @@ option(
option(
BUILD_WITH_INVERTEDINDEX
"If use invertedIndex"
ON
OFF
)
......@@ -76,9 +76,14 @@ target_link_libraries(
#PUBLIC scalar
PUBLIC transport
PUBLIC stream
PUBLIC index
)
target_compile_definitions(vnode PUBLIC -DMETA_REFACT)
if (${BUILD_WITH_INVERTEDINDEX})
add_definitions(-DUSE_INVERTED_INDEX)
endif(${BUILD_WITH_INVERTEDINDEX})
if(${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
......@@ -17,6 +17,7 @@
#define _TD_VNODE_META_H_
#include "vnodeInt.h"
#include "index.h"
#ifdef __cplusplus
extern "C" {
......@@ -61,16 +62,20 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64()
struct SMeta {
TdThreadRwlock lock;
char* path;
SVnode* pVnode;
TDB* pEnv;
TXN txn;
TTB* pTbDb;
TTB* pSkmDb;
TTB* pUidIdx;
TTB* pNameIdx;
TTB* pCtbIdx;
TTB* pTagIdx;
char* path;
SVnode* pVnode;
TDB* pEnv;
TXN txn;
TTB* pTbDb;
TTB* pSkmDb;
TTB* pUidIdx;
TTB* pNameIdx;
TTB* pCtbIdx;
#ifdef USE_INVERTED_INDEX
void* pTagIvtIdx;
#else
TTB* pTagIdx;
#endif
TTB* pTtlIdx;
TTB* pSmaIdx;
SMetaIdx* pIdx;
......
......@@ -53,10 +53,10 @@ int metaOpenIdx(SMeta *pMeta) {
#endif
#ifdef USE_INVERTED_INDEX
SIndexOpts opts;
if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
return -1;
}
// SIndexOpts opts;
// if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
// return -1;
//}
#endif
return 0;
......@@ -71,36 +71,37 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
#endif
#ifdef USE_INVERTED_INDEX
SIndexOpts opts;
if (indexClose(pMeta->pIdx->pIdx) != 0) {
return -1;
}
// SIndexOpts opts;
// if (indexClose(pMeta->pIdx->pIdx) != 0) {
// return -1;
//}
// return 0;
#endif
}
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
#ifdef USE_INVERTED_INDEX
if (pTbCfgs->type == META_CHILD_TABLE) {
char buf[8] = {0};
int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
sprintf(buf, "%d", colId); // colname
char *pTagVal = (char *)tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
tb_uid_t suid = pTbCfg->ctbCfg.suid; // super id
tb_uid_t tuid = 0; // child table uid
SIndexMultiTerm *terms = indexMultiTermCreate();
SIndexTerm *term =
indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY, buf, strlen(buf), pTagVal, strlen(pTagVal), tuid);
indexMultiTermAdd(terms, term);
int ret = indexPut(pMeta->pIdx->pIdx, terms);
indexMultiTermDestroy(terms);
return ret;
} else {
return DB_DONOTINDEX;
}
// if (pTbCfgs->type == META_CHILD_TABLE) {
// char buf[8] = {0};
// int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
// sprintf(buf, "%d", colId); // colname
// char *pTagVal = (char *)tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
// tb_uid_t suid = pTbCfg->ctbCfg.suid; // super id
// tb_uid_t tuid = 0; // child table uid
// SIndexMultiTerm *terms = indexMultiTermCreate();
// SIndexTerm *term =
// indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_BINARY, buf, strlen(buf), pTagVal, strlen(pTagVal), tuid);
// indexMultiTermAdd(terms, term);
// int ret = indexPut(pMeta->pIdx->pIdx, terms);
// indexMultiTermDestroy(terms);
// return ret;
//} else {
// return DB_DONOTINDEX;
//}
#endif
// TODO
return 0;
......@@ -112,4 +113,4 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
#endif
// TODO
return 0;
}
\ No newline at end of file
}
......@@ -93,11 +93,24 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
}
// open pTagIdx
#ifdef USE_INVERTED_INDEX
// TODO(yihaoDeng), refactor later
char indexFullPath[128] = {0};
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
taosMkDir(indexFullPath);
ret = indexOpen(indexOptsCreate(), indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
#else
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
#endif
// open pTtlIdx
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
......@@ -128,7 +141,11 @@ _err:
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
#ifdef USE_INVERTED_INDEX
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
#else
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
#endif
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
......@@ -145,7 +162,11 @@ int metaClose(SMeta *pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
#ifdef USE_INVERTED_INDEX
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
#else
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
#endif
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
......
......@@ -30,9 +30,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int vLen = 0;
const void *pKey = NULL;
const void *pVal = NULL;
void *pBuf = NULL;
void * pBuf = NULL;
int32_t szBuf = 0;
void *p = NULL;
void * p = NULL;
SMetaReader mr = {0};
// validate req
......@@ -71,9 +71,9 @@ _err:
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
TBC *pNameIdxc = NULL;
TBC *pUidIdxc = NULL;
TBC *pCtbIdxc = NULL;
TBC * pNameIdxc = NULL;
TBC * pUidIdxc = NULL;
TBC * pCtbIdxc = NULL;
SCtbIdxKey *pCtbIdxKey;
const void *pKey = NULL;
int nKey;
......@@ -134,8 +134,8 @@ _err:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
TBC *pUidIdxc = NULL;
TBC *pTbDbc = NULL;
TBC * pUidIdxc = NULL;
TBC * pTbDbc = NULL;
const void *pData;
int nData;
int64_t oversion;
......@@ -256,9 +256,9 @@ _err:
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pNameIdxc = NULL;
TBC * pTbDbc = NULL;
TBC * pUidIdxc = NULL;
TBC * pNameIdxc = NULL;
const void *pData;
int nData;
tb_uid_t uid;
......@@ -377,14 +377,14 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
}
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
void *pVal = NULL;
void * pVal = NULL;
int nVal = 0;
const void *pData = NULL;
const void * pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
SSchema *pColumn = NULL;
SSchema * pColumn = NULL;
SMetaEntry entry = {0};
SSchemaWrapper *pSchema;
int c;
......@@ -530,7 +530,7 @@ _err:
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0};
void *pVal = NULL;
void * pVal = NULL;
int nVal = 0;
int ret;
int c;
......@@ -561,7 +561,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
oversion = *(int64_t *)pData;
// search table.db
TBC *pTbDbc = NULL;
TBC * pTbDbc = NULL;
SDecoder dc1 = {0};
SDecoder dc2 = {0};
......@@ -585,7 +585,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaDecodeEntry(&dc2, &stbEntry);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema *pColumn = NULL;
SSchema * pColumn = NULL;
int32_t iCol = 0;
for (;;) {
pColumn = NULL;
......@@ -681,8 +681,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
void *pKey = NULL;
void *pVal = NULL;
void * pKey = NULL;
void * pVal = NULL;
int kLen = 0;
int vLen = 0;
SEncoder coder = {0};
......@@ -797,14 +797,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
}
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
void *pData = NULL;
void * pData = NULL;
int nData = 0;
STbDbKey tbDbKey = {0};
SMetaEntry stbEntry = {0};
STagIdxKey *pTagIdxKey = NULL;
STagIdxKey * pTagIdxKey = NULL;
int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void *pTagData = NULL; //
const void * pTagData = NULL; //
SDecoder dc = {0};
// get super table
......@@ -820,22 +820,33 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId);
// update tag index
#ifdef USE_INVERTED_INDEX
tb_uid_t suid = pCtbEntry->ctbEntry.suid;
tb_uid_t tuid = pCtbEntry->uid;
SIndexMultiTerm *tmGroup = indexMultiTermCreate();
SIndexTerm *tm = indexTermCreate(suid, ADD_VALUE, pTagColumn->type, pTagColumn->name, sizeof(pTagColumn->name),
pTagData, pTagData == NULL ? 0 : strlen(pTagData));
indexMultiTermAdd(tmGroup, tm);
int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid);
indexMultiTermDestroy(tmGroup);
#else
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid,
&pTagIdxKey, &nTagIdxKey) < 0) {
return -1;
}
tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey);
#endif
tDecoderClear(&dc);
tdbFree(pData);
return 0;
}
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SEncoder coder = {0};
void *pVal = NULL;
void * pVal = NULL;
int vLen = 0;
int rcode = 0;
SSkmDbKey skmDbKey = {0};
......
......@@ -19,38 +19,11 @@
#include "nodes.h"
#include "tdatablock.h"
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SScalarParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
// SIdxFltStatus st;
} SIFCtx;
#define SIF_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define SIF_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define SIF_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
// clang-format off
#define SIF_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SIF_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SIF_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
// clang-format on
typedef struct SIFParam {
SHashObj *pFilter;
......@@ -65,6 +38,13 @@ typedef struct SIFParam {
char colName[TSDB_COL_NAME_LEN];
} SIFParam;
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
// SIdxFltStatus st;
} SIFCtx;
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
if (src == OP_TYPE_GREATER_THAN) {
*dst = QUERY_GREATER_THAN;
......
......@@ -31,7 +31,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX})
if (${BUILD_TEST})
add_subdirectory(test)
endif(${BUILD_TEST})
#if (${BUILD_TEST})
# add_subdirectory(test)
#endif(${BUILD_TEST})
......@@ -63,7 +63,10 @@ typedef struct CacheTerm {
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
void indexCacheForceToMerge(void* cache);
void indexCacheDestroy(void* cache);
void indexCacheBroadcast(void* cache);
void indexCacheWait(void* cache);
Iterate* indexCacheIteratorCreate(IndexCache* cache);
void indexCacheIteratorDestroy(Iterate* iiter);
......
......@@ -58,6 +58,8 @@ struct SIndex {
SIndexStat stat;
TdThreadMutex mtx;
tsem_t sem;
bool quit;
};
struct SIndexOpts {
......@@ -69,6 +71,7 @@ struct SIndexOpts {
int32_t cacheSize; // MB
// add cache module later
#endif
int32_t cacheOpt; // MB
};
struct SIndexMultiTermQuery {
......@@ -131,42 +134,14 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
#define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLog("index FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
} while (0)
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
} while (0)
#define indexWarn(...) \
do { \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
} while (0)
#define indexInfo(...) \
do { \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
} while (0)
#define indexDebug(...) \
do { \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define indexTrace(...) \
do { \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
} \
} while (0)
// clang-format off
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
#define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
#define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
#define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0)
#define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
......
......@@ -90,6 +90,15 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, Iterat
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
static void indexPost(void* idx) {
SIndex* pIdx = idx;
tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
SIndex* pIdx = idx;
tsem_wait(&pIdx->sem);
}
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadOnce(&isInit, indexInit);
SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex));
......@@ -107,6 +116,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
sIdx->cVersion = 1;
sIdx->path = tstrdup(path);
taosThreadMutexInit(&sIdx->mtx, NULL);
tsem_init(&sIdx->sem, 0, 0);
// taosThreadCondInit(&sIdx->finished, NULL);
sIdx->refId = indexAddRef(sIdx);
indexAcquireRef(sIdx->refId);
......@@ -124,16 +135,8 @@ END:
void indexDestroy(void* handle) {
SIndex* sIdx = handle;
void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) {
IndexCache** pCache = iter;
if (*pCache) {
indexCacheUnRef(*pCache);
}
iter = taosHashIterate(sIdx->colObj, iter);
}
taosHashCleanup(sIdx->colObj);
taosThreadMutexDestroy(&sIdx->mtx);
tsem_destroy(&sIdx->sem);
indexTFileDestroy(sIdx->tindex);
taosMemoryFree(sIdx->path);
taosMemoryFree(sIdx);
......@@ -141,6 +144,20 @@ void indexDestroy(void* handle) {
}
void indexClose(SIndex* sIdx) {
indexReleaseRef(sIdx->refId);
bool ref = 0;
if (sIdx->colObj != NULL) {
void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) {
IndexCache** pCache = iter;
indexCacheForceToMerge((void*)(*pCache));
indexWait((void*)(sIdx));
iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache);
}
taosHashCleanup(sIdx->colObj);
sIdx->colObj = NULL;
}
// taosMsleep(1000 * 5);
indexRemoveRef(sIdx->refId);
}
int64_t indexAddRef(void* p) {
......@@ -451,6 +468,18 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
}
// handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
if (cacheIter == NULL) {
indexError("%p immtable is empty, ignore merge opera", pCache);
indexCacheDestroyImm(pCache);
tfileReaderUnRef(pReader);
if (sIdx->quit) {
indexPost(sIdx);
// indexCacheBroadcast(pCache);
}
indexReleaseRef(sIdx->refId);
return 0;
}
Iterate* tfileIter = tfileIteratorCreate(pReader);
if (tfileIter == NULL) {
indexWarn("empty tfile reader iterator");
......@@ -506,7 +535,11 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
} else {
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
}
if (sIdx->quit) {
indexPost(sIdx);
}
indexReleaseRef(sIdx->refId);
return ret;
}
void iterateValueDestroy(IterateValue* value, bool destroy) {
......@@ -563,10 +596,11 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
char* p = buf;
SERIALIZE_MEM_TO_BUF(buf, key, suid);
char tbuf[65] = {0};
indexInt2str((int64_t)key->suid, tbuf, 0);
SERIALIZE_STR_VAR_TO_BUF(buf, tbuf, strlen(tbuf));
SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
if (hasJson) {
SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
} else {
......
......@@ -23,6 +23,7 @@
#define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 64 * 1024
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20
#define MEM_ESTIMATE_RADIO 1.5
static void indexMemRef(MemTable* tbl);
......@@ -385,7 +386,7 @@ void indexCacheDebug(IndexCache* cache) {
void indexCacheDestroySkiplist(SSkipList* slt) {
SSkipListIterator* iter = tSkipListCreateIter(slt);
while (tSkipListIterNext(iter)) {
while (iter != NULL && tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) {
......@@ -396,17 +397,24 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
tSkipListDestroyIter(iter);
tSkipListDestroy(slt);
}
void indexCacheBroadcast(void* cache) {
IndexCache* pCache = cache;
taosThreadCondBroadcast(&pCache->finished);
}
void indexCacheWait(void* cache) {
IndexCache* pCache = cache;
taosThreadCondWait(&pCache->finished, &pCache->mtx);
}
void indexCacheDestroyImm(IndexCache* cache) {
if (cache == NULL) {
return;
}
MemTable* tbl = NULL;
taosThreadMutexLock(&cache->mtx);
tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread
taosThreadCondBroadcast(&cache->finished);
indexCacheBroadcast(cache);
taosThreadMutexUnlock(&cache->mtx);
......@@ -429,11 +437,13 @@ void indexCacheDestroy(void* cache) {
}
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
if (cache->imm == NULL) {
return NULL;
}
Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate));
if (iiter == NULL) {
return NULL;
}
taosThreadMutexLock(&cache->mtx);
indexMemRef(cache->imm);
......@@ -458,17 +468,16 @@ void indexCacheIteratorDestroy(Iterate* iter) {
taosMemoryFree(iter);
}
int indexCacheSchedToMerge(IndexCache* pCache) {
int indexCacheSchedToMerge(IndexCache* pCache, bool notify) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doMergeWork;
schedMsg.ahandle = pCache;
schedMsg.thandle = NULL;
// schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t));
// memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t));
if (notify) {
schedMsg.thandle = taosMemoryMalloc(1);
}
schedMsg.msg = NULL;
indexAcquireRef(pCache->index->refId);
taosScheduleTask(indexQhandle, &schedMsg);
return 0;
}
......@@ -478,8 +487,10 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
break;
} else if (cache->imm != NULL) {
// TODO: wake up by condition variable
taosThreadCondWait(&cache->finished, &cache->mtx);
indexCacheWait(cache);
} else {
bool notifyQuit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false;
indexCacheRef(cache);
cache->imm = cache->mem;
cache->mem = indexInternalCacheCreate(cache->type);
......@@ -487,7 +498,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
cache->occupiedMem = 0;
// sched to merge
// unref cache in bgwork
indexCacheSchedToMerge(cache);
indexCacheSchedToMerge(cache, notifyQuit);
}
}
}
......@@ -533,6 +544,19 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
return 0;
// encode end
}
void indexCacheForceToMerge(void* cache) {
IndexCache* pCache = cache;
indexCacheRef(pCache);
taosThreadMutexLock(&pCache->mtx);
indexInfo("%p is forced to merge into tfile", pCache);
pCache->occupiedMem += MEM_SIGNAL_QUIT;
indexCacheMakeRoomForWrite(pCache);
taosThreadMutexUnlock(&pCache->mtx);
indexCacheUnRef(pCache);
return;
}
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
IndexCache* pCache = cache;
return 0;
......@@ -691,6 +715,9 @@ static MemTable* indexInternalCacheCreate(int8_t type) {
static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index;
sidx->quit = msg->thandle ? true : false;
taosMemoryFree(msg->thandle);
indexFlushCacheToTFile(sidx, pCache);
}
static bool indexCacheIteratorNext(Iterate* itera) {
......@@ -709,9 +736,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
iv->type = ct->operaType;
iv->ver = ct->version;
iv->colVal = tstrdup(ct->colVal);
// printf("col Val: %s\n", iv->colVal);
// iv->colType = cv->colType;
taosArrayPush(iv->val, &ct->uid);
}
return next;
......
......@@ -22,6 +22,29 @@
#include "ttypes.h"
#include "tvariant.h"
#define INDEX_DATA_BOOL_NULL 0x02
#define INDEX_DATA_TINYINT_NULL 0x80
#define INDEX_DATA_SMALLINT_NULL 0x8000
#define INDEX_DATA_INT_NULL 0x80000000L
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL 0xFF
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
#define INDEX_DATA_JSON_null 0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01
#define INDEX_DATA_UTINYINT_NULL 0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define INDEX_DATA_NULL_STR "NULL"
#define INDEX_DATA_NULL_STR_L "null"
char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&';
......@@ -372,7 +395,7 @@ int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
*dst = (char*) * dst - tlen;
*dst = (char*)*dst - tlen;
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
......
......@@ -171,7 +171,6 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
oldReader->remove = true;
tfileReaderUnRef(oldReader);
}
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader);
return;
......@@ -499,15 +498,15 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
int ret = 0;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
return tfSearch[1][qtype](reader, term, tr);
ret = tfSearch[1][qtype](reader, term, tr);
} else {
return tfSearch[0][qtype](reader, term, tr);
ret = tfSearch[0][qtype](reader, term, tr);
}
tfileReaderUnRef(reader);
return 0;
return ret;
}
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
......
......@@ -272,9 +272,26 @@ void validateFst() {
}
delete m;
}
static std::string logDir = "/tmp/log";
static void initLog() {
const char* defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
sDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
class IndexEnv : public ::testing::Test {
protected:
virtual void SetUp() {
initLog();
taosRemoveDir(path);
opts = indexOptsCreate();
int ret = indexOpen(opts, path, &index);
......@@ -804,7 +821,7 @@ class IndexObj {
}
~IndexObj() {
indexCleanUp();
// indexCleanUp();
indexClose(idx);
}
......@@ -817,7 +834,10 @@ class IndexObj {
class IndexEnv2 : public ::testing::Test {
protected:
virtual void SetUp() { index = new IndexObj(); }
virtual void SetUp() {
initLog();
index = new IndexObj();
}
virtual void TearDown() { delete index; }
IndexObj* index;
};
......@@ -884,11 +904,34 @@ TEST_F(IndexEnv2, testIndexOpen) {
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
index->Search(mq, result);
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
assert(taosArrayGetSize(result) == 400);
EXPECT_EQ(400, taosArrayGetSize(result));
taosArrayDestroy(result);
indexMultiTermQueryDestroy(mq);
}
}
TEST_F(IndexEnv2, testEmptyIndexOpen) {
std::string path = "/tmp/test";
if (index->Init(path) != 0) {
std::cout << "failed to init index" << std::endl;
exit(1);
}
int targetSize = 1;
{
std::string colName("tag1"), colVal("Hello");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < targetSize; i++) {
int tableId = i;
int ret = index->Put(terms, tableId);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
}
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = TD_TMP_DIR_PATH "testxxx";
......
......@@ -20,22 +20,12 @@ extern "C" {
#endif
#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "osSocket.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "theap.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transLog.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
typedef void* queue[2];
......@@ -108,27 +98,6 @@ typedef void* queue[2];
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
// struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
// int32_t code; // error code
// int16_t numOfTry; // number of try for different servers
// int8_t oldInUse; // server EP inUse passed by app
// int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
char* ip;
uint32_t port;
// SEpSet* pSet; // for synchronous API
} SRpcReqContext;
typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
typedef SRpcCtxVal STransCtxVal;
......
......@@ -22,15 +22,13 @@
#include "lz4.h"
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tref.h"
#include "tmsg.h"
#include "transLog.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#include "tglobal.h"
#ifdef __cplusplus
extern "C" {
......
......@@ -295,14 +295,14 @@ static void uvHandleReq(SSrvConn* pConn) {
// no ref here
}
// if pHead->noResp = 1,
// pHead->noResp = 1,
// 1. server application should not send resp on handle
// 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg.info.handle = (void*)uvAcquireExHandle(pConn->refId);
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
transMsg.info.refId = pConn->refId;
tTrace("server handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId);
assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) {
transMsg.info.refId = -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册