提交 b77bf6f5 编写于 作者: J jiacy-jcy

Merge branch '3.0' into test/jcy

......@@ -31,13 +31,13 @@ typedef enum EFunctionType {
FUNCTION_TYPE_ELAPSED,
FUNCTION_TYPE_IRATE,
FUNCTION_TYPE_LAST_ROW,
FUNCTION_TYPE_LEASTSQUARES,
FUNCTION_TYPE_MAX,
FUNCTION_TYPE_MIN,
FUNCTION_TYPE_MODE,
FUNCTION_TYPE_PERCENTILE,
FUNCTION_TYPE_SPREAD,
FUNCTION_TYPE_STDDEV,
FUNCTION_TYPE_LEASTSQUARES,
FUNCTION_TYPE_SUM,
FUNCTION_TYPE_TWA,
FUNCTION_TYPE_HISTOGRAM,
......
......@@ -55,6 +55,12 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t stddevInvertFunction(SqlFunctionCtx* pCtx);
bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t leastSQRFunction(SqlFunctionCtx* pCtx);
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t percentileFunction(SqlFunctionCtx *pCtx);
......
......@@ -226,6 +226,23 @@ static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS;
}
static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (3 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
for (int32_t i = 0; i < numOfParams; ++i) {
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
pFunc->node.resType = (SDataType) { .bytes = 64, .type = TSDB_DATA_TYPE_BINARY };
return TSDB_CODE_SUCCESS;
}
static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (4 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
......@@ -537,6 +554,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = stddevFinalize,
.invertFunc = stddevInvertFunction
},
{
.name = "leastsquares",
.type = FUNCTION_TYPE_LEASTSQUARES,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateLeastSQR,
.getEnvFunc = getLeastSQRFuncEnv,
.initFunc = leastSQRFunctionSetup,
.processFunc = leastSQRFunction,
.finalizeFunc = leastSQRFinalize,
.invertFunc = leastSQRInvertFunction
},
{
.name = "avg",
.type = FUNCTION_TYPE_AVG,
......
......@@ -65,6 +65,13 @@ typedef struct SStddevRes {
};
} SStddevRes;
typedef struct SLeastSQRInfo {
double matrix[2][3];
double startVal;
double stepVal;
int64_t num;
} SLeastSQRInfo;
typedef struct SPercentileInfo {
double result;
tMemBucket* pMemBucket;
......@@ -1404,6 +1411,181 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return functionFinalize(pCtx, pBlock);
}
bool getLeastSQRFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SLeastSQRInfo);
return true;
}
bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
pInfo->startVal = IS_FLOAT_TYPE(pCtx->param[1].param.nType) ? pCtx->param[1].param.d :
(double)pCtx->param[1].param.i;
pInfo->stepVal = IS_FLOAT_TYPE(pCtx->param[1].param.nType) ? pCtx->param[2].param.d :
(double)pCtx->param[1].param.i;
return true;
}
#define LEASTSQR_CAL(p, x, y, index, step) \
do { \
(p)[0][0] += (double)(x) * (x); \
(p)[0][1] += (double)(x); \
(p)[0][2] += (double)(x) * (y)[index]; \
(p)[1][2] += (y)[index]; \
(x) += step; \
} while (0)
int32_t leastSQRFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SColumnInfoData* pCol = pInput->pData[0];
double(*param)[3] = pInfo->matrix;
double x = pInfo->startVal;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
switch (type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t* plist = (int8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem++;
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
break;
}
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t* plist = (int16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem++;
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t* plist = (int32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem++;
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t* plist = (int64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem++;
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem++;
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double* plist = (double*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem++;
LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal);
}
break;
}
default:
break;
}
pInfo->startVal = x;
pInfo->num += numOfElem;
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t currentRow = pBlock->info.rows;
if (0 == pInfo->num) {
return 0;
}
double(*param)[3] = pInfo->matrix;
param[1][1] = (double)pInfo->num;
param[1][0] = param[0][1];
param[0][0] -= param[1][0] * (param[0][1] / param[1][1]);
param[0][2] -= param[1][2] * (param[0][1] / param[1][1]);
param[0][1] = 0;
param[1][2] -= param[0][2] * (param[1][0] / param[0][0]);
param[1][0] = 0;
param[0][2] /= param[0][0];
param[1][2] /= param[1][1];
char buf[64] = {0};
size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]);
varDataSetLen(buf, len);
colDataAppend(pCol, currentRow, buf, false);
return pResInfo->numOfRes;
}
int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) {
//TODO
return TSDB_CODE_SUCCESS;
}
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SPercentileInfo);
return true;
......@@ -2475,10 +2657,10 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t len;
char buf[512] = {0};
if (!pInfo->normalized) {
len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%"PRId64"}",
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%"PRId64"}",
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].count);
} else {
len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].percentage);
}
varDataSetLen(buf, len);
......
......@@ -20,11 +20,23 @@
extern "C" {
#endif
#include "indexInt.h"
#include "tcompare.h"
extern char JSON_COLUMN[];
extern char JSON_VALUE_DELIM;
char* indexPackJsonData(SIndexTerm* itm);
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip);
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b);
_cache_range_compare indexGetCompare(RangeType ty);
#ifdef __cplusplus
}
#endif
......
......@@ -60,50 +60,6 @@ static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
RangeType type);
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
// optime later
int32_t ret = func(a, b);
switch (comType) {
case QUERY_LESS_THAN: {
if (ret < 0) return MATCH;
} break;
case QUERY_LESS_EQUAL: {
if (ret <= 0) return MATCH;
break;
}
case QUERY_GREATER_THAN: {
if (ret > 0) return MATCH;
break;
}
case QUERY_GREATER_EQUAL: {
if (ret >= 0) return MATCH;
}
}
return CONTINUE;
}
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
}
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
tCompareGreaterThan, tCompareGreaterEqual};
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
{cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
......@@ -169,7 +125,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
return 0;
}
_cache_range_compare cmpFn = rangeCompare[type];
_cache_range_compare cmpFn = indexGetCompare(type);
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
......@@ -295,7 +251,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
if (cache == NULL) {
return 0;
}
_cache_range_compare cmpFn = rangeCompare[type];
_cache_range_compare cmpFn = indexGetCompare(type);
MemTable* mem = cache;
IndexCache* pCache = mem->pCache;
......
......@@ -13,12 +13,58 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indexComm.h"
#include "index.h"
#include "indexInt.h"
#include "tcompare.h"
char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&';
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = getComparFunc(type, 0);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
}
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
// optime later
int32_t ret = func(a, b);
switch (comType) {
case QUERY_LESS_THAN: {
if (ret < 0) return MATCH;
} break;
case QUERY_LESS_EQUAL: {
if (ret <= 0) return MATCH;
break;
}
case QUERY_GREATER_THAN: {
if (ret > 0) return MATCH;
break;
}
case QUERY_GREATER_EQUAL: {
if (ret >= 0) return MATCH;
}
}
return CONTINUE;
}
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
tCompareGreaterThan, tCompareGreaterEqual};
_cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; }
char* indexPackJsonData(SIndexTerm* itm) {
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
......@@ -46,6 +92,7 @@ char* indexPackJsonData(SIndexTerm* itm) {
return buf;
}
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
......
......@@ -72,9 +72,23 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan,
tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange};
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
{tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual,
tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange},
{tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
......@@ -202,14 +216,10 @@ void tfileReaderDestroy(TFileReader* reader) {
taosMemoryFree(reader);
}
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0;
char* p = tem->colVal;
uint64_t sz = tem->nColVal;
if (hasJson) {
p = indexPackJsonData(tem);
sz = strlen(p);
}
int64_t st = taosGetTimestampUs();
FstSlice key = fstSliceCreate(p, sz);
uint64_t offset;
......@@ -224,9 +234,6 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
tem->colName, tem->colVal, cost);
}
if (hasJson) {
taosMemoryFree(p);
}
fstSliceDestroy(&key);
return 0;
}
......@@ -308,14 +315,11 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
}
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0;
char* p = tem->colVal;
int skip = 0;
_cache_range_compare cmpFn = indexGetCompare(type);
if (hasJson) {
p = indexPackJsonDataPrefix(tem, &skip);
}
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
......@@ -328,7 +332,16 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult
StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
taosArrayPush(offsets, &(rt->out.out));
FstSlice* s = &rt->data;
char* ch = (char*)fstSliceData(s, NULL);
TExeCond cond = cmpFn(ch, p, tem->colType);
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
} else if (BREAK == cond) {
swsResultDestroy(rt);
break;
}
swsResultDestroy(rt);
}
streamWithStateDestroy(st);
......@@ -376,17 +389,105 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
fstSliceDestroy(&key);
return 0;
}
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
int ret = 0;
char* p = indexPackJsonData(tem);
int sz = strlen(p);
int64_t st = taosGetTimestampUs();
FstSlice key = fstSliceCreate(p, sz);
uint64_t offset;
if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
int64_t et = taosGetTimestampUs();
int64_t cost = et - st;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
tem->suid, tem->colName, tem->colVal, cost);
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
cost = taosGetTimestampUs() - et;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
tem->colName, tem->colVal, cost);
}
fstSliceDestroy(&key);
return 0;
// deprecate api
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, LT);
}
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, LE);
}
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, GT);
}
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc_JSON(reader, tem, tr, GE);
}
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
// impl later
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) {
int ret = 0;
int skip = 0;
char* p = indexPackJsonDataPrefix(tem, &skip);
_cache_range_compare cmpFn = indexGetCompare(ctype);
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
FstSlice h = fstSliceCreate((uint8_t*)p, skip);
fstStreamBuilderSetRange(sb, &h, ctype);
fstSliceDestroy(&h);
StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
FstSlice* s = &rt->data;
char* ch = (char*)fstSliceData(s, NULL);
TExeCond cond = cmpFn(ch, p, tem->colType);
if (MATCH == cond) {
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
} else if (CONTINUE == cond) {
} else if (BREAK == cond) {
swsResultDestroy(rt);
break;
}
swsResultDestroy(rt);
}
streamWithStateDestroy(st);
fstStreamBuilderDestroy(sb);
return TSDB_CODE_SUCCESS;
}
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
if (qtype >= sizeof(tfSearch) / sizeof(tfSearch[0])) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
term->colVal);
return -1;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
return tfSearch[1][qtype](reader, term, tr);
} else {
return tfSearch[qtype](reader, term, tr);
return tfSearch[0][qtype](reader, term, tr);
}
tfileReaderUnRef(reader);
return 0;
}
......
......@@ -1108,7 +1108,7 @@ void transSendResponse(const STransMsg* msg) {
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
srvMsg->type = Normal;
tTrace("server conn %p start to send resp (1/2)", exh->handle);
tDebug("server conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
uvReleaseExHandle(refId);
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册