提交 45a4017c 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

...@@ -52,6 +52,8 @@ typedef enum EStreamType { ...@@ -52,6 +52,8 @@ typedef enum EStreamType {
typedef struct { typedef struct {
SArray* pTableList; SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid SHashObj* map; // speedup acquire the tableQueryInfo by table uid
void* pTagCond;
void* pTagIndexCond;
} STableListInfo; } STableListInfo;
typedef struct SColumnDataAgg { typedef struct SColumnDataAgg {
......
...@@ -71,7 +71,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); ...@@ -71,7 +71,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataGetData(p1_, r_) \ #define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_)) ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_))
#define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && *(data) == TSDB_DATA_TYPE_NULL) #define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && \
(*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data)))
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
if (!pColumnInfoData->hasNull) { if (!pColumnInfoData->hasNull) {
......
...@@ -70,6 +70,8 @@ int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow); ...@@ -70,6 +70,8 @@ int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
// STag // STag
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag); int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag); void tTagFree(STag *pTag);
bool tTagIsJson(const void *pTag);
bool tTagIsJsonNull(void *tagVal);
bool tTagGet(const STag *pTag, STagVal *pTagVal); bool tTagGet(const STag *pTag, STagVal *pTagVal);
char *tTagValToData(const STagVal *pTagVal, bool isJson); char *tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
......
...@@ -137,6 +137,8 @@ extern bool tsSmlDataFormat; ...@@ -137,6 +137,8 @@ extern bool tsSmlDataFormat;
// internal // internal
extern int32_t tsTransPullupInterval; extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;
extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
...@@ -145,6 +145,7 @@ enum { ...@@ -145,6 +145,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
......
...@@ -66,6 +66,7 @@ typedef struct SScanLogicNode { ...@@ -66,6 +66,7 @@ typedef struct SScanLogicNode {
int8_t intervalUnit; int8_t intervalUnit;
int8_t slidingUnit; int8_t slidingUnit;
SNode* pTagCond; SNode* pTagCond;
SNode* pTagIndexCond;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
int16_t tsColId; int16_t tsColId;
...@@ -468,6 +469,7 @@ typedef struct SSubplan { ...@@ -468,6 +469,7 @@ typedef struct SSubplan {
SPhysiNode* pNode; // physical plan of current subplan SPhysiNode* pNode; // physical plan of current subplan
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
SNode* pTagCond; SNode* pTagCond;
SNode* pTagIndexCond;
} SSubplan; } SSubplan;
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode; typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
......
...@@ -288,11 +288,11 @@ typedef enum ESqlClause { ...@@ -288,11 +288,11 @@ typedef enum ESqlClause {
} ESqlClause; } ESqlClause;
typedef struct SDeleteStmt { typedef struct SDeleteStmt {
ENodeType type; // QUERY_NODE_DELETE_STMT ENodeType type; // QUERY_NODE_DELETE_STMT
SNode* pFromTable; // FROM clause SNode* pFromTable; // FROM clause
SNode* pWhere; // WHERE clause SNode* pWhere; // WHERE clause
SNode* pCountFunc; // count the number of rows affected SNode* pCountFunc; // count the number of rows affected
SNode* pTagIndexCond; // pWhere divided into pTagIndexCond and timeRange SNode* pTagCond; // pWhere divided into pTagCond and timeRange
STimeWindow timeRange; STimeWindow timeRange;
uint8_t precision; uint8_t precision;
bool deleteZeroRows; bool deleteZeroRows;
...@@ -397,7 +397,8 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal); ...@@ -397,7 +397,8 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal);
char* nodesGetFillModeString(EFillMode mode); char* nodesGetFillModeString(EFillMode mode);
int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc); int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc);
int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagCond, SNode** pOtherCond); int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
SNode** pOtherCond);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -104,8 +104,6 @@ extern "C" { ...@@ -104,8 +104,6 @@ extern "C" {
#include "osTimezone.h" #include "osTimezone.h"
#include "osEnv.h" #include "osEnv.h"
void osDefaultInit();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1507,7 +1507,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i ...@@ -1507,7 +1507,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
char* jsonInnerData = data + CHAR_BYTES; char* jsonInnerData = data + CHAR_BYTES;
if (jsonInnerType == TSDB_DATA_TYPE_NULL) { if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
len += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L)); len += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
} else if (jsonInnerType & TD_TAG_JSON) { } else if (tTagIsJson(data)) {
len += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len); len += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
len += varDataTLen(jsonInnerData) + CHAR_BYTES * 2; len += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
...@@ -1592,7 +1592,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int ...@@ -1592,7 +1592,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
if (jsonInnerType == TSDB_DATA_TYPE_NULL) { if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType & TD_TAG_JSON) { } else if (tTagIsJson(data)) {
char* jsonString = parseTagDatatoJson(data); char* jsonString = parseTagDatatoJson(data);
STR_TO_VARSTR(dst, jsonString); STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString); taosMemoryFree(jsonString);
......
...@@ -110,7 +110,7 @@ int32_t getJsonValueLen(const char* data) { ...@@ -110,7 +110,7 @@ int32_t getJsonValueLen(const char* data) {
dataLen = DOUBLE_BYTES + CHAR_BYTES; dataLen = DOUBLE_BYTES + CHAR_BYTES;
} else if (*data == TSDB_DATA_TYPE_BOOL) { } else if (*data == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES + CHAR_BYTES; dataLen = CHAR_BYTES + CHAR_BYTES;
} else if (*data & TD_TAG_JSON) { // json string } else if (tTagIsJson(data)) { // json string
dataLen = ((STag*)(data))->len; dataLen = ((STag*)(data))->len;
} else { } else {
ASSERT(0); ASSERT(0);
......
...@@ -924,6 +924,18 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) { ...@@ -924,6 +924,18 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
return n; return n;
} }
bool tTagIsJson(const void *pTag){
return (((const STag *)pTag)->flags & TD_TAG_JSON);
}
bool tTagIsJsonNull(void *data){
STag *pTag = (STag*)data;
int8_t isJson = tTagIsJson(pTag);
if(!isJson) return false;
return ((STag*)data)->nTag == 0;
}
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) { int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
int32_t code = 0; int32_t code = 0;
uint8_t *p = NULL; uint8_t *p = NULL;
......
...@@ -187,6 +187,9 @@ bool tsStartUdfd = true; ...@@ -187,6 +187,9 @@ bool tsStartUdfd = true;
// internal // internal
int32_t tsTransPullupInterval = 2; int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 60;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
...@@ -467,6 +470,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -467,6 +470,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400*365, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
return 0; return 0;
...@@ -619,6 +624,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -619,6 +624,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32;
tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32;
tsTtlPushInterval = cfgGetItem(pCfg, "ttlPushInterval")->i32;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
...@@ -631,7 +638,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -631,7 +638,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,
const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) { const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc) {
osDefaultInit(); if (tsCfg == NULL) osDefaultInit();
SConfig *pCfg = cfgInit(); SConfig *pCfg = cfgInit();
if (pCfg == NULL) return -1; if (pCfg == NULL) return -1;
......
...@@ -65,6 +65,13 @@ static void mndPullupTrans(SMnode *pMnode) { ...@@ -65,6 +65,13 @@ static void mndPullupTrans(SMnode *pMnode) {
} }
} }
static void mndTtlTimer(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
...@@ -83,41 +90,6 @@ static void mndPullupTelem(SMnode *pMnode) { ...@@ -83,41 +90,6 @@ static void mndPullupTelem(SMnode *pMnode) {
} }
} }
static void mndPushTtlTime(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
continue;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
int32_t t = taosGetTimestampSec();
*(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("failed to send ttl time seed msg, code:0x%x", code);
} else {
mInfo("send ttl time seed msg, time:%d", t);
}
sdbRelease(pSdb, pVgroup);
}
}
static void *mndThreadFp(void *param) { static void *mndThreadFp(void *param) {
SMnode *pMnode = param; SMnode *pMnode = param;
int64_t lastTime = 0; int64_t lastTime = 0;
...@@ -125,14 +97,13 @@ static void *mndThreadFp(void *param) { ...@@ -125,14 +97,13 @@ static void *mndThreadFp(void *param) {
while (1) { while (1) {
lastTime++; lastTime++;
if (lastTime % (864000) == 0) { // sleep 1 day for ttl
mndPushTtlTime(pMnode);
}
taosMsleep(100); taosMsleep(100);
if (mndGetStop(pMnode)) break; if (mndGetStop(pMnode)) break;
if (lastTime % (tsTransPullupInterval * 10) == 1) {
mndTtlTimer(pMnode);
}
if (lastTime % (tsTransPullupInterval * 10) == 0) { if (lastTime % (tsTransPullupInterval * 10) == 0) {
mndPullupTrans(pMnode); mndPullupTrans(pMnode);
} }
...@@ -558,12 +529,12 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -558,12 +529,12 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0; if (!IsReq(pMsg)) return 0;
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER) { pMsg->msgType == TDMT_MND_TRANS_TIMER || TDMT_MND_TTL_TIMER) {
return -1; return -1;
} }
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
mGError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
SEpSet epSet = {0}; SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet); mndGetMnodeEpSet(pMsg->info.node, &epSet);
......
...@@ -37,6 +37,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); ...@@ -37,6 +37,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static int32_t mndProcessTtlTimer(SRpcMsg *pReq);
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); static int32_t mndProcessCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReq(SRpcMsg *pReq);
...@@ -63,6 +64,7 @@ int32_t mndInitStb(SMnode *pMnode) { ...@@ -63,6 +64,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndSetMsgHandle(pMnode, TDMT_MND_TTL_TIMER, mndProcessTtlTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
...@@ -799,6 +801,43 @@ int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p ...@@ -799,6 +801,43 @@ int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
return 0; return 0;
} }
static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t);
SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) {
sdbCancelFetch(pSdb, pVgroup);
sdbRelease(pSdb, pVgroup);
continue;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
int32_t t = taosGetTimestampSec();
*(int32_t *)((char *)pHead + sizeof(SMsgHead)) = htonl(t);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if (code != 0) {
mError("failed to send ttl time seed, code:0x%x", code);
} else {
mDebug("send ttl time seed success, time:%d", t);
}
sdbRelease(pSdb, pVgroup);
}
return 0;
}
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1; int32_t code = -1;
......
...@@ -56,6 +56,7 @@ static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->dep ...@@ -56,6 +56,7 @@ static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->dep
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransReq(SRpcMsg *pReq); static int32_t mndProcessTransReq(SRpcMsg *pReq);
static int32_t mndProcessTtl(SRpcMsg *pReq);
static int32_t mndProcessKillTransReq(SRpcMsg *pReq); static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
......
...@@ -613,9 +613,6 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) { ...@@ -613,9 +613,6 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
ASSERT(pEntry->type == TSDB_CHILD_TABLE); ASSERT(pEntry->type == TSDB_CHILD_TABLE);
STag *tag = (STag *)pEntry->ctbEntry.pTags; STag *tag = (STag *)pEntry->ctbEntry.pTags;
if (type == TSDB_DATA_TYPE_JSON) { if (type == TSDB_DATA_TYPE_JSON) {
if (tag->nTag == 0) {
return NULL;
}
return tag; return tag;
} }
bool find = tTagGet(tag, val); bool find = tTagGet(tag, val);
......
...@@ -400,8 +400,7 @@ static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME){ ...@@ -400,8 +400,7 @@ static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME){
if (ttlDays <= 0) return; if (ttlDays <= 0) return;
ttlKey->dtime = ctime / 1000 + ttlDays * 24 * 60 * 60; ttlKey->dtime = ctime / 1000 + ttlDays * tsTtlUnit;
// ttlKey->dtime = ctime / 1000 + ttlDays;
ttlKey->uid = pME->uid; ttlKey->uid = pME->uid;
} }
......
...@@ -315,7 +315,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p ...@@ -315,7 +315,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t t = ntohl(*(int32_t *)pReq); int32_t t = ntohl(*(int32_t *)pReq);
vError("rec ttl time:%d", t); vDebug("vgId:%d, recv ttl msg, time:%d", pVnode->config.vgId, t);
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids); int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
if (ret != 0) { if (ret != 0) {
goto end; goto end;
......
...@@ -106,7 +106,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); ...@@ -106,7 +106,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo, SNode* pTagCond); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
SArray* createSortInfo(SNodeList* pNodeList); SArray* createSortInfo(SNodeList* pNodeList);
SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList);
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type);
......
...@@ -282,7 +282,6 @@ typedef struct STagScanInfo { ...@@ -282,7 +282,6 @@ typedef struct STagScanInfo {
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
STableListInfo *pTableList; STableListInfo *pTableList;
SNode* pFilterNode; // filter info,
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -839,8 +838,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi ...@@ -839,8 +838,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
SNode* pTagCond);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
......
...@@ -214,28 +214,111 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { ...@@ -214,28 +214,111 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
return pBlock; return pBlock;
} }
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo, SNode* pTagCond) { EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
SMetaReader* mr = (SMetaReader*)pContext;
if(nodeType(*pNode) == QUERY_NODE_COLUMN){
SColumnNode* pSColumnNode = *(SColumnNode**)pNode;
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = pSColumnNode->node.resType;
STagVal tagVal = {0};
tagVal.cid = pSColumnNode->colId;
const char* p = metaGetTableTagVal(&mr->me, pSColumnNode->node.resType.type, &tagVal);
if (p == NULL) {
res->node.resType.type = TSDB_DATA_TYPE_NULL;
}else if (pSColumnNode->node.resType.type == TSDB_DATA_TYPE_JSON) {
int32_t len = ((const STag*)p) -> len;
res->datum.p = taosMemoryCalloc(len + 1, 1);
memcpy(res->datum.p, p, len);
} else if (IS_VAR_DATA_TYPE(pSColumnNode->node.resType.type)) {
res->datum.p = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), tagVal.pData, tagVal.nData);
varDataSetLen(res->datum.p, tagVal.nData);
} else {
nodesSetValueNodeValue(res, &(tagVal.i64));
}
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
}else if (nodeType(*pNode) == QUERY_NODE_FUNCTION){
SFunctionNode * pFuncNode = *(SFunctionNode**)pNode;
if(pFuncNode->funcType == FUNCTION_TYPE_TBNAME){
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
return DEAL_RES_ERROR;
}
res->translate = true;
res->node.resType = pFuncNode->node.resType;
int32_t len = strlen(mr->me.name);
res->datum.p = taosMemoryCalloc(len + VARSTR_HEADER_SIZE + 1, 1);
memcpy(varDataVal(res->datum.p), mr->me.name, len);
varDataSetLen(res->datum.p, len);
nodesDestroyNode(*pNode);
*pNode = (SNode*)res;
}
}
return DEAL_RES_CONTINUE;
}
static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){
SMetaReader mr = {0};
metaReaderInit(&mr, metaHandle, 0);
metaGetTableEntryByUid(&mr, info->uid);
SNode *pTagCondTmp = nodesCloneNode(pTagCond);
nodesRewriteExprPostOrder(&pTagCondTmp, doTranslateTagExpr, &mr);
metaReaderClear(&mr);
SNode* pNew = NULL;
int32_t code = scalarCalculateConstants(pTagCondTmp, &pNew);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pTagCondTmp);
return false;
}
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
SValueNode *pValue = (SValueNode *)pNew;
ASSERT(pValue->node.resType.type == TSDB_DATA_TYPE_BOOL);
bool result = pValue->datum.b;
nodesDestroyNode(pNew);
return result;
}
int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
uint64_t tableUid = pScanNode->uid; uint64_t tableUid = pScanNode->uid;
SNode* pTagCond = (SNode*)pListInfo->pTagCond;
SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond;
if (pScanNode->tableType == TSDB_SUPER_TABLE) { if (pScanNode->tableType == TSDB_SUPER_TABLE) {
if (pTagCond) { if (pTagIndexCond) {
SIndexMetaArg metaArg = { SIndexMetaArg metaArg = {
.metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid}; .metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid};
SArray* res = taosArrayInit(8, sizeof(uint64_t)); SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res); //code = doFilterTag(pTagIndexCond, &metaArg, res);
if (code == TSDB_CODE_INDEX_REBUILDING) { // todo code = TSDB_CODE_INDEX_REBUILDING;
// doFilter(); if (code == TSDB_CODE_INDEX_REBUILDING) {
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
} else if (code != TSDB_CODE_SUCCESS) { } else if (code != TSDB_CODE_SUCCESS) {
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
taosArrayDestroy(res); taosArrayDestroy(res);
terrno = code; terrno = code;
return code; return code;
} else { } else {
qDebug("success to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid); qDebug("sucess to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid);
} }
for (int i = 0; i < taosArrayGetSize(res); i++) { for (int i = 0; i < taosArrayGetSize(res); i++) {
...@@ -246,7 +329,20 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -246,7 +329,20 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} else { } else {
code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList);
} }
} else { // Create one table group.
if(pTagCond){
int32_t i = 0;
while(i < taosArrayGetSize(pListInfo->pTableList)) {
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
bool isOk = isTableOk(info, pTagCond, metaHandle);
if(!isOk){
taosArrayRemove(pListInfo->pTableList, i);
continue;
}
i++;
}
}
}else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0}; STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
taosArrayPush(pListInfo->pTableList, &info); taosArrayPush(pListInfo->pTableList, &info);
} }
......
...@@ -3872,8 +3872,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -3872,8 +3872,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
} }
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
SNode* pTagCond);
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
...@@ -3987,7 +3986,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3987,7 +3986,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
} }
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, SNode* pTagCond) { uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo) {
int32_t type = nodeType(pPhyNode); int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
...@@ -3995,7 +3994,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -3995,7 +3994,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
tsdbReaderT pDataReader = tsdbReaderT pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL && terrno != 0) { if (pDataReader == NULL && terrno != 0) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
...@@ -4023,7 +4022,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4023,7 +4022,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId, pTagCond); createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId); SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
...@@ -4045,10 +4044,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4045,10 +4044,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pHandle->vnode) { if (pHandle->vnode) {
// for stram // for stram
pDataReader = pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
} else { } else {
// for tq // for tq
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond); getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
} }
} }
...@@ -4076,7 +4075,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4076,7 +4075,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pScanPhyNode->node.pConditions); int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
...@@ -4134,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4134,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pTagCond); ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo);
if (ops[i] == NULL) { if (ops[i] == NULL) {
return NULL; return NULL;
} }
...@@ -4338,8 +4337,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -4338,8 +4337,8 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
} }
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4497,8 +4496,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4497,8 +4496,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
} }
(*pTaskInfo)->sql = sql; (*pTaskInfo)->sql = sql;
(*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
(*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond); &(*pTaskInfo)->tableqinfoList);
if (NULL == (*pTaskInfo)->pRoot) { if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code; code = (*pTaskInfo)->code;
goto _complete; goto _complete;
......
...@@ -337,7 +337,7 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_ ...@@ -337,7 +337,7 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_
} }
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, (data == NULL)); colDataAppend(pColInfoData, i, data, (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
} }
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
...@@ -1829,7 +1829,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1829,7 +1829,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} else { } else {
data = (char*)p; data = (char*)p;
} }
colDataAppend(pDst, count, data, (data == NULL)); colDataAppend(pDst, count, data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
data != NULL) { data != NULL) {
...@@ -1852,9 +1852,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1852,9 +1852,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} }
pRes->info.rows = count; pRes->info.rows = count;
doFilter(pInfo->pFilterNode, pRes); pOperator->resultInfo.totalRows += count;
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
...@@ -1884,13 +1882,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -1884,13 +1882,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error; goto _error;
} }
pInfo->pTableList = pTableListInfo; pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList; pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createResDataBlock(pDescNode);
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pInfo->pFilterNode = pPhyNode->node.pConditions;
pOperator->name = "TagScanOperator"; pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
...@@ -1972,8 +1968,8 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) { ...@@ -1972,8 +1968,8 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
} }
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -1656,10 +1656,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1656,10 +1656,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL;
} }
return pBInfo->pRes; return pBInfo->pRes->info.rows > 0 ? pBInfo->pRes : NULL;
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
......
...@@ -351,6 +351,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { ...@@ -351,6 +351,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(intervalUnit); COPY_SCALAR_FIELD(intervalUnit);
COPY_SCALAR_FIELD(slidingUnit); COPY_SCALAR_FIELD(slidingUnit);
CLONE_NODE_FIELD(pTagCond); CLONE_NODE_FIELD(pTagCond);
CLONE_NODE_FIELD(pTagIndexCond);
COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId); COPY_SCALAR_FIELD(tsColId);
......
...@@ -2326,6 +2326,7 @@ static const char* jkSubplanNodeAddr = "NodeAddr"; ...@@ -2326,6 +2326,7 @@ static const char* jkSubplanNodeAddr = "NodeAddr";
static const char* jkSubplanRootNode = "RootNode"; static const char* jkSubplanRootNode = "RootNode";
static const char* jkSubplanDataSink = "DataSink"; static const char* jkSubplanDataSink = "DataSink";
static const char* jkSubplanTagCond = "TagCond"; static const char* jkSubplanTagCond = "TagCond";
static const char* jkSubplanTagIndexCond = "TagIndexCond";
static int32_t subplanToJson(const void* pObj, SJson* pJson) { static int32_t subplanToJson(const void* pObj, SJson* pJson) {
const SSubplan* pNode = (const SSubplan*)pObj; const SSubplan* pNode = (const SSubplan*)pObj;
...@@ -2355,6 +2356,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) { ...@@ -2355,6 +2356,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanTagCond, nodeToJson, pNode->pTagCond); code = tjsonAddObject(pJson, jkSubplanTagCond, nodeToJson, pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanTagIndexCond, nodeToJson, pNode->pTagIndexCond);
}
return code; return code;
} }
...@@ -2388,6 +2392,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { ...@@ -2388,6 +2392,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSubplanTagCond, (SNode**)&pNode->pTagCond); code = jsonToNodeObject(pJson, jkSubplanTagCond, (SNode**)&pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSubplanTagIndexCond, (SNode**)&pNode->pTagIndexCond);
}
return code; return code;
} }
...@@ -3954,7 +3961,7 @@ static int32_t deleteStmtToJson(const void* pObj, SJson* pJson) { ...@@ -3954,7 +3961,7 @@ static int32_t deleteStmtToJson(const void* pObj, SJson* pJson) {
code = tjsonAddObject(pJson, jkDeleteStmtCountFunc, nodeToJson, pNode->pCountFunc); code = tjsonAddObject(pJson, jkDeleteStmtCountFunc, nodeToJson, pNode->pCountFunc);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkDeleteStmtTagIndexCond, nodeToJson, pNode->pTagIndexCond); code = tjsonAddObject(pJson, jkDeleteStmtTagIndexCond, nodeToJson, pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDeleteStmtTimeRangeStartKey, pNode->timeRange.skey); code = tjsonAddIntegerToObject(pJson, jkDeleteStmtTimeRangeStartKey, pNode->timeRange.skey);
...@@ -3983,7 +3990,7 @@ static int32_t jsonToDeleteStmt(const SJson* pJson, void* pObj) { ...@@ -3983,7 +3990,7 @@ static int32_t jsonToDeleteStmt(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkDeleteStmtCountFunc, &pNode->pCountFunc); code = jsonToNodeObject(pJson, jkDeleteStmtCountFunc, &pNode->pCountFunc);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkDeleteStmtTagIndexCond, &pNode->pTagIndexCond); code = jsonToNodeObject(pJson, jkDeleteStmtTagIndexCond, &pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkDeleteStmtTimeRangeStartKey, &pNode->timeRange.skey); code = tjsonGetBigIntValue(pJson, jkDeleteStmtTimeRangeStartKey, &pNode->timeRange.skey);
......
...@@ -669,7 +669,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -669,7 +669,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pFromTable); nodesDestroyNode(pStmt->pFromTable);
nodesDestroyNode(pStmt->pWhere); nodesDestroyNode(pStmt->pWhere);
nodesDestroyNode(pStmt->pCountFunc); nodesDestroyNode(pStmt->pCountFunc);
nodesDestroyNode(pStmt->pTagIndexCond); nodesDestroyNode(pStmt->pTagCond);
break; break;
} }
case QUERY_NODE_QUERY: { case QUERY_NODE_QUERY: {
...@@ -688,7 +688,13 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -688,7 +688,13 @@ void nodesDestroyNode(SNode* pNode) {
SScanLogicNode* pLogicNode = (SScanLogicNode*)pNode; SScanLogicNode* pLogicNode = (SScanLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode); destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pScanCols); nodesDestroyList(pLogicNode->pScanCols);
nodesDestroyList(pLogicNode->pScanPseudoCols);
taosMemoryFreeClear(pLogicNode->pVgroupList); taosMemoryFreeClear(pLogicNode->pVgroupList);
nodesDestroyList(pLogicNode->pDynamicScanFuncs);
nodesDestroyNode(pLogicNode->pTagCond);
nodesDestroyNode(pLogicNode->pTagIndexCond);
taosArrayDestroy(pLogicNode->pSmaIndexes);
nodesDestroyList(pLogicNode->pPartTags);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_JOIN: { case QUERY_NODE_LOGIC_PLAN_JOIN: {
...@@ -897,6 +903,8 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -897,6 +903,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pSubplan->pChildren); nodesDestroyList(pSubplan->pChildren);
nodesDestroyNode((SNode*)pSubplan->pNode); nodesDestroyNode((SNode*)pSubplan->pNode);
nodesDestroyNode((SNode*)pSubplan->pDataSink); nodesDestroyNode((SNode*)pSubplan->pDataSink);
nodesDestroyNode((SNode*)pSubplan->pTagCond);
nodesDestroyNode((SNode*)pSubplan->pTagIndexCond);
nodesClearList(pSubplan->pParents); nodesClearList(pSubplan->pParents);
break; break;
} }
...@@ -1130,6 +1138,7 @@ void* nodesGetValueFromNode(SValueNode* pNode) { ...@@ -1130,6 +1138,7 @@ void* nodesGetValueFromNode(SValueNode* pNode) {
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_JSON:
return (void*)pNode->datum.p; return (void*)pNode->datum.p;
default: default:
break; break;
...@@ -1659,6 +1668,7 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) { ...@@ -1659,6 +1668,7 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) {
typedef struct SClassifyConditionCxt { typedef struct SClassifyConditionCxt {
bool hasPrimaryKey; bool hasPrimaryKey;
bool hasTagIndexCol; bool hasTagIndexCol;
bool hasTagCol;
bool hasOtherCol; bool hasOtherCol;
} SClassifyConditionCxt; } SClassifyConditionCxt;
...@@ -1670,6 +1680,9 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) { ...@@ -1670,6 +1680,9 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) {
pCxt->hasPrimaryKey = true; pCxt->hasPrimaryKey = true;
} else if (pCol->hasIndex) { } else if (pCol->hasIndex) {
pCxt->hasTagIndexCol = true; pCxt->hasTagIndexCol = true;
pCxt->hasTagCol = true;
} else if (COLUMN_TYPE_TAG == pCol->colType) {
pCxt->hasTagCol = true;
} else { } else {
pCxt->hasOtherCol = true; pCxt->hasOtherCol = true;
} }
...@@ -1678,23 +1691,31 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) { ...@@ -1678,23 +1691,31 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
typedef enum EConditionType { COND_TYPE_PRIMARY_KEY = 1, COND_TYPE_TAG_INDEX, COND_TYPE_NORMAL } EConditionType; typedef enum EConditionType {
COND_TYPE_PRIMARY_KEY = 1,
COND_TYPE_TAG_INDEX,
COND_TYPE_TAG,
COND_TYPE_NORMAL
} EConditionType;
static EConditionType classifyCondition(SNode* pNode) { static EConditionType classifyCondition(SNode* pNode) {
SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false}; SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false};
nodesWalkExpr(pNode, classifyConditionImpl, &cxt); nodesWalkExpr(pNode, classifyConditionImpl, &cxt);
return cxt.hasOtherCol ? COND_TYPE_NORMAL return cxt.hasOtherCol ? COND_TYPE_NORMAL
: (cxt.hasPrimaryKey && cxt.hasTagIndexCol : (cxt.hasPrimaryKey && cxt.hasTagCol
? COND_TYPE_NORMAL ? COND_TYPE_NORMAL
: (cxt.hasPrimaryKey ? COND_TYPE_PRIMARY_KEY : COND_TYPE_TAG_INDEX)); : (cxt.hasPrimaryKey ? COND_TYPE_PRIMARY_KEY
: (cxt.hasTagIndexCol ? COND_TYPE_TAG_INDEX : COND_TYPE_TAG)));
} }
static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagCond, SNode** pOtherCond) { static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
SNode** pOtherCond) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(*pCondition); SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(*pCondition);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pPrimaryKeyConds = NULL; SNodeList* pPrimaryKeyConds = NULL;
SNodeList* pTagIndexConds = NULL;
SNodeList* pTagConds = NULL; SNodeList* pTagConds = NULL;
SNodeList* pOtherConds = NULL; SNodeList* pOtherConds = NULL;
SNode* pCond = NULL; SNode* pCond = NULL;
...@@ -1706,6 +1727,14 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S ...@@ -1706,6 +1727,14 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S
} }
break; break;
case COND_TYPE_TAG_INDEX: case COND_TYPE_TAG_INDEX:
if (NULL != pTagIndexCond) {
code = nodesListMakeAppend(&pTagIndexConds, nodesCloneNode(pCond));
}
if (NULL != pTagCond) {
code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond));
}
break;
case COND_TYPE_TAG:
if (NULL != pTagCond) { if (NULL != pTagCond) {
code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond)); code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond));
} }
...@@ -1723,11 +1752,15 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S ...@@ -1723,11 +1752,15 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S
} }
SNode* pTempPrimaryKeyCond = NULL; SNode* pTempPrimaryKeyCond = NULL;
SNode* pTempTagIndexCond = NULL;
SNode* pTempTagCond = NULL; SNode* pTempTagCond = NULL;
SNode* pTempOtherCond = NULL; SNode* pTempOtherCond = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds); code = nodesMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds);
} }
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempTagIndexCond, &pTagIndexConds);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempTagCond, &pTagConds); code = nodesMergeConds(&pTempTagCond, &pTagConds);
} }
...@@ -1739,6 +1772,9 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S ...@@ -1739,6 +1772,9 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S
if (NULL != pPrimaryKeyCond) { if (NULL != pPrimaryKeyCond) {
*pPrimaryKeyCond = pTempPrimaryKeyCond; *pPrimaryKeyCond = pTempPrimaryKeyCond;
} }
if (NULL != pTagIndexCond) {
*pTagIndexCond = pTempTagIndexCond;
}
if (NULL != pTagCond) { if (NULL != pTagCond) {
*pTagCond = pTempTagCond; *pTagCond = pTempTagCond;
} }
...@@ -1749,9 +1785,11 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S ...@@ -1749,9 +1785,11 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S
*pCondition = NULL; *pCondition = NULL;
} else { } else {
nodesDestroyList(pPrimaryKeyConds); nodesDestroyList(pPrimaryKeyConds);
nodesDestroyList(pTagIndexConds);
nodesDestroyList(pTagConds); nodesDestroyList(pTagConds);
nodesDestroyList(pOtherConds); nodesDestroyList(pOtherConds);
nodesDestroyNode(pTempPrimaryKeyCond); nodesDestroyNode(pTempPrimaryKeyCond);
nodesDestroyNode(pTempTagIndexCond);
nodesDestroyNode(pTempTagCond); nodesDestroyNode(pTempTagCond);
nodesDestroyNode(pTempOtherCond); nodesDestroyNode(pTempOtherCond);
} }
...@@ -1759,10 +1797,11 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S ...@@ -1759,10 +1797,11 @@ static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, S
return code; return code;
} }
int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagCond, SNode** pOtherCond) { int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond,
SNode** pOtherCond) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCondition) && if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCondition) &&
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)*pCondition)->condType) { LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)*pCondition)->condType) {
return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagCond, pOtherCond); return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond);
} }
switch (classifyCondition(*pCondition)) { switch (classifyCondition(*pCondition)) {
...@@ -1772,6 +1811,21 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** ...@@ -1772,6 +1811,21 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode**
} }
break; break;
case COND_TYPE_TAG_INDEX: case COND_TYPE_TAG_INDEX:
if (NULL != pTagIndexCond) {
*pTagIndexCond = *pCondition;
}
if (NULL != pTagCond) {
SNode* pTempCond = *pCondition;
if (NULL != pTagIndexCond) {
pTempCond = nodesCloneNode(*pCondition);
if (NULL == pTempCond) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
*pTagCond = pTempCond;
}
break;
case COND_TYPE_TAG:
if (NULL != pTagCond) { if (NULL != pTagCond) {
*pTagCond = *pCondition; *pTagCond = *pCondition;
} }
......
...@@ -2013,7 +2013,7 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin ...@@ -2013,7 +2013,7 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin
} }
SNode* pPrimaryKeyCond = NULL; SNode* pPrimaryKeyCond = NULL;
nodesPartitionCond(&pCond, &pPrimaryKeyCond, NULL, NULL); nodesPartitionCond(&pCond, &pPrimaryKeyCond, NULL, NULL, NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pPrimaryKeyCond) { if (NULL != pPrimaryKeyCond) {
...@@ -2161,9 +2161,6 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { ...@@ -2161,9 +2161,6 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
if (COLUMN_TYPE_TAG == pCol->colType) { if (COLUMN_TYPE_TAG == pCol->colType) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_COL);
} }
if (TSDB_SUPER_TABLE == pCol->tableType) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_STATE_WIN_TABLE);
}
} }
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
...@@ -2685,7 +2682,7 @@ static int32_t partitionDeleteWhere(STranslateContext* pCxt, SDeleteStmt* pDelet ...@@ -2685,7 +2682,7 @@ static int32_t partitionDeleteWhere(STranslateContext* pCxt, SDeleteStmt* pDelet
SNode* pPrimaryKeyCond = NULL; SNode* pPrimaryKeyCond = NULL;
SNode* pOtherCond = NULL; SNode* pOtherCond = NULL;
int32_t code = nodesPartitionCond(&pDelete->pWhere, &pPrimaryKeyCond, &pDelete->pTagIndexCond, &pOtherCond); int32_t code = nodesPartitionCond(&pDelete->pWhere, &pPrimaryKeyCond, NULL, &pDelete->pTagCond, &pOtherCond);
if (TSDB_CODE_SUCCESS == code && NULL != pOtherCond) { if (TSDB_CODE_SUCCESS == code && NULL != pOtherCond) {
code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DELETE_WHERE); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DELETE_WHERE);
} }
......
...@@ -1160,8 +1160,8 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p ...@@ -1160,8 +1160,8 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p
} }
} }
if (TSDB_CODE_SUCCESS == code && NULL != pDelete->pTagIndexCond) { if (TSDB_CODE_SUCCESS == code && NULL != pDelete->pTagCond) {
pScan->pTagCond = nodesCloneNode(pDelete->pTagIndexCond); pScan->pTagCond = nodesCloneNode(pDelete->pTagCond);
if (NULL == pScan->pTagCond) { if (NULL == pScan->pTagCond) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#include "filter.h" #include "filter.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "index.h"
#include "planInt.h" #include "planInt.h"
#include "ttime.h" #include "ttime.h"
...@@ -309,32 +308,6 @@ static int32_t cpdCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNode* pScan, S ...@@ -309,32 +308,6 @@ static int32_t cpdCalcTimeRange(SOptimizeContext* pCxt, SScanLogicNode* pScan, S
return code; return code;
} }
static int32_t cpdApplyTagIndex(SScanLogicNode* pScan, SNode** pTagCond, SNode** pOtherCond) {
int32_t code = TSDB_CODE_SUCCESS;
SIdxFltStatus idxStatus = idxGetFltStatus(*pTagCond);
switch (idxStatus) {
case SFLT_NOT_INDEX:
code = cpdCondAppend(pOtherCond, pTagCond);
break;
case SFLT_COARSE_INDEX:
pScan->pTagCond = nodesCloneNode(*pTagCond);
if (NULL == pScan->pTagCond) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
code = cpdCondAppend(pOtherCond, pTagCond);
break;
case SFLT_ACCURATE_INDEX:
pScan->pTagCond = *pTagCond;
*pTagCond = NULL;
break;
default:
code = TSDB_CODE_FAILED;
break;
}
return code;
}
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) { static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) || if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) ||
TSDB_SYSTEM_TABLE == pScan->tableType) { TSDB_SYSTEM_TABLE == pScan->tableType) {
...@@ -342,15 +315,12 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* ...@@ -342,15 +315,12 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
} }
SNode* pPrimaryKeyCond = NULL; SNode* pPrimaryKeyCond = NULL;
SNode* pTagCond = NULL;
SNode* pOtherCond = NULL; SNode* pOtherCond = NULL;
int32_t code = nodesPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pTagCond, &pOtherCond); int32_t code = nodesPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond,
&pOtherCond);
if (TSDB_CODE_SUCCESS == code && NULL != pPrimaryKeyCond) { if (TSDB_CODE_SUCCESS == code && NULL != pPrimaryKeyCond) {
code = cpdCalcTimeRange(pCxt, pScan, &pPrimaryKeyCond, &pOtherCond); code = cpdCalcTimeRange(pCxt, pScan, &pPrimaryKeyCond, &pOtherCond);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pTagCond) {
code = cpdApplyTagIndex(pScan, &pTagCond, &pOtherCond);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pScan->node.pConditions = pOtherCond; pScan->node.pConditions = pOtherCond;
} }
......
...@@ -436,6 +436,15 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS ...@@ -436,6 +436,15 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
} }
} }
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pScanLogicNode->pTagIndexCond) {
pSubplan->pTagIndexCond = nodesCloneNode(pScanLogicNode->pTagIndexCond);
if (NULL == pSubplan->pTagIndexCond) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pScanPhysiNode; *pPhyNode = (SPhysiNode*)pScanPhysiNode;
} else { } else {
......
...@@ -166,6 +166,31 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { ...@@ -166,6 +166,31 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
} }
static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
if (WINDOW_TYPE_SESSION == pWindow->winType) {
if (!streamQuery) {
return stbSplHasMultiTbScan(streamQuery, pNode);
} else {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
}
if (WINDOW_TYPE_STATE == pWindow->winType) {
if (!streamQuery) {
return stbSplHasMultiTbScan(streamQuery, pNode);
} else {
return false;
}
}
return false;
}
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
...@@ -174,13 +199,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { ...@@ -174,13 +199,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: { case QUERY_NODE_LOGIC_PLAN_WINDOW:
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; return stbSplNeedSplitWindow(streamQuery, pNode);
if (WINDOW_TYPE_STATE == pWindow->winType || (!streamQuery && WINDOW_TYPE_SESSION == pWindow->winType)) {
return false;
}
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_SORT:
return stbSplHasMultiTbScan(streamQuery, pNode); return stbSplHasMultiTbScan(streamQuery, pNode);
default: default:
...@@ -477,11 +497,64 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo ...@@ -477,11 +497,64 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
return code; return code;
} }
static void splSetTableScanType(SLogicNode* pNode, EScanType scanType) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
((SScanLogicNode*)pNode)->scanType = scanType;
} else {
if (1 == LIST_LENGTH(pNode->pChildren)) {
splSetTableScanType((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), scanType);
}
}
}
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pWindow = pInfo->pSplitNode;
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);
SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
}
if (TSDB_CODE_SUCCESS == code) {
splSetTableScanType(pChild, SCAN_TYPE_TABLE_MERGE);
++(pCxt->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
} else {
nodesDestroyList(pMergeKeys);
}
return code;
}
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitSessionForStream(pCxt, pInfo); return stbSplSplitSessionForStream(pCxt, pInfo);
} else { } else {
return TSDB_CODE_PLAN_INTERNAL_ERROR; return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}
}
static int32_t stbSplSplitStateForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitStateForStream(pCxt, pInfo);
} else {
return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
} }
} }
...@@ -511,6 +584,8 @@ static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitI ...@@ -511,6 +584,8 @@ static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitI
return stbSplSplitInterval(pCxt, pInfo); return stbSplSplitInterval(pCxt, pInfo);
case WINDOW_TYPE_SESSION: case WINDOW_TYPE_SESSION:
return stbSplSplitSession(pCxt, pInfo); return stbSplSplitSession(pCxt, pInfo);
case WINDOW_TYPE_STATE:
return stbSplSplitState(pCxt, pInfo);
default: default:
break; break;
} }
......
...@@ -40,6 +40,8 @@ TEST_F(PlanOptimizeTest, ConditionPushDown) { ...@@ -40,6 +40,8 @@ TEST_F(PlanOptimizeTest, ConditionPushDown) {
run("SELECT ts, c1 FROM st1 WHERE tag1 > 4 or tag1 < 2"); run("SELECT ts, c1 FROM st1 WHERE tag1 > 4 or tag1 < 2");
run("SELECT ts, c1 FROM st1 WHERE tag1 > 4 AND tag2 = 'hello'"); run("SELECT ts, c1 FROM st1 WHERE tag1 > 4 AND tag2 = 'hello'");
run("SELECT ts, c1 FROM st1 WHERE tag1 > 4 AND tag2 = 'hello' AND c1 > 10");
} }
TEST_F(PlanOptimizeTest, orderByPrimaryKey) { TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
......
...@@ -34,3 +34,13 @@ TEST_F(PlanSessionTest, selectFunc) { ...@@ -34,3 +34,13 @@ TEST_F(PlanSessionTest, selectFunc) {
// select function along with the columns of select row, and with SESSION clause // select function along with the columns of select row, and with SESSION clause
run("SELECT MAX(c1), c2 FROM t1 SESSION(ts, 10s)"); run("SELECT MAX(c1), c2 FROM t1 SESSION(ts, 10s)");
} }
TEST_F(PlanSessionTest, stable) {
useDb("root", "test");
// select function for SESSION clause
run("SELECT MAX(c1), MIN(c1) FROM st1 SESSION(ts, 10s)");
// select function along with the columns of select row, and with SESSION clause
run("SELECT MAX(c1), c2 FROM st1 SESSION(ts, 10s)");
run("SELECT count(ts) FROM st1 PARTITION BY c1 SESSION(ts, 10s)");
}
...@@ -40,3 +40,12 @@ TEST_F(PlanStateTest, selectFunc) { ...@@ -40,3 +40,12 @@ TEST_F(PlanStateTest, selectFunc) {
// select function along with the columns of select row, and with STATE_WINDOW clause // select function along with the columns of select row, and with STATE_WINDOW clause
run("SELECT MAX(c1), c2 FROM t1 STATE_WINDOW(c3)"); run("SELECT MAX(c1), c2 FROM t1 STATE_WINDOW(c3)");
} }
TEST_F(PlanStateTest, stable) {
useDb("root", "test");
// select function for STATE_WINDOW clause
run("SELECT MAX(c1), MIN(c1) FROM st1 STATE_WINDOW(c2)");
// select function along with the columns of select row, and with STATE_WINDOW clause
run("SELECT MAX(c1), c2 FROM st1 STATE_WINDOW(c2)");
}
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "sclvector.h" #include "sclvector.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdataformat.h"
#include "ttypes.h" #include "ttypes.h"
#include "ttime.h" #include "ttime.h"
...@@ -506,6 +507,16 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t ...@@ -506,6 +507,16 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t
} }
} }
// if types can not comparable
if((IS_NUMERIC_TYPE(typeLeft) && !IS_NUMERIC_TYPE(typeRight)) ||
(IS_NUMERIC_TYPE(typeRight) && !IS_NUMERIC_TYPE(typeLeft)) ||
(IS_VAR_DATA_TYPE(typeLeft) && !IS_VAR_DATA_TYPE(typeRight)) ||
(IS_VAR_DATA_TYPE(typeRight) && !IS_VAR_DATA_TYPE(typeLeft)) ||
((typeLeft == TSDB_DATA_TYPE_BOOL) && (typeRight != TSDB_DATA_TYPE_BOOL)) ||
((typeRight == TSDB_DATA_TYPE_BOOL) && (typeLeft != TSDB_DATA_TYPE_BOOL)))
return false;
if(typeLeft == TSDB_DATA_TYPE_NULL || typeRight == TSDB_DATA_TYPE_NULL){ if(typeLeft == TSDB_DATA_TYPE_NULL || typeRight == TSDB_DATA_TYPE_NULL){
*isNull = true; *isNull = true;
return true; return true;
...@@ -519,24 +530,28 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t ...@@ -519,24 +530,28 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t
*fp = filterGetCompFunc(type, optr); *fp = filterGetCompFunc(type, optr);
if(IS_NUMERIC_TYPE(type) || IS_FLOAT_TYPE(type)){ if(IS_NUMERIC_TYPE(type)){
if(typeLeft == TSDB_DATA_TYPE_NCHAR) { if(typeLeft == TSDB_DATA_TYPE_NCHAR) {
convertNcharToDouble(*pLeftData, pLeftOut); ASSERT(0);
*pLeftData = pLeftOut; // convertNcharToDouble(*pLeftData, pLeftOut);
// *pLeftData = pLeftOut;
} else if(typeLeft == TSDB_DATA_TYPE_BINARY) { } else if(typeLeft == TSDB_DATA_TYPE_BINARY) {
convertBinaryToDouble(*pLeftData, pLeftOut); ASSERT(0);
*pLeftData = pLeftOut; // convertBinaryToDouble(*pLeftData, pLeftOut);
// *pLeftData = pLeftOut;
} else if(typeLeft != type) { } else if(typeLeft != type) {
convertNumberToNumber(*pLeftData, pLeftOut, typeLeft, type); convertNumberToNumber(*pLeftData, pLeftOut, typeLeft, type);
*pLeftData = pLeftOut; *pLeftData = pLeftOut;
} }
if(typeRight == TSDB_DATA_TYPE_NCHAR) { if(typeRight == TSDB_DATA_TYPE_NCHAR) {
convertNcharToDouble(*pRightData, pRightOut); ASSERT(0);
*pRightData = pRightOut; // convertNcharToDouble(*pRightData, pRightOut);
// *pRightData = pRightOut;
} else if(typeRight == TSDB_DATA_TYPE_BINARY) { } else if(typeRight == TSDB_DATA_TYPE_BINARY) {
convertBinaryToDouble(*pRightData, pRightOut); ASSERT(0);
*pRightData = pRightOut; // convertBinaryToDouble(*pRightData, pRightOut);
// *pRightData = pRightOut;
} else if(typeRight != type) { } else if(typeRight != type) {
convertNumberToNumber(*pRightData, pRightOut, typeRight, type); convertNumberToNumber(*pRightData, pRightOut, typeRight, type);
*pRightData = pRightOut; *pRightData = pRightOut;
...@@ -1693,6 +1708,13 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, ...@@ -1693,6 +1708,13 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
STagVal getJsonValue(char *json, char *key, bool *isExist) { STagVal getJsonValue(char *json, char *key, bool *isExist) {
STagVal val = {.pKey = key}; STagVal val = {.pKey = key};
if (tTagIsJson((const STag *)json) == false){
if(isExist){
*isExist = false;
}
return val;
}
bool find = tTagGet(((const STag *)json), &val); // json value is null and not exist is different bool find = tTagGet(((const STag *)json), &val); // json value is null and not exist is different
if(isExist){ if(isExist){
*isExist = find; *isExist = find;
......
...@@ -1310,11 +1310,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1310,11 +1310,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json string--0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json string--0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k2"; key = "k2";
bool eRes1[len+len1] = {false, false, true, true, false, false, false, true, false, true, false, true, true}; bool eRes1[len+len1] = {false, false, false, false, false, false, false, true, false, true, false, true, true};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes1[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes1[i], op[i], false);
} }
bool eRes_1[len0] = {true, true, false, false, false, false}; bool eRes_1[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++){ for(int i = 0; i < len0; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_1[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_1[i], op[i], true);
} }
...@@ -1346,11 +1346,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1346,11 +1346,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json bool--1 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json bool--1 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k4"; key = "k4";
bool eRes3[len+len1] = {false, false, true, true, false, true, false, true, true, false, false, false, false}; bool eRes3[len+len1] = {false, false, false, false, false, false, false, true, true, false, false, false, false};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes3[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes3[i], op[i], false);
} }
bool eRes_3[len0] = {false, true, false, false, false, true}; bool eRes_3[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++){ for(int i = 0; i < len0; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_3[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_3[i], op[i], true);
} }
...@@ -1419,11 +1419,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1419,11 +1419,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json bool-- 0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json bool-- 0 {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k8"; key = "k8";
bool eRes7[len+len1] = {false, false, true, true, false, false, false, true, false, false, false, false, false}; bool eRes7[len+len1] = {false, false, false, false, false, false, false, true, false, false, false, false, false};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes7[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes7[i], op[i], false);
} }
bool eRes_7[len0] = {true, true, false, false, false, false}; bool eRes_7[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++) { for(int i = 0; i < len0; i++) {
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_7[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_7[i], op[i], true);
} }
...@@ -1438,11 +1438,11 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1438,11 +1438,11 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json string-- 6.6hello {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n"); printf("--------------------json string-- 6.6hello {1, 8, 2, 2, 3, 0, 0, 0, 0}-------------------\n");
key = "k9"; key = "k9";
bool eRes8[len+len1] = {true, false, false, false, false, true, false, true, true, false, true, false, true}; bool eRes8[len+len1] = {false, false, false, false, false, false, false, true, true, false, true, false, true};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes8[i], op[i], false); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes8[i], op[i], false);
} }
bool eRes_8[len0] = {false, true, true, true, false, true}; bool eRes_8[len0] = {false, false, false, false, false, false};
for(int i = 0; i < len0; i++) { for(int i = 0; i < len0; i++) {
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_8[i], op[i], true); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes_8[i], op[i], true);
} }
......
...@@ -88,11 +88,11 @@ void taosSetSystemLocale(const char *inLocale, const char *inCharSet) { ...@@ -88,11 +88,11 @@ void taosSetSystemLocale(const char *inLocale, const char *inCharSet) {
void taosGetSystemLocale(char *outLocale, char *outCharset) { void taosGetSystemLocale(char *outLocale, char *outCharset) {
#ifdef WINDOWS #ifdef WINDOWS
char *locale = setlocale(LC_CTYPE, "chs"); char *locale = setlocale(LC_CTYPE, "en_US.UTF-8");
if (locale != NULL) { if (locale != NULL) {
tstrncpy(outLocale, locale, TD_LOCALE_LEN); tstrncpy(outLocale, locale, TD_LOCALE_LEN);
} }
strcpy(outCharset, "cp936"); strcpy(outCharset, "UTF-8");
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
/* /*
......
...@@ -52,7 +52,7 @@ echo wal 0 >> %TAOS_CFG% ...@@ -52,7 +52,7 @@ echo wal 0 >> %TAOS_CFG%
echo asyncLog 0 >> %TAOS_CFG% echo asyncLog 0 >> %TAOS_CFG%
echo locale en_US.UTF-8 >> %TAOS_CFG% echo locale en_US.UTF-8 >> %TAOS_CFG%
echo enableCoreFile 1 >> %TAOS_CFG% echo enableCoreFile 1 >> %TAOS_CFG%
echo charset cp65001 >> %TAOS_CFG% echo charset UTF-8 >> %TAOS_CFG%
set "FILE_NAME=testSuite.sim" set "FILE_NAME=testSuite.sim"
if "%1" == "-f" set "FILE_NAME=%2" if "%1" == "-f" set "FILE_NAME=%2"
......
from distutils.log import error
import taos
import sys
import time
import socket
import os
import threading
import subprocess
import platform
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def prepare_udf_so(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
print(projPath)
if platform.system().lower() == 'windows':
self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
if (not tdDnodes.dnodes[0].remoteIP == ""):
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\")
self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so')
else:
self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
return
def create_udf_function(self):
# create scalar functions
tdSql.execute("create function udf1 as '%s' outputtype int bufSize 8;"%self.libudf1)
functions = tdSql.getResult("show functions")
function_nums = len(functions)
if function_nums == 1:
tdLog.info("create one udf functions success ")
else:
tdLog.exit("create udf functions fail")
return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1', 'topic2']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
queryString = "select ts, c1,udf1(c1),sin(udf1(c2)), log(udf1(c2)) from %s.%s where udf1(c1) == 88 or sin(udf1(c1)) > 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 1
topicList = topicNameList[1]
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.prepare_udf_so()
self.create_udf_function()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
...@@ -133,3 +133,4 @@ python3 ./test.py -f 7-tmq/tmqError.py ...@@ -133,3 +133,4 @@ python3 ./test.py -f 7-tmq/tmqError.py
python3 ./test.py -f 7-tmq/schema.py python3 ./test.py -f 7-tmq/schema.py
python3 ./test.py -f 7-tmq/stbFilter.py python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py -f 7-tmq/tmqCheckData.py python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqUdf.py
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
python3 .\test.py -f 0-others\taosShellNetChk.py python3 .\test.py -f 0-others\taosShellNetChk.py
python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\telemetry.py
python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py @REM python3 .\test.py -f 0-others\udfTest.py
python3 .\test.py -f 0-others\udf_create.py @REM python3 .\test.py -f 0-others\udf_create.py
@REM python3 .\test.py -f 0-others\udf_restart_taosd.py @REM python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py @REM python3 .\test.py -f 0-others\cachelast.py
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#define UP 3 #define UP 3
#define DOWN 4 #define DOWN 4
#define PSIZE shell.info.promptSize #define PSIZE shell.info.promptSize
#define SHELL_INPUT_MAX_COMMAND_SIZE 10000
typedef struct { typedef struct {
char *buffer; char *buffer;
...@@ -227,6 +228,7 @@ void shellPrintChar(char c, int32_t times) { ...@@ -227,6 +228,7 @@ void shellPrintChar(char c, int32_t times) {
} }
void shellPositionCursor(int32_t step, int32_t direction) { void shellPositionCursor(int32_t step, int32_t direction) {
#ifndef WINDOWS
if (step > 0) { if (step > 0) {
if (direction == LEFT) { if (direction == LEFT) {
fprintf(stdout, "\033[%dD", step); fprintf(stdout, "\033[%dD", step);
...@@ -239,6 +241,7 @@ void shellPositionCursor(int32_t step, int32_t direction) { ...@@ -239,6 +241,7 @@ void shellPositionCursor(int32_t step, int32_t direction) {
} }
fflush(stdout); fflush(stdout);
} }
#endif
} }
void shellUpdateBuffer(SShellCmd *cmd) { void shellUpdateBuffer(SShellCmd *cmd) {
...@@ -330,10 +333,14 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) { ...@@ -330,10 +333,14 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) {
int32_t command_x = ecmd_pos / ws_col; int32_t command_x = ecmd_pos / ws_col;
shellPositionCursor(cursor_y, LEFT); shellPositionCursor(cursor_y, LEFT);
shellPositionCursor(command_x - cursor_x, DOWN); shellPositionCursor(command_x - cursor_x, DOWN);
#ifndef WINDOWS
fprintf(stdout, "\033[2K"); fprintf(stdout, "\033[2K");
#endif
for (int32_t i = 0; i < command_x; i++) { for (int32_t i = 0; i < command_x; i++) {
shellPositionCursor(1, UP); shellPositionCursor(1, UP);
#ifndef WINDOWS
fprintf(stdout, "\033[2K"); fprintf(stdout, "\033[2K");
#endif
} }
fflush(stdout); fflush(stdout);
} }
...@@ -394,6 +401,41 @@ void shellShowOnScreen(SShellCmd *cmd) { ...@@ -394,6 +401,41 @@ void shellShowOnScreen(SShellCmd *cmd) {
fflush(stdout); fflush(stdout);
} }
char taosGetConsoleChar() {
#ifdef WINDOWS
static void *console = NULL;
if (console == NULL) {
console = GetStdHandle(STD_INPUT_HANDLE);
}
static TdWchar buf[SHELL_INPUT_MAX_COMMAND_SIZE];
static char mbStr[5];
static unsigned long bufLen = 0;
static uint16_t bufIndex = 0, mbStrIndex = 0, mbStrLen = 0;
if (bufLen == 0) {
ReadConsoleW(console, buf, SHELL_INPUT_MAX_COMMAND_SIZE, &bufLen, NULL);
bufIndex = 0;
}
if (mbStrLen == 0){
if (buf[bufIndex] == '\r') {
bufIndex++;
}
mbStrLen = WideCharToMultiByte(CP_UTF8, 0, &buf[bufIndex], 1, mbStr, sizeof(mbStr), NULL, NULL);
mbStrIndex = 0;
bufIndex++;
}
mbStrIndex++;
if (mbStrIndex == mbStrLen) {
mbStrLen = 0;
if (bufIndex == bufLen) {
bufLen = 0;
}
}
return mbStr[mbStrIndex-1];
#else
return (char)getchar(); // getchar() return an 'int32_t' value
#endif
}
int32_t shellReadCommand(char *command) { int32_t shellReadCommand(char *command) {
SShellHistory *pHistory = &shell.history; SShellHistory *pHistory = &shell.history;
SShellCmd cmd = {0}; SShellCmd cmd = {0};
...@@ -407,7 +449,7 @@ int32_t shellReadCommand(char *command) { ...@@ -407,7 +449,7 @@ int32_t shellReadCommand(char *command) {
// Read input. // Read input.
char c; char c;
while (1) { while (1) {
c = (char)getchar(); // getchar() return an 'int32_t' value c = taosGetConsoleChar();
if (c == EOF) { if (c == EOF) {
return c; return c;
...@@ -417,7 +459,7 @@ int32_t shellReadCommand(char *command) { ...@@ -417,7 +459,7 @@ int32_t shellReadCommand(char *command) {
int32_t count = shellCountPrefixOnes(c); int32_t count = shellCountPrefixOnes(c);
utf8_array[0] = c; utf8_array[0] = c;
for (int32_t k = 1; k < count; k++) { for (int32_t k = 1; k < count; k++) {
c = (char)getchar(); c = taosGetConsoleChar();
utf8_array[k] = c; utf8_array[k] = c;
} }
shellInsertChar(&cmd, utf8_array, count); shellInsertChar(&cmd, utf8_array, count);
...@@ -472,10 +514,10 @@ int32_t shellReadCommand(char *command) { ...@@ -472,10 +514,10 @@ int32_t shellReadCommand(char *command) {
break; break;
} }
} else if (c == '\033') { } else if (c == '\033') {
c = (char)getchar(); c = taosGetConsoleChar();
switch (c) { switch (c) {
case '[': case '[':
c = (char)getchar(); c = taosGetConsoleChar();
switch (c) { switch (c) {
case 'A': // Up arrow case 'A': // Up arrow
if (hist_counter != pHistory->hstart) { if (hist_counter != pHistory->hstart) {
...@@ -502,35 +544,35 @@ int32_t shellReadCommand(char *command) { ...@@ -502,35 +544,35 @@ int32_t shellReadCommand(char *command) {
shellMoveCursorLeft(&cmd); shellMoveCursorLeft(&cmd);
break; break;
case '1': case '1':
if ((c = (char)getchar()) == '~') { if ((c = taosGetConsoleChar()) == '~') {
// Home key // Home key
shellPositionCursorHome(&cmd); shellPositionCursorHome(&cmd);
} }
break; break;
case '2': case '2':
if ((c = (char)getchar()) == '~') { if ((c = taosGetConsoleChar()) == '~') {
// Insert key // Insert key
} }
break; break;
case '3': case '3':
if ((c = (char)getchar()) == '~') { if ((c = taosGetConsoleChar()) == '~') {
// Delete key // Delete key
shellDeleteChar(&cmd); shellDeleteChar(&cmd);
} }
break; break;
case '4': case '4':
if ((c = (char)getchar()) == '~') { if ((c = taosGetConsoleChar()) == '~') {
// End key // End key
shellPositionCursorEnd(&cmd); shellPositionCursorEnd(&cmd);
} }
break; break;
case '5': case '5':
if ((c = (char)getchar()) == '~') { if ((c = taosGetConsoleChar()) == '~') {
// Page up key // Page up key
} }
break; break;
case '6': case '6':
if ((c = (char)getchar()) == '~') { if ((c = taosGetConsoleChar()) == '~') {
// Page down key // Page down key
} }
break; break;
......
...@@ -393,15 +393,11 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) { ...@@ -393,15 +393,11 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) {
break; break;
} }
int w = 0; int w = 0;
#ifdef WINDOWS
w = bytes;
#else
if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){ if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){
w = bytes; w = bytes;
}else{ }else{
w = taosWcharWidth(wc); w = taosWcharWidth(wc);
} }
#endif
pos += bytes; pos += bytes;
if (w <= 0) { if (w <= 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册