You need to sign in or sign up before continuing.
提交 6b39c690 编写于 作者: dengyihao's avatar dengyihao

json idx

...@@ -32,7 +32,7 @@ extern "C" { ...@@ -32,7 +32,7 @@ extern "C" {
#define TD_VER_MAX UINT64_MAX // TODO: use the real max version from query handle #define TD_VER_MAX UINT64_MAX // TODO: use the real max version from query handle
// Bytes for each type. // Bytes for each type.
extern const int32_t TYPE_BYTES[15]; extern const int32_t TYPE_BYTES[16];
// TODO: replace and remove code below // TODO: replace and remove code below
#define CHAR_BYTES sizeof(char) #define CHAR_BYTES sizeof(char)
......
...@@ -1280,8 +1280,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -1280,8 +1280,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
varDataVal(dst) + CHAR_BYTES); varDataVal(dst) + CHAR_BYTES);
if (length <= 0) { if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset);
varDataVal(jsonInnerData));
length = 0; length = 0;
} }
varDataSetLen(dst, length + CHAR_BYTES * 2); varDataSetLen(dst, length + CHAR_BYTES * 2);
......
...@@ -1271,12 +1271,12 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { ...@@ -1271,12 +1271,12 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
void colDataDestroy(SColumnInfoData* pColData) { void colDataDestroy(SColumnInfoData* pColData) {
if (IS_VAR_DATA_TYPE(pColData->info.type)) { if (IS_VAR_DATA_TYPE(pColData->info.type)) {
taosMemoryFree(pColData->varmeta.offset); taosMemoryFreeClear(pColData->varmeta.offset);
} else { } else {
taosMemoryFree(pColData->nullbitmap); taosMemoryFreeClear(pColData->nullbitmap);
} }
taosMemoryFree(pColData->pData); taosMemoryFreeClear(pColData->pData);
} }
static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
......
...@@ -332,7 +332,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -332,7 +332,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagNullName", tsSmlTagName, 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 4; tsNumOfTaskQueueThreads = tsNumOfCores / 4;
...@@ -532,7 +532,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -532,7 +532,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
} }
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagNullName")->str, TSDB_COL_NAME_LEN); tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "tcompression.h" #include "tcompression.h"
#include "trow.h" #include "trow.h"
const int32_t TYPE_BYTES[15] = { const int32_t TYPE_BYTES[16] = {
-1, // TSDB_DATA_TYPE_NULL -1, // TSDB_DATA_TYPE_NULL
CHAR_BYTES, // TSDB_DATA_TYPE_BOOL CHAR_BYTES, // TSDB_DATA_TYPE_BOOL
CHAR_BYTES, // TSDB_DATA_TYPE_TINYINT CHAR_BYTES, // TSDB_DATA_TYPE_TINYINT
...@@ -34,6 +34,7 @@ const int32_t TYPE_BYTES[15] = { ...@@ -34,6 +34,7 @@ const int32_t TYPE_BYTES[15] = {
SHORT_BYTES, // TSDB_DATA_TYPE_USMALLINT SHORT_BYTES, // TSDB_DATA_TYPE_USMALLINT
INT_BYTES, // TSDB_DATA_TYPE_UINT INT_BYTES, // TSDB_DATA_TYPE_UINT
sizeof(uint64_t), // TSDB_DATA_TYPE_UBIGINT sizeof(uint64_t), // TSDB_DATA_TYPE_UBIGINT
TSDB_MAX_JSON_TAG_LEN, // TSDB_DATA_TYPE_JSON
}; };
#define DO_STATICS(__sum, __min, __max, __minIndex, __maxIndex, _list, _index) \ #define DO_STATICS(__sum, __min, __max, __minIndex, __maxIndex, _list, _index) \
......
...@@ -61,14 +61,14 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const ...@@ -61,14 +61,14 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
if (tTagToValArray((const STag *)data, &pTagVals) != 0) { if (tTagToValArray((const STag *)data, &pTagVals) != 0) {
return -1; return -1;
} }
char key[512] = {0};
SIndexMultiTerm *terms = indexMultiTermCreate(); SIndexMultiTerm *terms = indexMultiTermCreate();
int16_t nCols = taosArrayGetSize(pTagVals); int16_t nCols = taosArrayGetSize(pTagVals);
for (int i = 0; i < nCols; i++) { for (int i = 0; i < nCols; i++) {
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i); STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
char type = pTagVal->type; char type = pTagVal->type;
sprintf(key, "%s_%s", tagName, pTagVal->pKey);
char * key = pTagVal->pKey;
int32_t nKey = strlen(key); int32_t nKey = strlen(key);
SIndexTerm *term = NULL; SIndexTerm *term = NULL;
...@@ -93,12 +93,11 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const ...@@ -93,12 +93,11 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const
} else if (type == TSDB_DATA_TYPE_BOOL) { } else if (type == TSDB_DATA_TYPE_BOOL) {
int val = *(int *)(&pTagVal->i64); int val = *(int *)(&pTagVal->i64);
int len = 0; int len = 0;
term = indexTermCreate(suid, ADD_VALUE, type, key, nKey, (const char *)&val, len); term = indexTermCreate(suid, ADD_VALUE, TSDB_DATA_TYPE_INT, key, nKey, (const char *)&val, len);
} }
if (term != NULL) { if (term != NULL) {
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
} }
memset(key, 0, sizeof(key));
} }
tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid); tIndexJsonPut(pMeta->pTagIvtIdx, terms, tuid);
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
......
...@@ -353,6 +353,7 @@ typedef struct STagScanInfo { ...@@ -353,6 +353,7 @@ typedef struct STagScanInfo {
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
STableListInfo *pTableList; STableListInfo *pTableList;
SNode* pFilterNode; // filter info,
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -747,7 +748,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan ...@@ -747,7 +748,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
...@@ -777,7 +778,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -777,7 +778,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
......
...@@ -1818,9 +1818,9 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO ...@@ -1818,9 +1818,9 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
} }
} }
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) {
if (pFilterNode == NULL) { if (pFilterNode == NULL) {
return; return;
} }
...@@ -1839,11 +1839,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { ...@@ -1839,11 +1839,11 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
filterFreeInfo(filter); filterFreeInfo(filter);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); extractQualifiedTupleByFilterResult(pBlock, rowRes, keep, needFree);
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
} }
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) { void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep, bool needFree) {
if (keep) { if (keep) {
return; return;
} }
...@@ -1883,8 +1883,20 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR ...@@ -1883,8 +1883,20 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
ASSERT(pBlock->info.rows == numOfRows); ASSERT(pBlock->info.rows == numOfRows);
} }
SColumnInfoData tmp = *pSrc;
*pSrc = *pDst; *pSrc = *pDst;
*pDst = tmp;
if (!needFree) {
if (IS_VAR_DATA_TYPE(pDst->info.type)) { // this elements do not need free
pDst->varmeta.offset = NULL;
} else {
pDst->nullbitmap = NULL;
}
pDst->pData = NULL;
}
} }
blockDataDestroy(px); // fix memory leak
} else { } else {
// do nothing // do nothing
pBlock->info.rows = 0; pBlock->info.rows = 0;
...@@ -3640,7 +3652,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3640,7 +3652,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
doFilter(pProjectInfo->pFilterNode, pBlock); doFilter(pProjectInfo->pFilterNode, pBlock, true);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
......
...@@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
while(1) { while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes); doFilter(pInfo->pCondition, pRes, true);
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo); bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
......
...@@ -267,7 +267,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -267,7 +267,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
} }
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock); doFilter(pTableScanInfo->pFilterNode, pBlock, false);
int64_t et = taosGetTimestampMs(); int64_t et = taosGetTimestampMs();
pTableScanInfo->readRecorder.filterTime += (et - st); pTableScanInfo->readRecorder.filterTime += (et - st);
...@@ -950,7 +950,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { ...@@ -950,7 +950,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes); addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
} }
doFilter(pInfo->pCondition, pInfo->pRes); doFilter(pInfo->pCondition, pInfo->pRes, false);
blockDataUpdateTsWindow(pInfo->pRes, 0); blockDataUpdateTsWindow(pInfo->pRes, 0);
break; break;
} }
...@@ -1722,7 +1722,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1722,7 +1722,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} }
pRes->info.rows = count; pRes->info.rows = count;
pOperator->resultInfo.totalRows += count; doFilter(pInfo->pFilterNode, pRes, true);
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
......
...@@ -28,6 +28,7 @@ extern char JSON_VALUE_DELIM; ...@@ -28,6 +28,7 @@ extern char JSON_VALUE_DELIM;
char* indexPackJsonData(SIndexTerm* itm); char* indexPackJsonData(SIndexTerm* itm);
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip); char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip);
char* indexPackJsonDataPrefixNoType(SIndexTerm* itm, int32_t* skip);
typedef enum { MATCH, CONTINUE, BREAK } TExeCond; typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
......
...@@ -43,7 +43,7 @@ extern "C" { ...@@ -43,7 +43,7 @@ extern "C" {
#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0) #define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on // clang-format on
typedef enum { LT, LE, GT, GE, CONTAINS } RangeType; typedef enum { LT, LE, GT, GE, CONTAINS, EQ } RangeType;
typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef enum { kTypeValue, kTypeDeletion } STermValueType;
typedef struct SIndexStat { typedef struct SIndexStat {
......
...@@ -48,6 +48,7 @@ static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STer ...@@ -48,6 +48,7 @@ static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STer
/*comm func of compare, used in (LE/LT/GE/GT compare)*/ /*comm func of compare, used in (LE/LT/GE/GT compare)*/
static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s, RangeType type); static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s, RangeType type);
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s);
...@@ -63,7 +64,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR ...@@ -63,7 +64,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s) = { static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s) = {
{cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual, {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
{cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON, {cacheSearchEqual_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON,
cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON, cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON,
cacheSearchRange_JSON}}; cacheSearchRange_JSON}};
...@@ -123,12 +124,11 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* ...@@ -123,12 +124,11 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
_cache_range_compare cmpFn = indexGetCompare(type);
MemTable* mem = cache; MemTable* mem = cache;
IndexCache* pCache = mem->pCache; IndexCache* pCache = mem->pCache;
_cache_range_compare cmpFn = indexGetCompare(type);
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal; pCt->colVal = term->colVal;
pCt->colType = term->colType; pCt->colType = term->colType;
...@@ -221,15 +221,18 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr ...@@ -221,15 +221,18 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS);
}
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, EQ);
}
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS);
}
static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT); return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT);
} }
...@@ -267,13 +270,20 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR ...@@ -267,13 +270,20 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
int skip = 0; int skip = 0;
char* exBuf = NULL; char* exBuf = NULL;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { if (type == CONTAINS) {
SIndexTerm tm = {.suid = term->suid,
.operType = term->operType,
.colType = term->colType,
.colName = term->colVal,
.nColName = term->nColVal};
exBuf = indexPackJsonDataPrefixNoType(&tm, &skip);
pCt->colVal = exBuf;
} else {
exBuf = indexPackJsonDataPrefix(term, &skip); exBuf = indexPackJsonDataPrefix(term, &skip);
pCt->colVal = exBuf; pCt->colVal = exBuf;
} }
char* key = indexCacheTermGet(pCt); char* key = indexCacheTermGet(pCt);
// SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
...@@ -281,14 +291,22 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR ...@@ -281,14 +291,22 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
break; break;
} }
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
// printf("json val: %s\n", c->colVal); TExeCond cond = CONTINUE;
if (0 != strncmp(c->colVal, pCt->colVal, skip)) { if (type == CONTAINS) {
break; if (0 == strncmp(c->colVal, pCt->colVal, skip)) {
cond = MATCH;
} }
} else {
if (0 != strncmp(c->colVal, pCt->colVal, skip - 1)) {
break;
} else if (0 != strncmp(c->colVal, pCt->colVal, skip)) {
continue;
} else {
char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1); char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1);
memcpy(p, c->colVal, strlen(c->colVal)); memcpy(p, c->colVal, strlen(c->colVal));
cond = cmpFn(p + skip, term->colVal, dType);
TExeCond cond = cmpFn(p + skip, term->colVal, dType); }
}
if (cond == MATCH) { if (cond == MATCH) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
...@@ -302,7 +320,6 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR ...@@ -302,7 +320,6 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
} else if (cond == BREAK) { } else if (cond == BREAK) {
break; break;
} }
taosMemoryFree(p);
} }
taosMemoryFree(pCt); taosMemoryFree(pCt);
......
...@@ -102,6 +102,10 @@ static TExeCond tCompareContains(void* a, void* b, int8_t type) { ...@@ -102,6 +102,10 @@ static TExeCond tCompareContains(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type); __compar_fn_t func = indexGetCompar(type);
return tCompare(func, QUERY_TERM, a, b, type); return tCompare(func, QUERY_TERM, a, b, type);
} }
static TExeCond tCompareEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type);
return tCompare(func, QUERY_TERM, a, b, type);
}
TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY) { if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY) {
return tDoCompare(func, cmptype, a, b); return tDoCompare(func, cmptype, a, b);
...@@ -186,9 +190,11 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { ...@@ -186,9 +190,11 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
} }
case QUERY_GREATER_EQUAL: { case QUERY_GREATER_EQUAL: {
if (ret >= 0) return MATCH; if (ret >= 0) return MATCH;
break;
} }
case QUERY_TERM: { case QUERY_TERM: {
if (ret == 0) return MATCH; if (ret == 0) return MATCH;
break;
} }
default: default:
return BREAK; return BREAK;
...@@ -197,7 +203,7 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { ...@@ -197,7 +203,7 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
} }
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = { static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {
tCompareLessThan, tCompareLessEqual, tCompareGreaterThan, tCompareGreaterEqual, tCompareContains}; tCompareLessThan, tCompareLessEqual, tCompareGreaterThan, tCompareGreaterEqual, tCompareContains, tCompareEqual};
_cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; } _cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; }
...@@ -256,6 +262,26 @@ char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) { ...@@ -256,6 +262,26 @@ char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
return buf; return buf;
} }
char* indexPackJsonDataPrefixNoType(SIndexTerm* itm, int32_t* skip) {
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
char* buf = (char*)taosMemoryCalloc(1, sz);
char* p = buf;
memcpy(p, itm->colName, itm->nColName);
p += itm->nColName;
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
p += sizeof(JSON_VALUE_DELIM);
*skip = p - buf;
return buf;
}
int32_t indexConvertData(void* src, int8_t type, void** dst) { int32_t indexConvertData(void* src, int8_t type, void** dst) {
int tlen = -1; int tlen = -1;
......
...@@ -173,10 +173,8 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) { ...@@ -173,10 +173,8 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
param->colId = l->colId; param->colId = l->colId;
param->colValType = l->node.resType.type; param->colValType = l->node.resType.type;
memcpy(param->dbName, l->dbName, sizeof(l->dbName)); memcpy(param->dbName, l->dbName, sizeof(l->dbName));
#pragma GCC diagnostic push memcpy(param->colName, r->literal, strlen(r->literal));
#pragma GCC diagnostic ignored "-Wformat-overflow" // sprintf(param->colName, "%s_%s", l->colName, r->literal);
sprintf(param->colName, "%s_%s", l->colName, r->literal);
#pragma GCC diagnostic pop
param->colValType = r->typeData; param->colValType = r->typeData;
return 0; return 0;
// memcpy(param->colName, l->colName, sizeof(l->colName)); // memcpy(param->colName, l->colName, sizeof(l->colName));
...@@ -188,6 +186,9 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { ...@@ -188,6 +186,9 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue)); SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue));
param->colId = -1; param->colId = -1;
param->colValType = (uint8_t)(vn->node.resType.type); param->colValType = (uint8_t)(vn->node.resType.type);
if (vn->literal == NULL || strlen(vn->literal) == 0) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
memcpy(param->colName, vn->literal, strlen(vn->literal)); memcpy(param->colName, vn->literal, strlen(vn->literal));
break; break;
} }
...@@ -340,9 +341,9 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) { ...@@ -340,9 +341,9 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
return NULL; return NULL;
} }
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexMetaArg *arg = &output->arg;
int ret = 0; int ret = 0;
SIndexMetaArg * arg = &output->arg;
EIndexQueryType qtype = 0; EIndexQueryType qtype = 0;
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype)); SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
if (left->colValType == TSDB_DATA_TYPE_JSON) { if (left->colValType == TSDB_DATA_TYPE_JSON) {
...@@ -506,7 +507,6 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { ...@@ -506,7 +507,6 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
if (nParam <= 1) { if (nParam <= 1) {
output->status = SFLT_NOT_INDEX; output->status = SFLT_NOT_INDEX;
return code; return code;
// SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (node->opType == OP_TYPE_JSON_GET_VALUE) { if (node->opType == OP_TYPE_JSON_GET_VALUE) {
return code; return code;
......
...@@ -22,6 +22,14 @@ int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) { ...@@ -22,6 +22,14 @@ int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(terms); i++) { for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i); SIndexJsonTerm *p = taosArrayGetP(terms, i);
if (p->colType == TSDB_DATA_TYPE_BOOL) {
p->colType = TSDB_DATA_TYPE_INT;
} else if (p->colType == TSDB_DATA_TYPE_VARCHAR || p->colType == TSDB_DATA_TYPE_NCHAR ||
p->colType == TSDB_DATA_TYPE_BINARY) {
// p->colType = TSDB_DATA_TYPE_NCHAR;
} else {
p->colType = TSDB_DATA_TYPE_DOUBLE;
}
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON); INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
} }
// handle put // handle put
...@@ -32,6 +40,14 @@ int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *re ...@@ -32,6 +40,14 @@ int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *re
SArray *terms = tq->query; SArray *terms = tq->query;
for (int i = 0; i < taosArrayGetSize(terms); i++) { for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i); SIndexJsonTerm *p = taosArrayGetP(terms, i);
if (p->colType == TSDB_DATA_TYPE_BOOL) {
p->colType = TSDB_DATA_TYPE_INT;
} else if (p->colType == TSDB_DATA_TYPE_VARCHAR || p->colType == TSDB_DATA_TYPE_NCHAR ||
p->colType == TSDB_DATA_TYPE_BINARY) {
// p->colType = TSDB_DATA_TYPE_NCHAR;
} else {
p->colType = TSDB_DATA_TYPE_DOUBLE;
}
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON); INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
} }
// handle search // handle search
......
...@@ -73,6 +73,7 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr); ...@@ -73,6 +73,7 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype); static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype);
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr);
...@@ -87,7 +88,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt ...@@ -87,7 +88,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt* tr) = { static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt* tr) = {
{tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual, {tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual,
tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange},
{tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON, {tfSearchEqual_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}}; tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
...@@ -424,6 +425,9 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { ...@@ -424,6 +425,9 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
// deprecate api // deprecate api
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, EQ);
}
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, CONTAINS); return tfSearchCompareFunc_JSON(reader, tem, tr, CONTAINS);
} }
...@@ -456,7 +460,17 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt ...@@ -456,7 +460,17 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
int ret = 0; int ret = 0;
int skip = 0; int skip = 0;
char* p = indexPackJsonDataPrefix(tem, &skip); char* p = NULL;
if (ctype == CONTAINS) {
SIndexTerm tm = {.suid = tem->suid,
.operType = tem->operType,
.colType = tem->colType,
.colName = tem->colVal,
.nColName = tem->nColVal};
p = indexPackJsonDataPrefixNoType(&tm, &skip);
} else {
p = indexPackJsonDataPrefix(tem, &skip);
}
_cache_range_compare cmpFn = indexGetCompare(ctype); _cache_range_compare cmpFn = indexGetCompare(ctype);
...@@ -472,16 +486,20 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt ...@@ -472,16 +486,20 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
int32_t sz = 0; int32_t sz = 0;
char* ch = (char*)fstSliceData(s, &sz); char* ch = (char*)fstSliceData(s, &sz);
char* tmp = taosMemoryCalloc(1, sz + 1); TExeCond cond = CONTINUE;
memcpy(tmp, ch, sz); if (ctype == CONTAINS) {
if (0 == strncmp(ch, p, skip)) {
if (0 != strncmp(tmp, p, skip)) { cond = MATCH;
}
} else {
if (0 != strncmp(ch, p, skip - 1)) {
swsResultDestroy(rt); swsResultDestroy(rt);
taosMemoryFree(tmp);
break; break;
} else if (0 != strncmp(ch, p, skip)) {
continue;
}
cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
} }
TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
if (MATCH == cond) { if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) { } else if (CONTINUE == cond) {
...@@ -489,7 +507,6 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt ...@@ -489,7 +507,6 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
swsResultDestroy(rt); swsResultDestroy(rt);
break; break;
} }
taosMemoryFree(tmp);
swsResultDestroy(rt); swsResultDestroy(rt);
} }
streamWithStateDestroy(st); streamWithStateDestroy(st);
......
...@@ -107,42 +107,41 @@ TEST_F(JsonEnv, testWrite) { ...@@ -107,42 +107,41 @@ TEST_F(JsonEnv, testWrite) {
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("ab"); std::string colVal("ab");
for (int i = 0; i < 100; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("voltage"); std::string colName("voltage");
std::string colVal("ab1"); std::string colVal("ab1");
for (int i = 0; i < 100; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("voltage"); std::string colName("voltage");
std::string colVal("123"); std::string colVal("123");
for (size_t i = 0; i < 100; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("ab"); std::string colVal("ab");
...@@ -154,7 +153,7 @@ TEST_F(JsonEnv, testWrite) { ...@@ -154,7 +153,7 @@ TEST_F(JsonEnv, testWrite) {
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM); indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result); tIndexJsonSearch(index, mq, result);
assert(100 == taosArrayGetSize(result)); EXPECT_EQ(100, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
} }
} }
...@@ -162,45 +161,45 @@ TEST_F(JsonEnv, testWriteMillonData) { ...@@ -162,45 +161,45 @@ TEST_F(JsonEnv, testWriteMillonData) {
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("ab"); std::string colVal("ab");
for (size_t i = 0; i < 10; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 10; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("voltagefdadfa"); std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx"); std::string colVal("abxxxxxxxxxxxx");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
colVal[i % colVal.size()] = '0' + i % 128; colVal[i % colVal.size()] = '0' + i % 128;
for (size_t i = 0; i < 100; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
} }
}
{ {
std::string colName("voltagefdadfa"); std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx"); std::string colVal("abxxxxxxxxxxxx");
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("ab"); std::string colVal("ab");
...@@ -227,7 +226,7 @@ TEST_F(JsonEnv, testWriteMillonData) { ...@@ -227,7 +226,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
tIndexJsonSearch(index, mq, result); tIndexJsonSearch(index, mq, result);
assert(0 == taosArrayGetSize(result)); EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
} }
{ {
...@@ -253,55 +252,55 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -253,55 +252,55 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
std::string colName("test"); std::string colName("test");
// std::string colVal("10"); // std::string colVal("10");
int val = 10; int val = 10;
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val)); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test2"); std::string colName("test2");
int val = 20; int val = 20;
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val)); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test"); std::string colName("test");
int val = 15; int val = 15;
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val)); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test2"); std::string colName("test2");
const char* val = "test"; const char* val = "test";
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
(const char*)val, strlen(val)); (const char*)val, strlen(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test"); std::string colName("test");
int val = 15; int val = 15;
...@@ -380,29 +379,29 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) { ...@@ -380,29 +379,29 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
{ {
std::string colName("test1"); std::string colName("test1");
int val = 10; int val = 10;
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val)); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("xxxxxxxxxxxxxxxxxxx"); std::string colVal("xxxxxxxxxxxxxxxxxxx");
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test1"); std::string colName("test1");
int val = 10; int val = 10;
...@@ -478,16 +477,16 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) { ...@@ -478,16 +477,16 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
std::string colName("other_column"); std::string colName("other_column");
int val = 100; int val = 100;
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val)); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i); tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test1"); std::string colName("test1");
int val = 10; int val = 10;
...@@ -506,16 +505,16 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) { ...@@ -506,16 +505,16 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
{ {
std::string colName("test1"); std::string colName("test1");
int val = 15; int val = 15;
for (size_t i = 0; i < 1000; i++) {
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val)); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i + 1000); tIndexJsonPut(index, terms, i + 1000);
}
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
}
{ {
std::string colName("test1"); std::string colName("test1");
int val = 8; int val = 8;
......
...@@ -338,8 +338,7 @@ void cliHandleResp(SCliConn* conn) { ...@@ -338,8 +338,7 @@ void cliHandleResp(SCliConn* conn) {
return; return;
} }
int ret = cliAppCb(conn, &transMsg, pMsg); if (cliAppCb(conn, &transMsg, pMsg) != 0) {
if (ret != 0) {
tTrace("try to send req to next node"); tTrace("try to send req to next node");
return; return;
} }
...@@ -403,15 +402,13 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -403,15 +402,13 @@ void cliHandleExcept(SCliConn* pConn) {
continue; continue;
} }
} }
int ret = cliAppCb(pConn, &transMsg, pMsg); if (cliAppCb(pConn, &transMsg, pMsg) != 0) {
if (ret != 0) {
tTrace("try to send req to next node"); tTrace("try to send req to next node");
return; return;
} }
destroyCmsg(pMsg); destroyCmsg(pMsg);
tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); tTrace("%s cli conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
} while (!transQueueEmpty(&pConn->cliMsgs)); } while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
} }
...@@ -976,7 +973,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -976,7 +973,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
cliDestroyConn(pConn, true); transUnrefCliHandle(pConn);
return -1; return -1;
} }
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) { } else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册