提交 0cde7eef 编写于 作者: H hzcheng

Merge branch 'develop' into feature/2.0tsdb

...@@ -63,6 +63,7 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn) { ...@@ -63,6 +63,7 @@ int32_t tscInitRpc(const char *user, const char *secret, void** pDnodeConn) {
rpcInit.user = (char*)user; rpcInit.user = (char*)user;
rpcInit.idleTime = 2000; rpcInit.idleTime = 2000;
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = secretEncrypt; rpcInit.secret = secretEncrypt;
*pDnodeConn = rpcOpen(&rpcInit); *pDnodeConn = rpcOpen(&rpcInit);
......
...@@ -1144,23 +1144,21 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { ...@@ -1144,23 +1144,21 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
} }
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) { SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
SColumnFilterInfo* pFilter = NULL; if (numOfFilters == 0) {
if (numOfFilters > 0) {
pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
} else {
assert(src == NULL); assert(src == NULL);
return NULL; return NULL;
} }
SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters); memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) { for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) { if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1; size_t len = (size_t) pFilter[j].len + 1;
pFilter[j].pz = (int64_t) calloc(1, len);
char* pTmp = calloc(1, len);
pFilter[j].pz = (int64_t) pTmp;
memcpy((char*)pFilter[j].pz, (char*)src->pz, (size_t)len); memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
} }
} }
......
...@@ -52,7 +52,8 @@ int32_t dnodeInitServer() { ...@@ -52,7 +52,8 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mgmtProcessReqMsgFromDnode;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mgmtProcessReqMsgFromDnode;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mgmtProcessReqMsgFromDnode; dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mgmtProcessReqMsgFromDnode;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mgmtProcessReqMsgFromDnode;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort; rpcInit.localPort = tsDnodeDnodePort;
...@@ -163,3 +164,9 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { ...@@ -163,3 +164,9 @@ void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg); rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcIpSet ipSet = {0};
dnodeGetMnodeDnodeIpSet(&ipSet);
rpcSendRecv(tsDnodeClientRpc, &ipSet, rpcMsg, rpcRsp);
}
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "trpc.h" #include "trpc.h"
#include "tglobal.h" #include "tglobal.h"
#include "http.h" #include "http.h"
#include "mnode.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeInt.h" #include "dnodeInt.h"
#include "dnodeVRead.h" #include "dnodeVRead.h"
...@@ -138,7 +139,34 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { ...@@ -138,7 +139,34 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
} }
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return TSDB_CODE_SUCCESS; int code = mgmtRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_NOT_READY) return code;
SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg));
strcpy(pMsg->user, user);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SDMAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
dTrace("user:%s, send auth msg to mnode", user);
SRpcMsg rpcRsp = {0};
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnode, error:%s", user, tstrerror(rpcRsp.code));
} else {
dTrace("user:%s, auth msg received from mnode", user);
SDMAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
} }
SDnodeStatisInfo dnodeGetStatisInfo() { SDnodeStatisInfo dnodeGetStatisInfo() {
......
...@@ -52,6 +52,7 @@ int32_t dnodeGetDnodeId(); ...@@ -52,6 +52,7 @@ int32_t dnodeGetDnodeId();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,8 +26,10 @@ void mgmtCleanUpSystem(); ...@@ -26,8 +26,10 @@ void mgmtCleanUpSystem();
void mgmtStopSystem(); void mgmtStopSystem();
void sdbUpdateSync(); void sdbUpdateSync();
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg); int32_t mgmtRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg); void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -100,6 +100,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_TABLE, "config-table" ) ...@@ -100,6 +100,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_TABLE, "config-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_VNODE, "config-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_VNODE, "config-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
...@@ -737,6 +738,14 @@ typedef struct { ...@@ -737,6 +738,14 @@ typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
} SMDAlterStreamMsg; } SMDAlterStreamMsg;
typedef struct {
char user[TSDB_USER_LEN + 1];
char spi;
char encrypt;
char secret[TSDB_KEY_LEN + 1];
char ckey[TSDB_KEY_LEN + 1];
} SDMAuthMsg, SDMAuthRsp;
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -41,7 +41,6 @@ ...@@ -41,7 +41,6 @@
typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
//static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg);
static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg);
static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg);
...@@ -343,29 +342,6 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { ...@@ -343,29 +342,6 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
/*
static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
*spi = 1;
*encrypt = 0;
*ckey = 0;
if (!sdbIsMaster()) {
*secret = 0;
return TSDB_CODE_NOT_READY;
}
SUserObj *pUser = mgmtGetUser(user);
if (pUser == NULL) {
*secret = 0;
return TSDB_CODE_INVALID_USER;
} else {
memcpy(secret, pUser->pass, TSDB_KEY_LEN);
mgmtDecUserRef(pUser);
return TSDB_CODE_SUCCESS;
}
}
*/
static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SCMConnectMsg *pConnectMsg = pMsg->pCont; SCMConnectMsg *pConnectMsg = pMsg->pCont;
......
...@@ -37,6 +37,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void ...@@ -37,6 +37,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg); static void mgmtProcessCreateUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg); static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg); static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg);
static void mgmtProcessAuthMsg(SRpcMsg *rpcMsg);
static int32_t mgmtUserActionDestroy(SSdbOper *pOper) { static int32_t mgmtUserActionDestroy(SSdbOper *pOper) {
tfree(pOper->pObj); tfree(pOper->pObj);
...@@ -140,7 +141,8 @@ int32_t mgmtInitUsers() { ...@@ -140,7 +141,8 @@ int32_t mgmtInitUsers() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_USER, mgmtProcessDropUserMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_USER, mgmtProcessDropUserMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_USER, mgmtGetUserMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_USER, mgmtRetrieveUsers);
dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_AUTH, mgmtProcessAuthMsg);
mTrace("table:%s, hash is created", tableDesc.tableName); mTrace("table:%s, hash is created", tableDesc.tableName);
return 0; return 0;
} }
...@@ -529,3 +531,40 @@ void mgmtDropAllUsers(SAcctObj *pAcct) { ...@@ -529,3 +531,40 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
mTrace("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers); mTrace("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers);
} }
int32_t mgmtRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (!sdbIsMaster()) {
*secret = 0;
mTrace("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_NOT_READY));
return TSDB_CODE_NOT_READY;
}
SUserObj *pUser = mgmtGetUser(user);
if (pUser == NULL) {
*secret = 0;
mError("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_INVALID_USER));
return TSDB_CODE_INVALID_USER;
} else {
*spi = 1;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pUser->pass, TSDB_KEY_LEN);
mgmtDecUserRef(pUser);
mTrace("user:%s, auth info is returned", user);
return TSDB_CODE_SUCCESS;
}
}
static void mgmtProcessAuthMsg(SRpcMsg *rpcMsg) {
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
SDMAuthMsg *pAuthMsg = rpcMsg->pCont;
SDMAuthRsp *pAuthRsp = rpcMallocCont(sizeof(SDMAuthRsp));
rpcRsp.code = mgmtRetriveAuth(pAuthMsg->user, &pAuthRsp->spi, &pAuthRsp->encrypt, pAuthRsp->secret, pAuthRsp->ckey);
rpcRsp.pCont = pAuthRsp;
rpcRsp.contLen = sizeof(SDMAuthRsp);
rpcSendResponse(&rpcRsp);
}
...@@ -50,11 +50,11 @@ typedef struct tQueryInfo { ...@@ -50,11 +50,11 @@ typedef struct tQueryInfo {
SSchema sch; // schema of tags SSchema sch; // schema of tags
char* q; char* q;
__compar_fn_t compare; // filter function __compar_fn_t compare; // filter function
void* param; // STSchema, void* param; // STSchema
} tQueryInfo; } tQueryInfo;
typedef struct SExprTraverseSupp { typedef struct SExprTraverseSupp {
__result_filter_fn_t fp; __result_filter_fn_t nodeFilterFn;
__do_filter_suppl_fn_t setupInfoFn; __do_filter_suppl_fn_t setupInfoFn;
void * pExtInfo; void * pExtInfo;
} SExprTraverseSupp; } SExprTraverseSupp;
......
...@@ -760,7 +760,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p ...@@ -760,7 +760,7 @@ static bool filterItem(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *p
assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE); assert(pLeft->nodeType == TSQL_NODE_COL && pRight->nodeType == TSQL_NODE_VALUE);
param->setupInfoFn(pExpr, param->pExtInfo); param->setupInfoFn(pExpr, param->pExtInfo);
return param->fp(pItem, pExpr->_node.info); return param->nodeFilterFn(pItem, pExpr->_node.info);
} }
/** /**
...@@ -801,33 +801,33 @@ static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSki ...@@ -801,33 +801,33 @@ static void tSQLBinaryTraverseOnSkipList(tExprNode *pExpr, SArray *pResult, SSki
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
} }
static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* result) { static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArray* res, __result_filter_fn_t filterFp) {
SSkipListIterator* iter = tSkipListCreateIter(pSkipList); SSkipListIterator* iter = tSkipListCreateIter(pSkipList);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
bool addToResult = false; bool addToResult = false;
SSkipListNode *pNode = tSkipListIterGet(iter); SSkipListNode *pNode = tSkipListIterGet(iter);
char* pTable = SL_GET_NODE_DATA(pNode); char * pData = SL_GET_NODE_DATA(pNode);
//todo refactor: // todo refactor:
tstr* name = ((STableIndexElem*) pTable)->pTable->name; tstr *name = ((STableIndexElem *)pData)->pTable->name;
// char* name = NULL; // char* name = NULL;
// tsdbGetTableName(tsdb, pTable, &name); // tsdbGetTableName(pQueryInfo->, pTable, &name);
// todo speed up by using hash // todo speed up by using hash
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) { if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (pQueryInfo->optr == TSDB_RELATION_IN) { if (pQueryInfo->optr == TSDB_RELATION_IN) {
addToResult = pQueryInfo->compare(name, pQueryInfo->q); addToResult = pQueryInfo->compare(name, pQueryInfo->q);
} else if(pQueryInfo->optr == TSDB_RELATION_LIKE) { } else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
addToResult = !pQueryInfo->compare(name, pQueryInfo->q); addToResult = !pQueryInfo->compare(name, pQueryInfo->q);
} }
} else { } else {
// TODO: other columns addToResult = filterFp(pNode, pQueryInfo);
} }
if (addToResult) { if (addToResult) {
taosArrayPush(result, pTable); taosArrayPush(res, pData);
} }
} }
...@@ -851,7 +851,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -851,7 +851,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
param->setupInfoFn(pExpr, param->pExtInfo); param->setupInfoFn(pExpr, param->pExtInfo);
if (pSkipList == NULL) { if (pSkipList == NULL) {
tArrayTraverse(pExpr, param->fp, result); tArrayTraverse(pExpr, param->nodeFilterFn, result);
return; return;
} }
...@@ -859,7 +859,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S ...@@ -859,7 +859,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) { if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) {
tQueryIndexColumn(pSkipList, pQueryInfo, result); tQueryIndexColumn(pSkipList, pQueryInfo, result);
} else { } else {
tQueryIndexlessColumn(pSkipList, pQueryInfo, result); tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
} }
return; return;
......
...@@ -820,7 +820,7 @@ void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBI ...@@ -820,7 +820,7 @@ void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBI
pInfo->pDCLInfo->dbOpt = *pDB; pInfo->pDCLInfo->dbOpt = *pDB;
pInfo->pDCLInfo->dbOpt.dbname = *pToken; pInfo->pDCLInfo->dbOpt.dbname = *pToken;
pInfo->pDCLInfo->dbOpt.ignoreExists = (pIgExists->z != NULL); pInfo->pDCLInfo->dbOpt.ignoreExists = pIgExists->n; // sql.y has: ifnotexists(X) ::= IF NOT EXISTS. {X.n = 1;}
} }
void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo) { void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo) {
......
...@@ -2436,8 +2436,16 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2436,8 +2436,16 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
// set the pCtx output buffer position // set the pCtx output buffer position
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes; pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data + pRec->rows * bytes;
int32_t functionId = pQuery->pSelectExpr[i].base.functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
}
} }
qTrace("QInfo: %p realloc output buffer, new size: %d rows, old:%d, remain:%d", GET_QINFO_ADDR(pRuntimeEnv),
newSize, pRec->capacity, newSize - pRec->rows);
pRec->capacity = newSize; pRec->capacity = newSize;
} }
} }
...@@ -2641,26 +2649,21 @@ static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t ...@@ -2641,26 +2649,21 @@ static UNUSED_FUNC void printBinaryData(int32_t functionId, char *data, int32_t
} }
} }
void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOfRows) { void UNUSED_FUNC displayInterResult(SData **pdata, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfRows) {
#if 0 SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfCols = pQuery->numOfOutput; int32_t numOfCols = pQuery->numOfOutput;
printf("super table query intermediate result, total:%d\n", numOfRows); printf("super table query intermediate result, total:%d\n", numOfRows);
SQInfo * pQInfo = (SQInfo *)(GET_QINFO_ADDR(pQuery));
SMeterObj *pMeterObj = pQInfo->pObj;
for (int32_t j = 0; j < numOfRows; ++j) { for (int32_t j = 0; j < numOfRows; ++j) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
switch (pQuery->pSelectExpr[i].type) { switch (pQuery->pSelectExpr[i].type) {
case TSDB_DATA_TYPE_BINARY: { case TSDB_DATA_TYPE_BINARY: {
int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex; // int32_t colIndex = pQuery->pSelectExpr[i].base.colInfo.colIndex;
int32_t type = 0; int32_t type = pQuery->pSelectExpr[i].type;
// } else {
if (TSDB_COL_IS_TAG(pQuery->pSelectExpr[i].base.colInfo.flag)) { // type = pMeterObj->schema[colIndex].type;
type = pQuery->pSelectExpr[i].type; // }
} else {
type = pMeterObj->schema[colIndex].type;
}
printBinaryData(pQuery->pSelectExpr[i].base.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j, printBinaryData(pQuery->pSelectExpr[i].base.functionId, pdata[i]->data + pQuery->pSelectExpr[i].bytes * j,
type); type);
break; break;
...@@ -2682,7 +2685,6 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf ...@@ -2682,7 +2685,6 @@ void UNUSED_FUNC displayInterResult(SData **pdata, SQuery *pQuery, int32_t numOf
} }
printf("\n"); printf("\n");
} }
#endif
} }
typedef struct SCompSupporter { typedef struct SCompSupporter {
...@@ -2950,7 +2952,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2950,7 +2952,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
int64_t endt = taosGetTimestampMs(); int64_t endt = taosGetTimestampMs();
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->num); displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif #endif
qTrace("QInfo:%p result merge completed, elapsed time:%" PRId64 " ms", GET_QINFO_ADDR(pQuery), endt - startt); qTrace("QInfo:%p result merge completed, elapsed time:%" PRId64 " ms", GET_QINFO_ADDR(pQuery), endt - startt);
...@@ -3588,7 +3590,8 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { ...@@ -3588,7 +3590,8 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
if (pTableQueryInfo->queryRangeSet) { if (pTableQueryInfo->queryRangeSet) {
pTableQueryInfo->lastKey = key; pTableQueryInfo->lastKey = key;
} else { } else {
pQuery->window.skey = key; // pQuery->window.skey = key;
pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey}; STimeWindow win = {.skey = key, .ekey = pQuery->window.ekey};
// for too small query range, no data in this interval. // for too small query range, no data in this interval.
...@@ -3608,7 +3611,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { ...@@ -3608,7 +3611,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
SWindowResInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo; SWindowResInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo;
getAlignQueryTimeWindow(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &w); getAlignQueryTimeWindow(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &w);
pWindowResInfo->startTime = pQuery->window.skey; // windowSKey may be 0 in case of 1970 timestamp pWindowResInfo->startTime = pTableQueryInfo->win.skey; // windowSKey may be 0 in case of 1970 timestamp
if (pWindowResInfo->prevSKey == 0) { if (pWindowResInfo->prevSKey == 0) {
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
...@@ -3620,8 +3623,8 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) { ...@@ -3620,8 +3623,8 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
} }
pTableQueryInfo->queryRangeSet = 1; pTableQueryInfo->queryRangeSet = 1;
pTableQueryInfo->lastKey = pQuery->window.skey; pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
pTableQueryInfo->win.skey = pQuery->window.skey; pTableQueryInfo->win.skey = pTableQueryInfo->win.skey;
} }
} }
...@@ -3726,7 +3729,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde ...@@ -3726,7 +3729,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
qTrace("QInfo:%p copy data to query buf completed", pQInfo); qTrace("QInfo:%p copy data to query buf completed", pQInfo);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, numOfResult); displayInterResult(pQuery->sdata, pRuntimeEnv, numOfResult);
#endif #endif
return numOfResult; return numOfResult;
} }
...@@ -4766,7 +4769,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4766,7 +4769,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
copyResToQueryResultBuf(pQInfo, pQuery); copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->num); displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif #endif
} else { } else {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
...@@ -4819,7 +4822,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { ...@@ -4819,7 +4822,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
copyResToQueryResultBuf(pQInfo, pQuery); copyResToQueryResultBuf(pQInfo, pQuery);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->num); displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
#endif #endif
} }
} else { // not a interval query } else { // not a interval query
...@@ -5276,26 +5279,26 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5276,26 +5279,26 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
} }
for (int32_t f = 0; f < numOfFilters; ++f) { for (int32_t f = 0; f < numOfFilters; ++f) {
SColumnFilterInfo *pFilterInfo = (SColumnFilterInfo *)pMsg; SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pMsg;
SColumnFilterInfo *pDestFilterInfo = &pColInfo->filters[f];
SColumnFilterInfo *pColFilter = &pColInfo->filters[f];
pDestFilterInfo->filterstr = htons(pFilterInfo->filterstr); pColFilter->filterstr = htons(pFilterMsg->filterstr);
pMsg += sizeof(SColumnFilterInfo); pMsg += sizeof(SColumnFilterInfo);
if (pDestFilterInfo->filterstr) { if (pColFilter->filterstr) {
pDestFilterInfo->len = htobe64(pFilterInfo->len); pColFilter->len = htobe64(pFilterMsg->len);
pDestFilterInfo->pz = (int64_t) calloc(1, pDestFilterInfo->len); pColFilter->pz = (int64_t) calloc(1, pColFilter->len);
memcpy((void *)pDestFilterInfo->pz, pMsg, pDestFilterInfo->len); memcpy((void *)pColFilter->pz, pMsg, pColFilter->len);
pMsg += (pDestFilterInfo->len); pMsg += (pColFilter->len + 1);
} else { } else {
pDestFilterInfo->lowerBndi = htobe64(pFilterInfo->lowerBndi); pColFilter->lowerBndi = htobe64(pFilterMsg->lowerBndi);
pDestFilterInfo->upperBndi = htobe64(pFilterInfo->upperBndi); pColFilter->upperBndi = htobe64(pFilterMsg->upperBndi);
} }
pDestFilterInfo->lowerRelOptr = htons(pFilterInfo->lowerRelOptr); pColFilter->lowerRelOptr = htons(pFilterMsg->lowerRelOptr);
pDestFilterInfo->upperRelOptr = htons(pFilterInfo->upperRelOptr); pColFilter->upperRelOptr = htons(pFilterMsg->upperRelOptr);
} }
} }
...@@ -5692,9 +5695,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5692,9 +5695,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
for (int16_t i = 0; i < numOfCols; ++i) { for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->colList[i] = pQueryMsg->colList[i]; pQuery->colList[i] = pQueryMsg->colList[i];
pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters);
SColumnInfo *pColInfo = &pQuery->colList[i];
pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters);
} }
pQuery->tagColList = pTagCols; pQuery->tagColList = pTagCols;
......
...@@ -183,33 +183,33 @@ bool equal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { ...@@ -183,33 +183,33 @@ bool equal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
bool equal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) { bool equal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
// query condition string is greater than the max length of string, not qualified data // query condition string is greater than the max length of string, not qualified data
if (pFilter->filterInfo.len > pFilter->bytes) { if (pFilter->filterInfo.len != varDataLen(minval)) {
return false; return false;
} }
return strncmp((char *)pFilter->filterInfo.pz, minval, pFilter->bytes) == 0; return strncmp((char *)pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval)) == 0;
} }
bool equal_nchar(SColumnFilterElem *pFilter, char *minval, char *maxval) { bool equal_nchar(SColumnFilterElem *pFilter, char *minval, char *maxval) {
// query condition string is greater than the max length of string, not qualified data // query condition string is greater than the max length of string, not qualified data
if (pFilter->filterInfo.len > pFilter->bytes) { if (pFilter->filterInfo.len != varDataLen(minval)) {
return false; return false;
} }
return wcsncmp((wchar_t *)pFilter->filterInfo.pz, (wchar_t*) minval, pFilter->bytes/TSDB_NCHAR_SIZE) == 0; return wcsncmp((wchar_t *)pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval)/TSDB_NCHAR_SIZE) == 0;
} }
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
bool like_str(SColumnFilterElem *pFilter, char *minval, char *maxval) { bool like_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
return patternMatch((char *)pFilter->filterInfo.pz, minval, pFilter->bytes, &info) == TSDB_PATTERN_MATCH; return patternMatch((char *)pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval), &info) == TSDB_PATTERN_MATCH;
} }
bool like_nchar(SColumnFilterElem* pFilter, char* minval, char *maxval) { bool like_nchar(SColumnFilterElem* pFilter, char* minval, char *maxval) {
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
return WCSPatternMatch((wchar_t*) pFilter->filterInfo.pz, (wchar_t*) minval, pFilter->bytes/TSDB_NCHAR_SIZE, &info) == TSDB_PATTERN_MATCH; return WCSPatternMatch((wchar_t*) pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval)/TSDB_NCHAR_SIZE, &info) == TSDB_PATTERN_MATCH;
} }
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
...@@ -270,11 +270,11 @@ bool nequal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) { ...@@ -270,11 +270,11 @@ bool nequal_dd(SColumnFilterElem *pFilter, char *minval, char *maxval) {
} }
bool nequal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) { bool nequal_str(SColumnFilterElem *pFilter, char *minval, char *maxval) {
if (pFilter->filterInfo.len > pFilter->bytes) { if (pFilter->filterInfo.len != varDataLen(minval)) {
return true; return true;
} }
return strncmp((char *)pFilter->filterInfo.pz, minval, pFilter->bytes) != 0; return strncmp((char *)pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval)) != 0;
} }
bool nequal_nchar(SColumnFilterElem *pFilter, char* minval, char *maxval) { bool nequal_nchar(SColumnFilterElem *pFilter, char* minval, char *maxval) {
...@@ -282,7 +282,7 @@ bool nequal_nchar(SColumnFilterElem *pFilter, char* minval, char *maxval) { ...@@ -282,7 +282,7 @@ bool nequal_nchar(SColumnFilterElem *pFilter, char* minval, char *maxval) {
return true; return true;
} }
return wcsncmp((wchar_t *)pFilter->filterInfo.pz, (wchar_t*)minval, pFilter->bytes/TSDB_NCHAR_SIZE) != 0; return wcsncmp((wchar_t *)pFilter->filterInfo.pz, varDataVal(minval), varDataLen(minval)/TSDB_NCHAR_SIZE) != 0;
} }
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
......
...@@ -76,4 +76,8 @@ TEST(testCase, patternMatchTest) { ...@@ -76,4 +76,8 @@ TEST(testCase, patternMatchTest) {
str = "carzero"; str = "carzero";
ret = patternMatch("%o", str, strlen(str), &info); ret = patternMatch("%o", str, strlen(str), &info);
EXPECT_EQ(ret, TSDB_PATTERN_MATCH); EXPECT_EQ(ret, TSDB_PATTERN_MATCH);
str = "19";
ret = patternMatch("%9", str, 2, &info);
EXPECT_EQ(ret, TSDB_PATTERN_MATCH);
} }
...@@ -234,7 +234,6 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { ...@@ -234,7 +234,6 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
} }
} }
// todo refactor table name definition
int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) { int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) {
STsdbMeta* pMeta = tsdbGetMeta(repo); STsdbMeta* pMeta = tsdbGetMeta(repo);
STable* pTable = tsdbGetTableByUid(pMeta, id->uid); STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
...@@ -242,6 +241,7 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t ...@@ -242,6 +241,7 @@ int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable); STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
STColumn* pCol = NULL; STColumn* pCol = NULL;
// todo binary search
for(int32_t col = 0; col < schemaNCols(pSchema); ++col) { for(int32_t col = 0; col < schemaNCols(pSchema); ++col) {
STColumn* p = schemaColAt(pSchema, col); STColumn* p = schemaColAt(pSchema, col);
if (p->colId == colId) { if (p->colId == colId) {
......
...@@ -1432,7 +1432,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -1432,7 +1432,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
return pTableGroup; return pTableGroup;
} }
bool tSkipListNodeFilterCallback(const void* pNode, void* param) { bool indexedNodeFilterFp(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*) param; tQueryInfo* pInfo = (tQueryInfo*) param;
STableIndexElem* elem = (STableIndexElem*)(SL_GET_NODE_DATA((SSkipListNode*)pNode)); STableIndexElem* elem = (STableIndexElem*)(SL_GET_NODE_DATA((SSkipListNode*)pNode));
...@@ -1497,7 +1497,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { ...@@ -1497,7 +1497,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) { static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) {
// query according to the expression tree // query according to the expression tree
SExprTraverseSupp supp = { SExprTraverseSupp supp = {
.fp = (__result_filter_fn_t) tSkipListNodeFilterCallback, .nodeFilterFn = (__result_filter_fn_t) indexedNodeFilterFp,
.setupInfoFn = filterPrepare, .setupInfoFn = filterPrepare,
.pExtInfo = pSTable->tagSchema, .pExtInfo = pSTable->tagSchema,
}; };
......
...@@ -51,12 +51,21 @@ int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) { ...@@ -51,12 +51,21 @@ int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) {
} }
} }
int32_t compareFloatVal(const void *pLeft, const void *pRight) {
float ret = GET_FLOAT_VAL(pLeft) - GET_FLOAT_VAL(pRight);
if (fabs(ret) < FLT_EPSILON) {
return 0;
} else {
return ret > 0? 1 : -1;
}
}
int32_t compareDoubleVal(const void *pLeft, const void *pRight) { int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
double ret = GET_DOUBLE_VAL(pLeft) - GET_DOUBLE_VAL(pRight); double ret = GET_DOUBLE_VAL(pLeft) - GET_DOUBLE_VAL(pRight);
if (fabs(ret) < FLT_EPSILON) { if (fabs(ret) < FLT_EPSILON) {
return 0; return 0;
} else { } else {
return ret > 0 ? 1 : -1; return ret > 0? 1 : -1;
} }
} }
...@@ -127,7 +136,7 @@ int patternMatch(const char *patterStr, const char *str, size_t size, const SPat ...@@ -127,7 +136,7 @@ int patternMatch(const char *patterStr, const char *str, size_t size, const SPat
size_t n = strcspn(str, next); size_t n = strcspn(str, next);
str += n; str += n;
if (str[0] == 0 || (n >= size - 1)) { if (str[0] == 0 || (n >= size)) {
break; break;
} }
...@@ -175,10 +184,10 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c ...@@ -175,10 +184,10 @@ int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, c
wchar_t accept[3] = {towupper(c), towlower(c), 0}; wchar_t accept[3] = {towupper(c), towlower(c), 0};
while (1) { while (1) {
size_t n = wcsspn(str, accept); size_t n = wcscspn(str, accept);
str += n; str += n;
if (str[0] == 0 || (n >= size - 1)) { if (str[0] == 0 || (n >= size)) {
break; break;
} }
...@@ -257,7 +266,7 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) { ...@@ -257,7 +266,7 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
comparFn = compareDoubleVal; break; comparFn = compareFloatVal; break;
} }
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
...@@ -313,6 +322,8 @@ __compar_fn_t getKeyComparFunc(int32_t keyType) { ...@@ -313,6 +322,8 @@ __compar_fn_t getKeyComparFunc(int32_t keyType) {
comparFn = compareInt64Val; comparFn = compareInt64Val;
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
comparFn = compareFloatVal;
break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
comparFn = compareDoubleVal; comparFn = compareDoubleVal;
break; break;
......
...@@ -406,7 +406,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -406,7 +406,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
static void vnodeNotifyFileSynced(void *ahandle) { static void vnodeNotifyFileSynced(void *ahandle) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
vTrace("pVnode:%p vgId:%d, data file is synced", pVnode, pVnode->vgId); vTrace("vgId:%d, data file is synced", pVnode->vgId);
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
......
...@@ -65,7 +65,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -65,7 +65,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
vTrace("vgId:%d QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else { } else {
pQInfo = pCont; pQInfo = pCont;
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
...@@ -83,7 +83,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -83,7 +83,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
vTrace("vgId:%d QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
pRet->code = qRetrieveQueryResultInfo(pQInfo); pRet->code = qRetrieveQueryResultInfo(pQInfo);
if (pRet->code != TSDB_CODE_SUCCESS) { if (pRet->code != TSDB_CODE_SUCCESS) {
...@@ -104,6 +104,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -104,6 +104,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
} }
} }
vTrace("vgId:%d QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo);
return code; return code;
} }
...@@ -51,10 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -51,10 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status != TAOS_VN_STATUS_READY && qtype == TAOS_QTYPE_RPC) if (pHead->version == 0) { // from client or CQ
return TSDB_CODE_NOT_ACTIVE_VNODE;
if (pHead->version == 0) { // from client
if (pVnode->status != TAOS_VN_STATUS_READY) if (pVnode->status != TAOS_VN_STATUS_READY)
return TSDB_CODE_NOT_ACTIVE_VNODE; return TSDB_CODE_NOT_ACTIVE_VNODE;
...@@ -64,12 +61,10 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -64,12 +61,10 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
// assign version // assign version
pVnode->version++; pVnode->version++;
pHead->version = pVnode->version; pHead->version = pVnode->version;
} else { } else { // from wal or forward
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
if (pHead->version <= pVnode->version) return 0; if (pHead->version <= pVnode->version) return 0;
} }
// more status and role checking here
pVnode->version = pHead->version; pVnode->version = pHead->version;
...@@ -77,9 +72,13 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -77,9 +72,13 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) return code; if (code < 0) return code;
int32_t syncCode = syncForwardToPeer(pVnode->sync, pHead, item); // forward to peers if data is from RPC or CQ
int32_t syncCode = 0;
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_CQ)
syncCode = syncForwardToPeer(pVnode->sync, pHead, item);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
if (code < 0) return code; if (code < 0) return code;
......
...@@ -81,7 +81,7 @@ sql_error select diff(c8) from $tb ...@@ -81,7 +81,7 @@ sql_error select diff(c8) from $tb
sql_error select diff(c9) from $tb sql_error select diff(c9) from $tb
sql_error select diff(ts) from $tb sql_error select diff(ts) from $tb
sql_error select diff(c1), diff(c2) from $tb sql_error select diff(c1), diff(c2) from $tb
sql_error select 2+diff(c1) from $tb #sql_error select 2+diff(c1) from $tb
sql_error select diff(c1+2) from $tb sql_error select diff(c1+2) from $tb
sql_error select diff(c1) from $tb where ts > 0 and ts < now + 100m interval(10m) sql_error select diff(c1) from $tb where ts > 0 and ts < now + 100m interval(10m)
sql_error select diff(c1) from $mt sql_error select diff(c1) from $mt
......
...@@ -38,8 +38,16 @@ sleep 2000 ...@@ -38,8 +38,16 @@ sleep 2000
run general/parser/lastrow.sim run general/parser/lastrow.sim
sleep 2000 sleep 2000
run general/parser/nchar.sim run general/parser/nchar.sim
sleep 2000 sleep 2000
run general/parser/null_char.sim run general/parser/null_char.sim
sleep 2000
run general/parser/single_row_in_tb.sim
sleep 2000
run general/parser/select_from_cache_disk.sim
sleep 2000
run general/parser/limit.sim
sleep 2000 sleep 2000
run general/parser/fill.sim run general/parser/fill.sim
sleep 2000 sleep 2000
...@@ -49,8 +57,6 @@ run general/parser/tags_dynamically_specifiy.sim ...@@ -49,8 +57,6 @@ run general/parser/tags_dynamically_specifiy.sim
sleep 2000 sleep 2000
run general/parser/interp.sim run general/parser/interp.sim
sleep 2000 sleep 2000
run general/parser/limit.sim
sleep 2000
run general/parser/limit1.sim run general/parser/limit1.sim
sleep 2000 sleep 2000
run general/parser/limit1_tblocks100.sim run general/parser/limit1_tblocks100.sim
...@@ -63,12 +69,8 @@ run general/parser/selectResNum.sim ...@@ -63,12 +69,8 @@ run general/parser/selectResNum.sim
sleep 2000 sleep 2000
run general/parser/select_across_vnodes.sim run general/parser/select_across_vnodes.sim
sleep 2000 sleep 2000
run general/parser/select_from_cache_disk.sim
sleep 2000
run general/parser/set_tag_vals.sim run general/parser/set_tag_vals.sim
sleep 2000 sleep 2000
run general/parser/single_row_in_tb.sim
sleep 2000
run general/parser/slimit.sim run general/parser/slimit.sim
sleep 2000 sleep 2000
run general/parser/slimit1.sim run general/parser/slimit1.sim
......
...@@ -134,7 +134,8 @@ if $data01 != 1 then ...@@ -134,7 +134,8 @@ if $data01 != 1 then
return -1 return -1
endi endi
sql select * from $mt where tgcol = 0 sql select * from $mt where tgcol = 0
if $rows != 0 then if $rows != 0 then
print expect 0, actual: $rows
return -1 return -1
endi endi
...@@ -184,7 +185,8 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol bool, tgcol2 bool) ...@@ -184,7 +185,8 @@ sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol bool, tgcol2 bool)
sql create table $tb using $mt tags( 1, 2 ) sql create table $tb using $mt tags( 1, 2 )
sql insert into $tb values(now, 1) sql insert into $tb values(now, 1)
sql select * from $mt where tgcol2 = 2 sql select * from $mt where tgcol2 = 2
if $rows != 1 then if $rows != 1 then
print expect 1, actual: $rows
return -1 return -1
endi endi
if $data01 != 1 then if $data01 != 1 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册