提交 f535b779 编写于 作者: H Haojun Liao

Merge branch '3.0' into feature/3_liaohj

...@@ -105,8 +105,6 @@ int32_t compareStrPatternNotMatch(const void *pLeft, const void *pRight); ...@@ -105,8 +105,6 @@ int32_t compareStrPatternNotMatch(const void *pLeft, const void *pRight);
int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight); int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight);
int32_t compareWStrPatternNotMatch(const void *pLeft, const void *pRight); int32_t compareWStrPatternNotMatch(const void *pLeft, const void *pRight);
int32_t compareJsonContainsKey(const void *pLeft, const void *pRight);
__compar_fn_t getComparFunc(int32_t type, int32_t optr); __compar_fn_t getComparFunc(int32_t type, int32_t optr);
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order); __compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order);
int32_t doCompare(const char *a, const char *b, int32_t type, size_t size); int32_t doCompare(const char *a, const char *b, int32_t type, size_t size);
......
...@@ -35,15 +35,15 @@ extern "C" { ...@@ -35,15 +35,15 @@ extern "C" {
#endif #endif
// clang-format off // clang-format off
#define indexFatal(...) do { if (idxDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0) #define indexFatal(...) do { if (idxDebugFlag & DEBUG_FATAL) { taosPrintLog("IDX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
#define indexError(...) do { if (idxDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0) #define indexError(...) do { if (idxDebugFlag & DEBUG_ERROR) { taosPrintLog("IDX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
#define indexWarn(...) do { if (idxDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0) #define indexWarn(...) do { if (idxDebugFlag & DEBUG_WARN) { taosPrintLog("IDX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
#define indexInfo(...) do { if (idxDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0) #define indexInfo(...) do { if (idxDebugFlag & DEBUG_INFO) { taosPrintLog("IDX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
#define indexDebug(...) do { if (idxDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0) #define indexDebug(...) do { if (idxDebugFlag & DEBUG_DEBUG) { taosPrintLog("IDX ", DEBUG_DEBUG, idxDebugFlag, __VA_ARGS__);} } while (0)
#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0) #define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("IDX", DEBUG_TRACE, idxDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on // clang-format on
typedef enum { LT, LE, GT, GE } RangeType; typedef enum { LT, LE, GT, GE, CONTAINS } RangeType;
typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef enum { kTypeValue, kTypeDeletion } STermValueType;
typedef struct SIndexStat { typedef struct SIndexStat {
......
...@@ -90,7 +90,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe ...@@ -90,7 +90,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe
break; break;
} }
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (0 == strcmp(c->colVal, pCt->colVal)) { if (0 == strcmp(c->colVal, pCt->colVal) && strlen(pCt->colVal) == strlen(c->colVal)) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
// taosArrayPush(result, &c->uid); // taosArrayPush(result, &c->uid);
...@@ -222,7 +222,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr ...@@ -222,7 +222,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS);
} }
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -242,6 +242,9 @@ static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTR ...@@ -242,6 +242,9 @@ static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTR
static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE); return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE);
} }
static int32_t cacheSearchContain_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return cacheSearchCompareFunc_JSON(cache, term, tr, s, CONTAINS);
}
static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -97,6 +97,11 @@ static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { ...@@ -97,6 +97,11 @@ static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type); __compar_fn_t func = indexGetCompar(type);
return tCompare(func, QUERY_GREATER_EQUAL, a, b, type); return tCompare(func, QUERY_GREATER_EQUAL, a, b, type);
} }
static TExeCond tCompareContains(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type);
return tCompare(func, QUERY_TERM, a, b, type);
}
TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY) { if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY) {
return tDoCompare(func, cmptype, a, b); return tDoCompare(func, cmptype, a, b);
...@@ -185,12 +190,14 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { ...@@ -185,12 +190,14 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
case QUERY_TERM: { case QUERY_TERM: {
if (ret == 0) return MATCH; if (ret == 0) return MATCH;
} }
default:
return BREAK;
} }
return CONTINUE; return CONTINUE;
} }
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {
tCompareGreaterThan, tCompareGreaterEqual}; tCompareLessThan, tCompareLessEqual, tCompareGreaterThan, tCompareGreaterEqual, tCompareContains};
_cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; } _cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; }
......
...@@ -64,6 +64,8 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { ...@@ -64,6 +64,8 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
*dst = QUERY_TERM; *dst = QUERY_TERM;
} else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) { } else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) {
*dst = QUERY_REGEX; *dst = QUERY_REGEX;
} else if (src == OP_TYPE_JSON_CONTAINS) {
*dst = QUERY_PREFIX;
} else { } else {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -186,6 +188,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { ...@@ -186,6 +188,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue)); SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue));
param->colId = -1; param->colId = -1;
param->colValType = (uint8_t)(vn->node.resType.type); param->colValType = (uint8_t)(vn->node.resType.type);
memcpy(param->colName, vn->literal, strlen(vn->literal));
break; break;
} }
case QUERY_NODE_COLUMN: { case QUERY_NODE_COLUMN: {
...@@ -237,7 +240,7 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx ...@@ -237,7 +240,7 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx
indexError("invalid operation node, left: %p, rigth: %p", node->pLeft, node->pRight); indexError("invalid operation node, left: %p, rigth: %p", node->pLeft, node->pRight);
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (node->opType == OP_TYPE_JSON_GET_VALUE || node->opType == OP_TYPE_JSON_CONTAINS) { if (node->opType == OP_TYPE_JSON_GET_VALUE) {
return code; return code;
} }
SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam)); SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam));
...@@ -420,8 +423,8 @@ static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output ...@@ -420,8 +423,8 @@ static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output
return sifDoIndex(left, right, id, output); return sifDoIndex(left, right, id, output);
} }
static int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) { static int32_t sifJsonContains(SIFParam *left, SIFParam *right, SIFParam *output) {
// return 0 int id = OP_TYPE_JSON_CONTAINS;
return 0; return sifDoIndex(left, right, id, output);
} }
static int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) { static int32_t sifJsonGetValue(SIFParam *left, SIFParam *rigth, SIFParam *output) {
// return 0 // return 0
...@@ -501,9 +504,11 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { ...@@ -501,9 +504,11 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
int32_t code = 0; int32_t code = 0;
int32_t nParam = sifGetOperParamNum(node->opType); int32_t nParam = sifGetOperParamNum(node->opType);
if (nParam <= 1) { if (nParam <= 1) {
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); output->status = SFLT_NOT_INDEX;
return code;
// SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (node->opType == OP_TYPE_JSON_GET_VALUE || node->opType == OP_TYPE_JSON_CONTAINS) { if (node->opType == OP_TYPE_JSON_GET_VALUE) {
return code; return code;
} }
SIFParam *params = NULL; SIFParam *params = NULL;
...@@ -617,11 +622,11 @@ EDealRes sifCalcWalker(SNode *node, void *context) { ...@@ -617,11 +622,11 @@ EDealRes sifCalcWalker(SNode *node, void *context) {
} }
if (QUERY_NODE_OPERATOR == nodeType(node)) { if (QUERY_NODE_OPERATOR == nodeType(node)) {
indexInfo("node type for index filter, type: %d", nodeType(node)); // indexInfo("node type for index filter, type: %d", nodeType(node));
return sifWalkOper(node, ctx); return sifWalkOper(node, ctx);
} }
indexError("invalid node type for index filter calculating, type:%d", nodeType(node)); // indexError("invalid node type for index filter calculating, type:%d", nodeType(node));
ctx->code = TSDB_CODE_QRY_INVALID_INPUT; ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
......
...@@ -425,8 +425,7 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { ...@@ -425,8 +425,7 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
// impl later return tfSearchCompareFunc_JSON(reader, tem, tr, CONTAINS);
return TSDB_CODE_SUCCESS;
} }
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
// impl later // impl later
...@@ -466,10 +465,6 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt ...@@ -466,10 +465,6 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
// FstSlice h = fstSliceCreate((uint8_t*)p, skip);
// fstStreamBuilderSetRange(sb, &h, ctype);
// fstSliceDestroy(&h);
StreamWithState* st = streamBuilderIntoStream(sb); StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL; StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
......
...@@ -64,6 +64,7 @@ static SKeyword keywordTable[] = { ...@@ -64,6 +64,7 @@ static SKeyword keywordTable[] = {
{"CONSUMER", TK_CONSUMER}, {"CONSUMER", TK_CONSUMER},
{"COUNT", TK_COUNT}, {"COUNT", TK_COUNT},
{"CREATE", TK_CREATE}, {"CREATE", TK_CREATE},
{"CONTAINS", TK_CONTAINS},
{"DATABASE", TK_DATABASE}, {"DATABASE", TK_DATABASE},
{"DATABASES", TK_DATABASES}, {"DATABASES", TK_DATABASES},
{"DAYS", TK_DAYS}, {"DAYS", TK_DAYS},
......
...@@ -847,8 +847,12 @@ static EDealRes translateJsonOperator(STranslateContext* pCxt, SOperatorNode* pO ...@@ -847,8 +847,12 @@ static EDealRes translateJsonOperator(STranslateContext* pCxt, SOperatorNode* pO
if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) { if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
} }
pOp->node.resType.type = TSDB_DATA_TYPE_JSON; if(pOp->opType == OP_TYPE_JSON_GET_VALUE){
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_JSON].bytes; pOp->node.resType.type = TSDB_DATA_TYPE_JSON;
}else if(pOp->opType == OP_TYPE_JSON_CONTAINS){
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
}
pOp->node.resType.bytes = tDataTypes[pOp->node.resType.type].bytes;
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
......
...@@ -168,7 +168,7 @@ __compar_fn_t gDataCompare[] = {compareInt32Val, compareInt8Val, compareInt16Val ...@@ -168,7 +168,7 @@ __compar_fn_t gDataCompare[] = {compareInt32Val, compareInt8Val, compareInt16Val
compareLenPrefixedWStr, compareUint8Val, compareUint16Val, compareUint32Val, compareUint64Val, compareLenPrefixedWStr, compareUint8Val, compareUint16Val, compareUint32Val, compareUint64Val,
setChkInBytes1, setChkInBytes2, setChkInBytes4, setChkInBytes8, compareStrRegexCompMatch, setChkInBytes1, setChkInBytes2, setChkInBytes4, setChkInBytes8, compareStrRegexCompMatch,
compareStrRegexCompNMatch, setChkNotInBytes1, setChkNotInBytes2, setChkNotInBytes4, setChkNotInBytes8, compareStrRegexCompNMatch, setChkNotInBytes1, setChkNotInBytes2, setChkNotInBytes4, setChkNotInBytes8,
compareChkNotInString, compareStrPatternNotMatch, compareWStrPatternNotMatch, compareJsonContainsKey compareChkNotInString, compareStrPatternNotMatch, compareWStrPatternNotMatch
}; };
int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) { int8_t filterGetCompFuncIdx(int32_t type, int32_t optr) {
......
...@@ -88,7 +88,7 @@ void convertNumberToNumber(const void *inData, void *outData, int8_t inType, int ...@@ -88,7 +88,7 @@ void convertNumberToNumber(const void *inData, void *outData, int8_t inType, int
} }
} }
void convertStringToDouble(const void *inData, void *outData, int8_t inType, int8_t outType){ void convertNcharToDouble(const void *inData, void *outData){
char *tmp = taosMemoryMalloc(varDataTLen(inData)); char *tmp = taosMemoryMalloc(varDataTLen(inData));
int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(inData), varDataLen(inData), tmp); int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(inData), varDataLen(inData), tmp);
if (len < 0) { if (len < 0) {
...@@ -97,13 +97,24 @@ void convertStringToDouble(const void *inData, void *outData, int8_t inType, int ...@@ -97,13 +97,24 @@ void convertStringToDouble(const void *inData, void *outData, int8_t inType, int
tmp[len] = 0; tmp[len] = 0;
ASSERT(outType == TSDB_DATA_TYPE_DOUBLE);
double value = taosStr2Double(tmp, NULL); double value = taosStr2Double(tmp, NULL);
*((double *)outData) = value; *((double *)outData) = value;
taosMemoryFreeClear(tmp); taosMemoryFreeClear(tmp);
} }
void convertBinaryToDouble(const void *inData, void *outData){
char *tmp = taosMemoryCalloc(1, varDataTLen(inData));
if(tmp == NULL){
*((double *)outData) = 0.;
return;
}
memcpy(tmp, varDataVal(inData), varDataLen(inData));
double ret = taosStr2Double(tmp, NULL);
taosMemoryFree(tmp);
*((double *)outData) = ret;
}
typedef int64_t (*_getBigintValue_fn_t)(void *src, int32_t index); typedef int64_t (*_getBigintValue_fn_t)(void *src, int32_t index);
int64_t getVectorBigintValue_TINYINT(void *src, int32_t index) { int64_t getVectorBigintValue_TINYINT(void *src, int32_t index) {
...@@ -147,7 +158,7 @@ int64_t getVectorBigintValue_JSON(void *src, int32_t index){ ...@@ -147,7 +158,7 @@ int64_t getVectorBigintValue_JSON(void *src, int32_t index){
if (*data == TSDB_DATA_TYPE_NULL){ if (*data == TSDB_DATA_TYPE_NULL){
return 0; return 0;
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY } else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
convertStringToDouble(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE); convertNcharToDouble(data+CHAR_BYTES, &out);
} else { } else {
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE); convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
} }
...@@ -445,14 +456,30 @@ double getVectorDoubleValue_JSON(void *src, int32_t index){ ...@@ -445,14 +456,30 @@ double getVectorDoubleValue_JSON(void *src, int32_t index){
if (*data == TSDB_DATA_TYPE_NULL){ if (*data == TSDB_DATA_TYPE_NULL){
return out; return out;
} else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY } else if(*data == TSDB_DATA_TYPE_NCHAR) { // json inner type can not be BINARY
convertStringToDouble(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE); convertNcharToDouble(data+CHAR_BYTES, &out);
} else { } else {
convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE); convertNumberToNumber(data+CHAR_BYTES, &out, *data, TSDB_DATA_TYPE_DOUBLE);
} }
return out; return out;
} }
bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t typeRight, char **pLeftData, char **pRightData, void *pLeftOut, void *pRightOut, bool *isNull){ void* ncharTobinary(void *buf){ // todo need to remove , if tobinary is nchar
int32_t inputLen = varDataLen(buf);
void* t = taosMemoryCalloc(1, inputLen);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(buf), varDataLen(buf), varDataVal(t));
if (len < 0) {
sclError("charset:%s to %s. val:%s convert ncharTobinary failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
(char*)varDataVal(buf));
taosMemoryFree(t);
return NULL;
}
varDataSetLen(t, len);
return t;
}
bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t typeRight, char **pLeftData, char **pRightData,
void *pLeftOut, void *pRightOut, bool *isNull, bool *freeLeft, bool *freeRight){
if(optr == OP_TYPE_JSON_CONTAINS) { if(optr == OP_TYPE_JSON_CONTAINS) {
return true; return true;
} }
...@@ -489,21 +516,41 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t ...@@ -489,21 +516,41 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t
*fp = filterGetCompFunc(type, optr); *fp = filterGetCompFunc(type, optr);
if(typeLeft == TSDB_DATA_TYPE_NCHAR) { if(IS_NUMERIC_TYPE(type) || IS_FLOAT_TYPE(type)){
convertStringToDouble(*pLeftData, pLeftOut, typeLeft, type); if(typeLeft == TSDB_DATA_TYPE_NCHAR) {
*pLeftData = pLeftOut; convertNcharToDouble(*pLeftData, pLeftOut);
} else if(typeLeft != type) { *pLeftData = pLeftOut;
convertNumberToNumber(*pLeftData, pLeftOut, typeLeft, type); } else if(typeLeft == TSDB_DATA_TYPE_BINARY) {
*pLeftData = pLeftOut; convertBinaryToDouble(*pLeftData, pLeftOut);
*pLeftData = pLeftOut;
} else if(typeLeft != type) {
convertNumberToNumber(*pLeftData, pLeftOut, typeLeft, type);
*pLeftData = pLeftOut;
}
if(typeRight == TSDB_DATA_TYPE_NCHAR) {
convertNcharToDouble(*pRightData, pRightOut);
*pRightData = pRightOut;
} else if(typeRight == TSDB_DATA_TYPE_BINARY) {
convertBinaryToDouble(*pRightData, pRightOut);
*pRightData = pRightOut;
} else if(typeRight != type) {
convertNumberToNumber(*pRightData, pRightOut, typeRight, type);
*pRightData = pRightOut;
}
}else if(type == TSDB_DATA_TYPE_BINARY){
if(typeLeft == TSDB_DATA_TYPE_NCHAR){
*pLeftData = ncharTobinary(*pLeftData);
*freeLeft = true;
}
if(typeRight == TSDB_DATA_TYPE_NCHAR){
*pRightData = ncharTobinary(*pRightData);
*freeRight = true;
}
}else{
ASSERT(0);
} }
if(typeRight == TSDB_DATA_TYPE_NCHAR) {
convertStringToDouble(*pRightData, pRightOut, typeRight, type);
*pRightData = pRightOut;
} else if(typeRight != type) {
convertNumberToNumber(*pRightData, pRightOut, typeRight, type);
*pRightData = pRightOut;
}
return true; return true;
} }
...@@ -935,44 +982,6 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) { ...@@ -935,44 +982,6 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
} }
} }
STagVal getJsonValue(char *json, char *key, bool *isExist) {
STagVal val = {.pKey = key};
bool find = tTagGet(((const STag *)json), &val); // json value is null and not exist is different
if(isExist){
*isExist = find;
}
return val;
}
void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
SColumnInfoData *pOutputCol = pOut->columnData;
int32_t i = ((_ord) == TSDB_ORDER_ASC)? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC)? 1 : -1;
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
char *pRightData = colDataGetVarData(pRight->columnData, 0);
char *jsonKey = taosMemoryCalloc(1, varDataLen(pRightData) + 1);
memcpy(jsonKey, varDataVal(pRightData), varDataLen(pRightData));
for (; i >= 0 && i < pLeft->numOfRows; i += step) {
if (colDataIsNull_var(pLeft->columnData, i)) {
colDataSetNull_var(pOutputCol, i);
pOutputCol->hasNull = true;
continue;
}
char *pLeftData = colDataGetVarData(pLeft->columnData, i);
bool isExist = false;
STagVal value = getJsonValue(pLeftData, jsonKey, &isExist);
char *data = isExist ? tTagValToData(&value, true) : NULL;
colDataAppend(pOutputCol, i, data, data == NULL);
if(isExist && IS_VAR_DATA_TYPE(value.type) && data){
taosMemoryFree(data);
}
}
taosMemoryFree(jsonKey);
}
void vectorMathAdd(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorMathAdd(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
SColumnInfoData *pOutputCol = pOut->columnData; SColumnInfoData *pOutputCol = pOut->columnData;
...@@ -1510,6 +1519,38 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, ...@@ -1510,6 +1519,38 @@ void vectorBitOr(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
doReleaseVec(pRightCol, rightConvert); doReleaseVec(pRightCol, rightConvert);
} }
#define VEC_COM_INNER(pCol, index1, index2) \
for (; i < pCol->numOfRows && i >= 0; i += step) {\
if (IS_HELPER_NULL(pLeft->columnData, index1) || IS_HELPER_NULL(pRight->columnData, index2)) {\
bool res = false;\
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\
continue;\
}\
char *pLeftData = colDataGetData(pLeft->columnData, index1);\
char *pRightData = colDataGetData(pRight->columnData, index2);\
int64_t leftOut = 0;\
int64_t rightOut = 0;\
bool freeLeft = false;\
bool freeRight = false;\
bool isJsonnull = false;\
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight),\
&pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull, &freeLeft, &freeRight);\
if(isJsonnull){\
ASSERT(0);\
}\
if(!pLeftData || !pRightData){\
result = false;\
}\
if(!result){\
colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);\
}else{\
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);\
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);\
}\
if(freeLeft) taosMemoryFreeClear(pLeftData);\
if(freeRight) taosMemoryFreeClear(pRightData);\
}
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) { void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
...@@ -1533,79 +1574,11 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam * ...@@ -1533,79 +1574,11 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *
} }
if (pLeft->numOfRows == pRight->numOfRows) { if (pLeft->numOfRows == pRight->numOfRows) {
for (; i < pRight->numOfRows && i >= 0; i += step) { VEC_COM_INNER(pLeft, i, i)
if (IS_HELPER_NULL(pLeft->columnData, i) || IS_HELPER_NULL(pRight->columnData, i)) {
bool res = false;
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
continue; // TODO set null or ignore
}
char *pLeftData = colDataGetData(pLeft->columnData, i);
char *pRightData = colDataGetData(pRight->columnData, i);
int64_t leftOut = 0;
int64_t rightOut = 0;
bool isJsonnull = false;
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), &pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull);
if(isJsonnull){
ASSERT(0);
}
if(!result){
colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);
}else{
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
}
}
} else if (pRight->numOfRows == 1) { } else if (pRight->numOfRows == 1) {
ASSERT(pLeft->pHashFilter == NULL); VEC_COM_INNER(pLeft, i, 0)
for (; i >= 0 && i < pLeft->numOfRows; i += step) {
if (IS_HELPER_NULL(pLeft->columnData, i) || IS_HELPER_NULL(pRight->columnData, 0)) {
bool res = false;
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
continue;
}
char *pLeftData = colDataGetData(pLeft->columnData, i);
char *pRightData = colDataGetData(pRight->columnData, 0);
int64_t leftOut = 0;
int64_t rightOut = 0;
bool isJsonnull = false;
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), &pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull);
if(isJsonnull){
ASSERT(0);
}
if(!result){
colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);
}else{
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
}
}
} else if (pLeft->numOfRows == 1) { } else if (pLeft->numOfRows == 1) {
for (; i >= 0 && i < pRight->numOfRows; i += step) { VEC_COM_INNER(pRight, 0, i)
if (IS_HELPER_NULL(pRight->columnData, i) || IS_HELPER_NULL(pLeft->columnData, 0)) {
bool res = false;
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
continue;
}
char *pLeftData = colDataGetData(pLeft->columnData, 0);
char *pRightData = colDataGetData(pLeft->columnData, i);
int64_t leftOut = 0;
int64_t rightOut = 0;
bool isJsonnull = false;
bool result = convertJsonValue(&fp, optr, GET_PARAM_TYPE(pLeft), GET_PARAM_TYPE(pRight), &pLeftData, &pRightData, &leftOut, &rightOut, &isJsonnull);
if(isJsonnull){
ASSERT(0);
}
if(!result){
colDataAppendInt8(pOut->columnData, i, (int8_t*)&result);
}else{
bool res = filterDoCompare(fp, optr, pLeftData, pRightData);
colDataAppendInt8(pOut->columnData, i, (int8_t*)&res);
}
}
} }
} }
...@@ -1683,10 +1656,6 @@ void vectorNotMatch(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOu ...@@ -1683,10 +1656,6 @@ void vectorNotMatch(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOu
vectorCompare(pLeft, pRight, pOut, _ord, OP_TYPE_NMATCH); vectorCompare(pLeft, pRight, pOut, _ord, OP_TYPE_NMATCH);
} }
void vectorJsonContains(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
vectorCompare(pLeft, pRight, pOut, _ord, OP_TYPE_JSON_CONTAINS);
}
void vectorIsNull(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorIsNull(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
for(int32_t i = 0; i < pLeft->numOfRows; ++i) { for(int32_t i = 0; i < pLeft->numOfRows; ++i) {
int8_t v = IS_HELPER_NULL(pLeft->columnData, i) ? 1 : 0; int8_t v = IS_HELPER_NULL(pLeft->columnData, i) ? 1 : 0;
...@@ -1707,6 +1676,69 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, ...@@ -1707,6 +1676,69 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
vectorConvertImpl(pLeft, pOut); vectorConvertImpl(pLeft, pOut);
} }
STagVal getJsonValue(char *json, char *key, bool *isExist) {
STagVal val = {.pKey = key};
bool find = tTagGet(((const STag *)json), &val); // json value is null and not exist is different
if(isExist){
*isExist = find;
}
return val;
}
void vectorJsonContains(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
SColumnInfoData *pOutputCol = pOut->columnData;
int32_t i = ((_ord) == TSDB_ORDER_ASC)? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC)? 1 : -1;
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
char *pRightData = colDataGetVarData(pRight->columnData, 0);
char *jsonKey = taosMemoryCalloc(1, varDataLen(pRightData) + 1);
memcpy(jsonKey, varDataVal(pRightData), varDataLen(pRightData));
for (; i >= 0 && i < pLeft->numOfRows; i += step) {
bool isExist = false;
if (!colDataIsNull_var(pLeft->columnData, i)) {
char *pLeftData = colDataGetVarData(pLeft->columnData, i);
getJsonValue(pLeftData, jsonKey, &isExist);
}
colDataAppend(pOutputCol, i, (const char*)(&isExist), false);
}
taosMemoryFree(jsonKey);
}
void vectorJsonArrow(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
SColumnInfoData *pOutputCol = pOut->columnData;
int32_t i = ((_ord) == TSDB_ORDER_ASC)? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1;
int32_t step = ((_ord) == TSDB_ORDER_ASC)? 1 : -1;
pOut->numOfRows = TMAX(pLeft->numOfRows, pRight->numOfRows);
char *pRightData = colDataGetVarData(pRight->columnData, 0);
char *jsonKey = taosMemoryCalloc(1, varDataLen(pRightData) + 1);
memcpy(jsonKey, varDataVal(pRightData), varDataLen(pRightData));
for (; i >= 0 && i < pLeft->numOfRows; i += step) {
if (colDataIsNull_var(pLeft->columnData, i)) {
colDataSetNull_var(pOutputCol, i);
pOutputCol->hasNull = true;
continue;
}
char *pLeftData = colDataGetVarData(pLeft->columnData, i);
bool isExist = false;
STagVal value = getJsonValue(pLeftData, jsonKey, &isExist);
char *data = isExist ? tTagValToData(&value, true) : NULL;
colDataAppend(pOutputCol, i, data, data == NULL);
if(isExist && IS_VAR_DATA_TYPE(value.type) && data){
taosMemoryFree(data);
}
}
taosMemoryFree(jsonKey);
}
_bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) { _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) {
switch (binFunctionId) { switch (binFunctionId) {
case OP_TYPE_ADD: case OP_TYPE_ADD:
......
...@@ -222,11 +222,6 @@ int32_t compareLenPrefixedWStrDesc(const void *pLeft, const void *pRight) { ...@@ -222,11 +222,6 @@ int32_t compareLenPrefixedWStrDesc(const void *pLeft, const void *pRight) {
return compareLenPrefixedWStr(pRight, pLeft); return compareLenPrefixedWStr(pRight, pLeft);
} }
int32_t compareJsonContainsKey(const void* pLeft, const void* pRight) {
if(pLeft) return 0;
return 1;
}
// string > number > bool > null // string > number > bool > null
// ref: https://dev.mysql.com/doc/refman/8.0/en/json.html#json-comparison // ref: https://dev.mysql.com/doc/refman/8.0/en/json.html#json-comparison
int32_t compareJsonVal(const void *pLeft, const void *pRight) { int32_t compareJsonVal(const void *pLeft, const void *pRight) {
......
...@@ -8,6 +8,7 @@ import http.server ...@@ -8,6 +8,7 @@ import http.server
import gzip import gzip
import threading import threading
import json import json
import pickle
from util.log import * from util.log import *
from util.sql import * from util.sql import *
...@@ -15,206 +16,203 @@ from util.cases import * ...@@ -15,206 +16,203 @@ from util.cases import *
from util.dnodes import * from util.dnodes import *
telemetryPort = '6043' telemetryPort = '6043'
serverPort = '7080'
hostname = socket.gethostname()
class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
hostPort = hostname + ":" + serverPort
def telemetryInfoCheck(infoDict=''): def telemetryInfoCheck(self, infoDict=''):
if "ts" not in infoDict or len(infoDict["ts"]) == 0:
tdLog.exit("ts is null!")
hostname = socket.gethostname() if "dnode_id" not in infoDict or infoDict["dnode_id"] != 1:
serverPort = 7080 tdLog.exit("dnode_id is null!")
if "ts" not in infoDict or len(infoDict["ts"]) == 0: if "dnode_ep" not in infoDict:
tdLog.exit("ts is null!") tdLog.exit("dnode_ep is null!")
if "dnode_id" not in infoDict or infoDict["dnode_id"] != 1: if "cluster_id" not in infoDict:
tdLog.exit("dnode_id is null!") tdLog.exit("cluster_id is null!")
if "dnode_ep" not in infoDict: if "protocol" not in infoDict or infoDict["protocol"] != 1:
tdLog.exit("dnode_ep is null!") tdLog.exit("protocol is null!")
if "cluster_id" not in infoDict: if "cluster_info" not in infoDict :
tdLog.exit("cluster_id is null!") tdLog.exit("cluster_info is null!")
if "protocol" not in infoDict or infoDict["protocol"] != 1: # cluster_info ====================================
tdLog.exit("protocol is null!")
if "cluster_info" not in infoDict : if "first_ep" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep"] == None:
tdLog.exit("cluster_info is null!") tdLog.exit("first_ep is null!")
# cluster_info ==================================== if "first_ep_dnode_id" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep_dnode_id"] != 1:
tdLog.exit("first_ep_dnode_id is null!")
if "first_ep" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep"] == None: if "version" not in infoDict["cluster_info"] or infoDict["cluster_info"]["version"] == None:
tdLog.exit("first_ep is null!") tdLog.exit("first_ep_dnode_id is null!")
if "first_ep_dnode_id" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep_dnode_id"] != 1: if "master_uptime" not in infoDict["cluster_info"] or infoDict["cluster_info"]["master_uptime"] == None:
tdLog.exit("first_ep_dnode_id is null!") tdLog.exit("master_uptime is null!")
if "version" not in infoDict["cluster_info"] or infoDict["cluster_info"]["version"] == None:
tdLog.exit("first_ep_dnode_id is null!")
if "master_uptime" not in infoDict["cluster_info"] or infoDict["cluster_info"]["master_uptime"] == None:
tdLog.exit("master_uptime is null!")
if "monitor_interval" not in infoDict["cluster_info"] or infoDict["cluster_info"]["monitor_interval"] !=5: if "monitor_interval" not in infoDict["cluster_info"] or infoDict["cluster_info"]["monitor_interval"] !=5:
tdLog.exit("monitor_interval is null!") tdLog.exit("monitor_interval is null!")
if "vgroups_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_total"] < 0: if "vgroups_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_total"] < 0:
tdLog.exit("vgroups_total is null!") tdLog.exit("vgroups_total is null!")
if "vgroups_alive" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_alive"] < 0: if "vgroups_alive" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_alive"] < 0:
tdLog.exit("vgroups_alive is null!") tdLog.exit("vgroups_alive is null!")
if "connections_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["connections_total"] < 0 : if "connections_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["connections_total"] < 0 :
tdLog.exit("connections_total is null!") tdLog.exit("connections_total is null!")
if "dnodes" not in infoDict["cluster_info"] or infoDict["cluster_info"]["dnodes"] == None : if "dnodes" not in infoDict["cluster_info"] or infoDict["cluster_info"]["dnodes"] == None :
tdLog.exit("dnodes is null!") tdLog.exit("dnodes is null!")
dnodes_info = { "dnode_id": 1,"dnode_ep": f"{hostname}:{serverPort}","status":"ready"} dnodes_info = { "dnode_id": 1,"dnode_ep": self.hostPort,"status":"ready"}
for k ,v in dnodes_info.items(): for k ,v in dnodes_info.items():
if k not in infoDict["cluster_info"]["dnodes"][0] or v != infoDict["cluster_info"]["dnodes"][0][k] : if k not in infoDict["cluster_info"]["dnodes"][0] or v != infoDict["cluster_info"]["dnodes"][0][k] :
tdLog.exit("dnodes info is null!") tdLog.exit("dnodes info is null!")
mnodes_info = { "mnode_id":1, "mnode_ep":f"{hostname}:{serverPort}","role": "leader" } mnodes_info = { "mnode_id":1, "mnode_ep": self.hostPort,"role": "leader" }
for k ,v in mnodes_info.items(): for k ,v in mnodes_info.items():
if k not in infoDict["cluster_info"]["mnodes"][0] or v != infoDict["cluster_info"]["mnodes"][0][k] : if k not in infoDict["cluster_info"]["mnodes"][0] or v != infoDict["cluster_info"]["mnodes"][0][k] :
tdLog.exit("mnodes info is null!") tdLog.exit("mnodes info is null!")
# vgroup_infos ==================================== # vgroup_infos ====================================
if "vgroup_infos" not in infoDict or infoDict["vgroup_infos"]== None: if "vgroup_infos" not in infoDict or infoDict["vgroup_infos"]== None:
tdLog.exit("vgroup_infos is null!") tdLog.exit("vgroup_infos is null!")
vgroup_infos_nums = len(infoDict["vgroup_infos"])
for index in range(vgroup_infos_nums):
if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0:
tdLog.exit("vgroup_id is null!")
if "database_name" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["database_name"]) < 0:
tdLog.exit("database_name is null!")
if "tables_num" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["tables_num"]!= 0:
tdLog.exit("tables_num is null!")
if "status" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["status"]) < 0 :
tdLog.exit("status is null!")
if "vnodes" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vnodes"] ==None :
tdLog.exit("vnodes is null!")
if "dnode_id" not in infoDict["vgroup_infos"][index]["vnodes"][0] or infoDict["vgroup_infos"][index]["vnodes"][0]["dnode_id"] < 0 :
tdLog.exit("vnodes is null!")
# grant_info ====================================
if "grant_info" not in infoDict or infoDict["grant_info"]== None:
tdLog.exit("grant_info is null!")
if "expire_time" not in infoDict["grant_info"] or not infoDict["grant_info"]["expire_time"] > 0:
tdLog.exit("expire_time is null!")
if "timeseries_used" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_used"] > 0:
tdLog.exit("timeseries_used is null!")
if "timeseries_total" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_total"] > 0:
tdLog.exit("timeseries_total is null!")
# dnode_info ==================================== vgroup_infos_nums = len(infoDict["vgroup_infos"])
if "dnode_info" not in infoDict or infoDict["dnode_info"]== None: for index in range(vgroup_infos_nums):
tdLog.exit("dnode_info is null!") if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0:
tdLog.exit("vgroup_id is null!")
if "database_name" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["database_name"]) < 0:
tdLog.exit("database_name is null!")
if "tables_num" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["tables_num"]!= 0:
tdLog.exit("tables_num is null!")
if "status" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["status"]) < 0 :
tdLog.exit("status is null!")
if "vnodes" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vnodes"] ==None :
tdLog.exit("vnodes is null!")
if "dnode_id" not in infoDict["vgroup_infos"][index]["vnodes"][0] or infoDict["vgroup_infos"][index]["vnodes"][0]["dnode_id"] < 0 :
tdLog.exit("vnodes is null!")
# grant_info ====================================
if "grant_info" not in infoDict or infoDict["grant_info"]== None:
tdLog.exit("grant_info is null!")
if "expire_time" not in infoDict["grant_info"] or not infoDict["grant_info"]["expire_time"] > 0:
tdLog.exit("expire_time is null!")
if "timeseries_used" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_used"] > 0:
tdLog.exit("timeseries_used is null!")
if "timeseries_total" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_total"] > 0:
tdLog.exit("timeseries_total is null!")
# dnode_info ====================================
dnode_infos = ['uptime', 'cpu_engine', 'cpu_system', 'cpu_cores', 'mem_engine', 'mem_system', 'mem_total', 'disk_engine', if "dnode_info" not in infoDict or infoDict["dnode_info"]== None:
'disk_used', 'disk_total', 'net_in', 'net_out', 'io_read', 'io_write', 'io_read_disk', 'io_write_disk', 'req_select', tdLog.exit("dnode_info is null!")
'req_select_rate', 'req_insert', 'req_insert_success', 'req_insert_rate', 'req_insert_batch', 'req_insert_batch_success',
'req_insert_batch_rate', 'errors', 'vnodes_num', 'masters', 'has_mnode', 'has_qnode', 'has_snode', 'has_bnode']
for elem in dnode_infos:
if elem not in infoDict["dnode_info"] or infoDict["dnode_info"][elem] < 0:
tdLog.exit(f"{elem} is null!")
# dnode_info ==================================== dnode_infos = ['uptime', 'cpu_engine', 'cpu_system', 'cpu_cores', 'mem_engine', 'mem_system', 'mem_total', 'disk_engine',
'disk_used', 'disk_total', 'net_in', 'net_out', 'io_read', 'io_write', 'io_read_disk', 'io_write_disk', 'req_select',
'req_select_rate', 'req_insert', 'req_insert_success', 'req_insert_rate', 'req_insert_batch', 'req_insert_batch_success',
'req_insert_batch_rate', 'errors', 'vnodes_num', 'masters', 'has_mnode', 'has_qnode', 'has_snode', 'has_bnode']
for elem in dnode_infos:
if elem not in infoDict["dnode_info"] or infoDict["dnode_info"][elem] < 0:
tdLog.exit(f"{elem} is null!")
if "disk_infos" not in infoDict or infoDict["disk_infos"]== None: # dnode_info ====================================
tdLog.exit("disk_infos is null!")
# bug for data_dir
if "datadir" not in infoDict["disk_infos"] or len(infoDict["disk_infos"]["datadir"]) <=0 :
tdLog.exit("datadir is null!")
if "name" not in infoDict["disk_infos"]["datadir"][0] or len(infoDict["disk_infos"]["datadir"][0]["name"]) <= 0: if "disk_infos" not in infoDict or infoDict["disk_infos"]== None:
tdLog.exit("name is null!") tdLog.exit("disk_infos is null!")
# bug for data_dir
if "datadir" not in infoDict["disk_infos"] or len(infoDict["disk_infos"]["datadir"]) <=0 :
tdLog.exit("datadir is null!")
if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] < 0: if "name" not in infoDict["disk_infos"]["datadir"][0] or len(infoDict["disk_infos"]["datadir"][0]["name"]) <= 0:
tdLog.exit("level is null!") tdLog.exit("name is null!")
if "avail" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["avail"] <= 0: if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] < 0:
tdLog.exit("avail is null!") tdLog.exit("level is null!")
if "used" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["used"] <= 0: if "avail" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["avail"] <= 0:
tdLog.exit("used is null!") tdLog.exit("avail is null!")
if "total" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["total"] <= 0: if "used" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["used"] <= 0:
tdLog.exit("total is null!") tdLog.exit("used is null!")
if "total" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["total"] <= 0:
tdLog.exit("total is null!")
if "logdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["logdir"]== None:
tdLog.exit("logdir is null!")
if "name" not in infoDict["disk_infos"]["logdir"] or len(infoDict["disk_infos"]["logdir"]["name"]) <= 0: if "logdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["logdir"]== None:
tdLog.exit("name is null!") tdLog.exit("logdir is null!")
if "avail" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["avail"] <= 0: if "name" not in infoDict["disk_infos"]["logdir"] or len(infoDict["disk_infos"]["logdir"]["name"]) <= 0:
tdLog.exit("avail is null!") tdLog.exit("name is null!")
if "used" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["used"] <= 0: if "avail" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["avail"] <= 0:
tdLog.exit("used is null!") tdLog.exit("avail is null!")
if "total" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["total"] <= 0: if "used" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["used"] <= 0:
tdLog.exit("total is null!") tdLog.exit("used is null!")
if "total" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["total"] <= 0:
tdLog.exit("total is null!")
if "tempdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["tempdir"]== None: if "tempdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["tempdir"]== None:
tdLog.exit("tempdir is null!") tdLog.exit("tempdir is null!")
if "name" not in infoDict["disk_infos"]["tempdir"] or len(infoDict["disk_infos"]["tempdir"]["name"]) <= 0: if "name" not in infoDict["disk_infos"]["tempdir"] or len(infoDict["disk_infos"]["tempdir"]["name"]) <= 0:
tdLog.exit("name is null!") tdLog.exit("name is null!")
if "avail" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["avail"] <= 0: if "avail" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["avail"] <= 0:
tdLog.exit("avail is null!") tdLog.exit("avail is null!")
if "used" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["used"] <= 0: if "used" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["used"] <= 0:
tdLog.exit("used is null!") tdLog.exit("used is null!")
if "total" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["total"] <= 0: if "total" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["total"] <= 0:
tdLog.exit("total is null!") tdLog.exit("total is null!")
# log_infos ==================================== # log_infos ====================================
if "log_infos" not in infoDict or infoDict["log_infos"]== None: if "log_infos" not in infoDict or infoDict["log_infos"]== None:
tdLog.exit("log_infos is null!") tdLog.exit("log_infos is null!")
if "logs" not in infoDict["log_infos"] or len(infoDict["log_infos"]["logs"])!= 10: if "logs" not in infoDict["log_infos"] or len(infoDict["log_infos"]["logs"])!= 10:
tdLog.exit("logs is null!") tdLog.exit("logs is null!")
if "ts" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 10: if "ts" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 10:
tdLog.exit("ts is null!") tdLog.exit("ts is null!")
if "level" not in infoDict["log_infos"]["logs"][0] or infoDict["log_infos"]["logs"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]: if "level" not in infoDict["log_infos"]["logs"][0] or infoDict["log_infos"]["logs"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!") tdLog.exit("level is null!")
if "content" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 1: if "content" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 1:
tdLog.exit("content is null!") tdLog.exit("content is null!")
if "summary" not in infoDict["log_infos"] or len(infoDict["log_infos"]["summary"])!= 4: if "summary" not in infoDict["log_infos"] or len(infoDict["log_infos"]["summary"])!= 4:
tdLog.exit("summary is null!") tdLog.exit("summary is null!")
if "total" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["total"] < 0 : if "total" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["total"] < 0 :
tdLog.exit("total is null!") tdLog.exit("total is null!")
if "level" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]: if "level" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]:
tdLog.exit("level is null!") tdLog.exit("level is null!")
class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self):
""" """
process GET request process GET request
...@@ -245,17 +243,23 @@ class RequestHandlerImpl(http.server.BaseHTTPRequestHandler): ...@@ -245,17 +243,23 @@ class RequestHandlerImpl(http.server.BaseHTTPRequestHandler):
infoDict = json.loads(plainText) infoDict = json.loads(plainText)
#print("================") #print("================")
# print(infoDict) # print(infoDict)
telemetryInfoCheck(infoDict) self.telemetryInfoCheck(infoDict)
# 4. shutdown the server and exit case # 4. shutdown the server and exit case
assassin = threading.Thread(target=httpServer.shutdown) assassin = threading.Thread(target=self.server.shutdown)
assassin.daemon = True assassin.daemon = True
assassin.start() assassin.start()
print ("==== shutdown http server ====") print ("==== shutdown http server ====")
class TDTestCase: class TDTestCase:
hostname = socket.gethostname() global hostname
serverPort = '7080' global serverPort
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
try:
config = eval(tdDnodes.dnodes[0].remoteIP )
hostname = config["host"]
except Exception:
hostname = tdDnodes.dnodes[0].remoteIP
rpcDebugFlagVal = '143' rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["serverPort"] = serverPort clientCfgDict["serverPort"] = serverPort
...@@ -291,21 +295,19 @@ class TDTestCase: ...@@ -291,21 +295,19 @@ class TDTestCase:
sql = "create database db3 vgroups " + vgroups sql = "create database db3 vgroups " + vgroups
tdSql.query(sql) tdSql.query(sql)
# loop to wait request # create http server: bing ip/port , and request processor
httpServer.serve_forever() if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
RequestHandlerImplStr = base64.b64encode(pickle.dumps(RequestHandlerImpl)).decode()
cmdStr = "import pickle\nimport http\nRequestHandlerImpl=pickle.loads(base64.b64decode(\"%s\".encode()))\nclass NewRequestHandlerImpl(RequestHandlerImpl):\n hostPort = \'%s\'\nhttp.server.HTTPServer((\"\", %s), NewRequestHandlerImpl).serve_forever()"%(RequestHandlerImplStr,hostname+":"+serverPort,telemetryPort)
tdDnodes.dnodes[0].remoteExec({}, cmdStr)
else:
serverAddress = ("", int(telemetryPort))
http.server.HTTPServer(serverAddress, RequestHandlerImpl).serve_forever()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
# create http server: bing ip/port , and request processor
serverAddress = ("", int(telemetryPort))
httpServer = http.server.HTTPServer(serverAddress, RequestHandlerImpl)
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())
...@@ -118,7 +118,7 @@ class TDTestCase: ...@@ -118,7 +118,7 @@ class TDTestCase:
tdSql.error("select jtag->location from jsons1") tdSql.error("select jtag->location from jsons1")
tdSql.error("select jtag contains location from jsons1") tdSql.error("select jtag contains location from jsons1")
tdSql.error("select * from jsons1 where jtag contains location") tdSql.error("select * from jsons1 where jtag contains location")
tdSql.error("select * from jsons1 where jtag contains''") #tdSql.error("select * from jsons1 where jtag contains''")
tdSql.error("select * from jsons1 where jtag contains 'location'='beijing'") tdSql.error("select * from jsons1 where jtag contains 'location'='beijing'")
# #
# # test function error # # test function error
...@@ -129,6 +129,41 @@ class TDTestCase: ...@@ -129,6 +129,41 @@ class TDTestCase:
tdSql.error("select ceil(jtag->'tag1') from jsons1") tdSql.error("select ceil(jtag->'tag1') from jsons1")
tdSql.error("select ceil(jtag) from jsons1") tdSql.error("select ceil(jtag) from jsons1")
# #
#test scalar operation
tdSql.query("select jtag contains 'tag1',jtag->'tag1' from jsons1 order by jtag->'tag1'")
tdSql.checkRows(13)
tdSql.checkData(0, 0, False)
tdSql.checkData(5, 0, True)
tdSql.checkData(12, 0, True)
tdSql.query("select jtag->'tag1' like 'fe%',jtag->'tag1' from jsons1 order by jtag->'tag1'")
tdSql.checkRows(13)
tdSql.checkData(10, 0, False)
tdSql.checkData(11, 0, False)
tdSql.checkData(12, 0, True)
tdSql.query("select jtag->'tag1' not like 'fe%',jtag->'tag1' from jsons1 order by jtag->'tag1'")
tdSql.checkRows(13)
tdSql.checkData(10, 0, False)
tdSql.checkData(11, 0, True)
tdSql.checkData(12, 0, False)
tdSql.query("select jtag->'tag1' match 'fe',jtag->'tag1' from jsons1 order by jtag->'tag1'")
tdSql.checkRows(13)
tdSql.checkData(10, 0, False)
tdSql.checkData(11, 0, False)
tdSql.checkData(12, 0, True)
tdSql.query("select jtag->'tag1' nmatch 'fe',jtag->'tag1' from jsons1 order by jtag->'tag1'")
tdSql.checkRows(13)
tdSql.checkData(10, 0, False)
tdSql.checkData(11, 0, True)
tdSql.checkData(12, 0, False)
tdSql.query("select jtag->'tag1',jtag->'tag1'>='a' from jsons1 order by jtag->'tag1'")
tdSql.checkRows(13)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, False)
tdSql.checkData(7, 0, "false")
tdSql.checkData(7, 1, True)
tdSql.checkData(12, 1, True)
# # test select normal column # # test select normal column
tdSql.query("select dataint from jsons1 order by dataint") tdSql.query("select dataint from jsons1 order by dataint")
tdSql.checkRows(9) tdSql.checkRows(9)
......
@REM python3 .\test.py -f 0-others\taosShell.py python3 .\test.py -f 0-others\taosShell.py
python3 .\test.py -f 0-others\taosShellError.py python3 .\test.py -f 0-others\taosShellError.py
python3 .\test.py -f 0-others\taosShellNetChk.py python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\telemetry.py
@REM python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udfTest.py
python3 .\test.py -f 0-others\udf_create.py python3 .\test.py -f 0-others\udf_create.py
python3 .\test.py -f 0-others\udf_restart_taosd.py python3 .\test.py -f 0-others\udf_restart_taosd.py
......
...@@ -114,6 +114,7 @@ if __name__ == "__main__": ...@@ -114,6 +114,7 @@ if __name__ == "__main__":
if not execCmd == "": if not execCmd == "":
tdDnodes.init(deployPath) tdDnodes.init(deployPath)
print(execCmd)
exec(execCmd) exec(execCmd)
quit() quit()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册