提交 fbb5debc 编写于 作者: D dapan1121

support udf

上级 03a6ba65
...@@ -275,13 +275,24 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) { ...@@ -275,13 +275,24 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) {
int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char *msg1 = "function name is too long";
SSqlCmd *pCmd = &pSql->cmd;
switch (pInfo->type) { switch (pInfo->type) {
case TSDB_SQL_CREATE_FUNCTION: case TSDB_SQL_CREATE_FUNCTION: {
SCreateFuncInfo *createInfo = &pInfo->pMiscInfo->funcOpt; SCreateFuncInfo *createInfo = &pInfo->pMiscInfo->funcOpt;
SCreateFuncMsg *pMsg = (SCreateFuncMsg *)pSql->cmd.payload; SCreateFuncMsg *pMsg = (SCreateFuncMsg *)pSql->cmd.payload;
int32_t len = 0; uint32_t len = 0;
void *buf = NULL; void *buf = NULL;
strdequote(createInfo->name.z);
if (strlen(createInfo->name.z) >= TSDB_FUNC_NAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
strcpy(pMsg->name, createInfo->name.z);
createInfo->path.z[createInfo->path.n] = 0; createInfo->path.z[createInfo->path.n] = 0;
strdequote(createInfo->path.z); strdequote(createInfo->path.z);
...@@ -302,11 +313,25 @@ int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -302,11 +313,25 @@ int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} }
pMsg->codeLen = htonl(len); pMsg->codeLen = htonl(len);
memcpy(pMsg->code, *buf, len); memcpy(pMsg->code, buf, len);
break; break;
case TSDB_SQL_DROP_FUNCTION: }
case TSDB_SQL_DROP_FUNCTION: {
SStrToken* t0 = taosArrayGet(pInfo->pMiscInfo->a, 0);
SDropFuncMsg *pMsg = (SDropFuncMsg *)pSql->cmd.payload;
strdequote(t0->z);
if (strlen(t0->z) >= TSDB_FUNC_NAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
strcpy(pMsg->name, t0->z);
break;
}
default: default:
return TSDB_CODE_TSC_APP_ERROR; return TSDB_CODE_TSC_APP_ERROR;
} }
...@@ -428,8 +453,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -428,8 +453,9 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_CREATE_FUNCTION: case TSDB_SQL_CREATE_FUNCTION:
case TSDB_SQL_DROP_FUNCTION: { case TSDB_SQL_DROP_FUNCTION: {
if (handleCreateFunc(pSql, pInfo) != TSDB_CODE_SUCCESS) { code = handleCreateFunc(pSql, pInfo);
return TSDB_CODE_TSC_INVALID_SQL; if (code != TSDB_CODE_SUCCESS) {
return code;
} }
break; break;
......
...@@ -522,7 +522,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -522,7 +522,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qhandle);
} else { } else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups); assert(vgIndex >= 0 && vgIndex < numOfVgroups);
...@@ -530,12 +530,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -530,12 +530,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qhandle);
} }
} else { } else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId); tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qhandle:%" PRIX64, pSql, pTableMeta->vgId, pSql->res.qhandle);
} }
pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
...@@ -1068,6 +1068,18 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1068,6 +1068,18 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscBuildCreateFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SCreateFuncMsg *pCreateFuncMsg = (SCreateFuncMsg *)pCmd->payload;
pCmd->msgType = TSDB_MSG_TYPE_CM_CREATE_FUNCTION;
pCmd->payloadLen = sizeof(SCreateFuncMsg) + htonl(pCreateFuncMsg->codeLen);
return TSDB_CODE_SUCCESS;
}
int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateDnodeMsg); pCmd->payloadLen = sizeof(SCreateDnodeMsg);
...@@ -1192,6 +1204,17 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1192,6 +1204,17 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscBuildDropFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_FUNCTION;
pCmd->payloadLen = sizeof(SDropFuncMsg);
return TSDB_CODE_SUCCESS;
}
int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCMDropTableMsg); pCmd->payloadLen = sizeof(SCMDropTableMsg);
...@@ -2533,6 +2556,7 @@ void tscInitMsgsFp() { ...@@ -2533,6 +2556,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;
tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg; tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg; tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
...@@ -2541,6 +2565,7 @@ void tscInitMsgsFp() { ...@@ -2541,6 +2565,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg; tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg; tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg; tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg; tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg; tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg; tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
......
...@@ -48,8 +48,10 @@ int32_t dnodeInitShell() { ...@@ -48,8 +48,10 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue;
......
...@@ -28,7 +28,7 @@ typedef void* qinfo_t; ...@@ -28,7 +28,7 @@ typedef void* qinfo_t;
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo); int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t *qId);
/** /**
...@@ -88,9 +88,10 @@ void* qOpenQueryMgmt(int32_t vgId); ...@@ -88,9 +88,10 @@ void* qOpenQueryMgmt(int32_t vgId);
void qQueryMgmtNotifyClosed(void* pExecutor); void qQueryMgmtNotifyClosed(void* pExecutor);
void qQueryMgmtReOpen(void *pExecutor); void qQueryMgmtReOpen(void *pExecutor);
void qCleanupQueryMgmt(void* pExecutor); void qCleanupQueryMgmt(void* pExecutor);
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -183,6 +183,7 @@ do { \ ...@@ -183,6 +183,7 @@ do { \
#define TSDB_NODE_NAME_LEN 64 #define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33 #define TSDB_DB_NAME_LEN 33
#define TSDB_FUNC_NAME_LEN 128
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
......
...@@ -74,7 +74,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" ) ...@@ -74,7 +74,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" )
...@@ -569,10 +571,16 @@ typedef struct { ...@@ -569,10 +571,16 @@ typedef struct {
} SCreateDbMsg, SAlterDbMsg; } SCreateDbMsg, SAlterDbMsg;
typedef struct { typedef struct {
char name[TSDB_FUNC_NAME_LEN];
int32_t codeLen; int32_t codeLen;
char code[]; char code[];
} SCreateFuncMsg; } SCreateFuncMsg;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
} SDropFuncMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists; uint8_t ignoreNotExists;
......
...@@ -63,172 +63,175 @@ ...@@ -63,172 +63,175 @@
#define TK_SHOW 44 #define TK_SHOW 44
#define TK_DATABASES 45 #define TK_DATABASES 45
#define TK_TOPICS 46 #define TK_TOPICS 46
#define TK_MNODES 47 #define TK_FUNCTIONS 47
#define TK_DNODES 48 #define TK_MNODES 48
#define TK_ACCOUNTS 49 #define TK_DNODES 49
#define TK_USERS 50 #define TK_ACCOUNTS 50
#define TK_MODULES 51 #define TK_USERS 51
#define TK_QUERIES 52 #define TK_MODULES 52
#define TK_CONNECTIONS 53 #define TK_QUERIES 53
#define TK_STREAMS 54 #define TK_CONNECTIONS 54
#define TK_VARIABLES 55 #define TK_STREAMS 55
#define TK_SCORES 56 #define TK_VARIABLES 56
#define TK_GRANTS 57 #define TK_SCORES 57
#define TK_VNODES 58 #define TK_GRANTS 58
#define TK_IPTOKEN 59 #define TK_VNODES 59
#define TK_DOT 60 #define TK_IPTOKEN 60
#define TK_CREATE 61 #define TK_DOT 61
#define TK_TABLE 62 #define TK_CREATE 62
#define TK_DATABASE 63 #define TK_TABLE 63
#define TK_TABLES 64 #define TK_DATABASE 64
#define TK_STABLES 65 #define TK_TABLES 65
#define TK_VGROUPS 66 #define TK_STABLES 66
#define TK_DROP 67 #define TK_VGROUPS 67
#define TK_STABLE 68 #define TK_DROP 68
#define TK_TOPIC 69 #define TK_STABLE 69
#define TK_DNODE 70 #define TK_TOPIC 70
#define TK_USER 71 #define TK_FUNCTION 71
#define TK_ACCOUNT 72 #define TK_DNODE 72
#define TK_USE 73 #define TK_USER 73
#define TK_DESCRIBE 74 #define TK_ACCOUNT 74
#define TK_ALTER 75 #define TK_USE 75
#define TK_PASS 76 #define TK_DESCRIBE 76
#define TK_PRIVILEGE 77 #define TK_ALTER 77
#define TK_LOCAL 78 #define TK_PASS 78
#define TK_IF 79 #define TK_PRIVILEGE 79
#define TK_EXISTS 80 #define TK_LOCAL 80
#define TK_PPS 81 #define TK_IF 81
#define TK_TSERIES 82 #define TK_EXISTS 82
#define TK_DBS 83 #define TK_AS 83
#define TK_STORAGE 84 #define TK_PPS 84
#define TK_QTIME 85 #define TK_TSERIES 85
#define TK_CONNS 86 #define TK_DBS 86
#define TK_STATE 87 #define TK_STORAGE 87
#define TK_KEEP 88 #define TK_QTIME 88
#define TK_CACHE 89 #define TK_CONNS 89
#define TK_REPLICA 90 #define TK_STATE 90
#define TK_QUORUM 91 #define TK_KEEP 91
#define TK_DAYS 92 #define TK_CACHE 92
#define TK_MINROWS 93 #define TK_REPLICA 93
#define TK_MAXROWS 94 #define TK_QUORUM 94
#define TK_BLOCKS 95 #define TK_DAYS 95
#define TK_CTIME 96 #define TK_MINROWS 96
#define TK_WAL 97 #define TK_MAXROWS 97
#define TK_FSYNC 98 #define TK_BLOCKS 98
#define TK_COMP 99 #define TK_CTIME 99
#define TK_PRECISION 100 #define TK_WAL 100
#define TK_UPDATE 101 #define TK_FSYNC 101
#define TK_CACHELAST 102 #define TK_COMP 102
#define TK_PARTITIONS 103 #define TK_PRECISION 103
#define TK_LP 104 #define TK_UPDATE 104
#define TK_RP 105 #define TK_CACHELAST 105
#define TK_UNSIGNED 106 #define TK_PARTITIONS 106
#define TK_TAGS 107 #define TK_LP 107
#define TK_USING 108 #define TK_RP 108
#define TK_COMMA 109 #define TK_UNSIGNED 109
#define TK_AS 110 #define TK_TAGS 110
#define TK_NULL 111 #define TK_USING 111
#define TK_SELECT 112 #define TK_COMMA 112
#define TK_UNION 113 #define TK_NULL 113
#define TK_ALL 114 #define TK_SELECT 114
#define TK_DISTINCT 115 #define TK_UNION 115
#define TK_FROM 116 #define TK_ALL 116
#define TK_VARIABLE 117 #define TK_DISTINCT 117
#define TK_INTERVAL 118 #define TK_FROM 118
#define TK_FILL 119 #define TK_VARIABLE 119
#define TK_SLIDING 120 #define TK_INTERVAL 120
#define TK_ORDER 121 #define TK_FILL 121
#define TK_BY 122 #define TK_SLIDING 122
#define TK_ASC 123 #define TK_ORDER 123
#define TK_DESC 124 #define TK_BY 124
#define TK_GROUP 125 #define TK_ASC 125
#define TK_HAVING 126 #define TK_DESC 126
#define TK_LIMIT 127 #define TK_GROUP 127
#define TK_OFFSET 128 #define TK_HAVING 128
#define TK_SLIMIT 129 #define TK_LIMIT 129
#define TK_SOFFSET 130 #define TK_OFFSET 130
#define TK_WHERE 131 #define TK_SLIMIT 131
#define TK_NOW 132 #define TK_SOFFSET 132
#define TK_RESET 133 #define TK_WHERE 133
#define TK_QUERY 134 #define TK_NOW 134
#define TK_ADD 135 #define TK_RESET 135
#define TK_COLUMN 136 #define TK_QUERY 136
#define TK_TAG 137 #define TK_ADD 137
#define TK_CHANGE 138 #define TK_COLUMN 138
#define TK_SET 139 #define TK_TAG 139
#define TK_KILL 140 #define TK_CHANGE 140
#define TK_CONNECTION 141 #define TK_SET 141
#define TK_STREAM 142 #define TK_KILL 142
#define TK_COLON 143 #define TK_CONNECTION 143
#define TK_ABORT 144 #define TK_STREAM 144
#define TK_AFTER 145 #define TK_COLON 145
#define TK_ATTACH 146 #define TK_ABORT 146
#define TK_BEFORE 147 #define TK_AFTER 147
#define TK_BEGIN 148 #define TK_ATTACH 148
#define TK_CASCADE 149 #define TK_BEFORE 149
#define TK_CLUSTER 150 #define TK_BEGIN 150
#define TK_CONFLICT 151 #define TK_CASCADE 151
#define TK_COPY 152 #define TK_CLUSTER 152
#define TK_DEFERRED 153 #define TK_CONFLICT 153
#define TK_DELIMITERS 154 #define TK_COPY 154
#define TK_DETACH 155 #define TK_DEFERRED 155
#define TK_EACH 156 #define TK_DELIMITERS 156
#define TK_END 157 #define TK_DETACH 157
#define TK_EXPLAIN 158 #define TK_EACH 158
#define TK_FAIL 159 #define TK_END 159
#define TK_FOR 160 #define TK_EXPLAIN 160
#define TK_IGNORE 161 #define TK_FAIL 161
#define TK_IMMEDIATE 162 #define TK_FOR 162
#define TK_INITIALLY 163 #define TK_IGNORE 163
#define TK_INSTEAD 164 #define TK_IMMEDIATE 164
#define TK_MATCH 165 #define TK_INITIALLY 165
#define TK_KEY 166 #define TK_INSTEAD 166
#define TK_OF 167 #define TK_MATCH 167
#define TK_RAISE 168 #define TK_KEY 168
#define TK_REPLACE 169 #define TK_OF 169
#define TK_RESTRICT 170 #define TK_RAISE 170
#define TK_ROW 171 #define TK_REPLACE 171
#define TK_STATEMENT 172 #define TK_RESTRICT 172
#define TK_TRIGGER 173 #define TK_ROW 173
#define TK_VIEW 174 #define TK_STATEMENT 174
#define TK_COUNT 175 #define TK_TRIGGER 175
#define TK_SUM 176 #define TK_VIEW 176
#define TK_AVG 177 #define TK_COUNT 177
#define TK_MIN 178 #define TK_SUM 178
#define TK_MAX 179 #define TK_AVG 179
#define TK_FIRST 180 #define TK_MIN 180
#define TK_LAST 181 #define TK_MAX 181
#define TK_TOP 182 #define TK_FIRST 182
#define TK_BOTTOM 183 #define TK_LAST 183
#define TK_STDDEV 184 #define TK_TOP 184
#define TK_PERCENTILE 185 #define TK_BOTTOM 185
#define TK_APERCENTILE 186 #define TK_STDDEV 186
#define TK_LEASTSQUARES 187 #define TK_PERCENTILE 187
#define TK_HISTOGRAM 188 #define TK_APERCENTILE 188
#define TK_DIFF 189 #define TK_LEASTSQUARES 189
#define TK_SPREAD 190 #define TK_HISTOGRAM 190
#define TK_TWA 191 #define TK_DIFF 191
#define TK_INTERP 192 #define TK_SPREAD 192
#define TK_LAST_ROW 193 #define TK_TWA 193
#define TK_RATE 194 #define TK_INTERP 194
#define TK_IRATE 195 #define TK_LAST_ROW 195
#define TK_SUM_RATE 196 #define TK_RATE 196
#define TK_SUM_IRATE 197 #define TK_IRATE 197
#define TK_AVG_RATE 198 #define TK_SUM_RATE 198
#define TK_AVG_IRATE 199 #define TK_SUM_IRATE 199
#define TK_TBID 200 #define TK_AVG_RATE 200
#define TK_SEMI 201 #define TK_AVG_IRATE 201
#define TK_NONE 202 #define TK_TBID 202
#define TK_PREV 203 #define TK_SEMI 203
#define TK_LINEAR 204 #define TK_NONE 204
#define TK_IMPORT 205 #define TK_PREV 205
#define TK_METRIC 206 #define TK_LINEAR 206
#define TK_TBNAME 207 #define TK_IMPORT 207
#define TK_JOIN 208 #define TK_METRIC 208
#define TK_METRICS 209 #define TK_TBNAME 209
#define TK_INSERT 210 #define TK_JOIN 210
#define TK_INTO 211 #define TK_METRICS 211
#define TK_VALUES 212 #define TK_INSERT 212
#define TK_INTO 213
#define TK_VALUES 214
......
...@@ -44,12 +44,16 @@ void * tsDbSdb = NULL; ...@@ -44,12 +44,16 @@ void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize; static int32_t tsDbUpdateSize;
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg); static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *pMsg);
static int32_t mnodeCreateFunc(SAcctObj *pAcct, SCreateFuncMsg *pCreate, SMnodeMsg *pMsg);
static int32_t mnodeDropDb(SMnodeMsg *newMsg); static int32_t mnodeDropDb(SMnodeMsg *newMsg);
static int32_t mnodeSetDbDropping(SDbObj *pDb); static int32_t mnodeSetDbDropping(SDbObj *pDb);
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCreateFuncMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropFuncMsg(SMnodeMsg *pMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg); int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC #ifndef _TOPIC
...@@ -176,8 +180,10 @@ int32_t mnodeInitDbs() { ...@@ -176,8 +180,10 @@ int32_t mnodeInitDbs() {
} }
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mnodeProcessCreateDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DB, mnodeProcessCreateDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_CREATE_FUNCTION, mnodeProcessCreateFuncMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg); mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_FUNCTION, mnodeProcessDropFuncMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb); mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
...@@ -463,6 +469,17 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg * ...@@ -463,6 +469,17 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
return code; return code;
} }
static int32_t mnodeCreateFunc(SAcctObj *pAcct, SCreateFuncMsg *pCreate, SMnodeMsg *pMsg) {
int32_t code = acctCheck(pAcct, ACCT_GRANT_DB);
if (code != 0) return code;
mError("Function name:%s, code:%.*s", pCreate->name, pCreate->codeLen, pCreate->code);
return code;
}
bool mnodeCheckIsMonitorDB(char *db, char *monitordb) { bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
char dbName[TSDB_DB_NAME_LEN] = {0}; char dbName[TSDB_DB_NAME_LEN] = {0};
extractDBName(db, dbName); extractDBName(db, dbName);
...@@ -891,6 +908,23 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { ...@@ -891,6 +908,23 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
return code; return code;
} }
static int32_t mnodeProcessCreateFuncMsg(SMnodeMsg *pMsg) {
SCreateFuncMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->codeLen = htonl(pCreate->codeLen);
int32_t code;
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_GRANT_EXPIRED;
} else if (!pMsg->pUser->writeAuth) {
code = TSDB_CODE_MND_NO_RIGHTS;
} else {
code = mnodeCreateFunc(pMsg->pUser->pAcct, pCreate, pMsg);
}
return code;
}
static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) { static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
SDbCfg newCfg = pDb->cfg; SDbCfg newCfg = pDb->cfg;
int32_t maxTables = htonl(pAlter->maxTables); int32_t maxTables = htonl(pAlter->maxTables);
...@@ -1184,6 +1218,15 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { ...@@ -1184,6 +1218,15 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
return mnodeDropDb(pMsg); return mnodeDropDb(pMsg);
} }
static int32_t mnodeProcessDropFuncMsg(SMnodeMsg *pMsg) {
SDropFuncMsg *pDrop = pMsg->rpcMsg.pCont;
mError("drop function:%s", pDrop->name);
return TSDB_CODE_SUCCESS;
}
void mnodeDropAllDbs(SAcctObj *pAcct) { void mnodeDropAllDbs(SAcctObj *pAcct) {
int32_t numOfDbs = 0; int32_t numOfDbs = 0;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
......
...@@ -262,6 +262,7 @@ enum { ...@@ -262,6 +262,7 @@ enum {
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
uint64_t qId;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
void* tsdb; void* tsdb;
...@@ -311,7 +312,7 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu ...@@ -311,7 +312,7 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutpu
SColumnInfo* pTagCols); SColumnInfo* pTagCols);
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql); SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId);
int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable); int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
......
...@@ -65,6 +65,7 @@ program ::= cmd. {} ...@@ -65,6 +65,7 @@ program ::= cmd. {}
//////////////////////////////////THE SHOW STATEMENT/////////////////////////////////////////// //////////////////////////////////THE SHOW STATEMENT///////////////////////////////////////////
cmd ::= SHOW DATABASES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DB, 0, 0);} cmd ::= SHOW DATABASES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DB, 0, 0);}
cmd ::= SHOW TOPICS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TP, 0, 0);} cmd ::= SHOW TOPICS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_TP, 0, 0);}
cmd ::= SHOW FUNCTIONS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_FUNCTION, 0, 0);}
cmd ::= SHOW MNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_MNODE, 0, 0);} cmd ::= SHOW MNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_MNODE, 0, 0);}
cmd ::= SHOW DNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DNODE, 0, 0);} cmd ::= SHOW DNODES. { setShowOptions(pInfo, TSDB_MGMT_TABLE_DNODE, 0, 0);}
cmd ::= SHOW ACCOUNTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_ACCT, 0, 0);} cmd ::= SHOW ACCOUNTS. { setShowOptions(pInfo, TSDB_MGMT_TABLE_ACCT, 0, 0);}
...@@ -143,6 +144,7 @@ cmd ::= DROP STABLE ifexists(Y) ids(X) cpxName(Z). { ...@@ -143,6 +144,7 @@ cmd ::= DROP STABLE ifexists(Y) ids(X) cpxName(Z). {
cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDropDbTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y, TSDB_DB_TYPE_DEFAULT, -1); } cmd ::= DROP DATABASE ifexists(Y) ids(X). { setDropDbTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y, TSDB_DB_TYPE_DEFAULT, -1); }
cmd ::= DROP TOPIC ifexists(Y) ids(X). { setDropDbTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y, TSDB_DB_TYPE_TOPIC, -1); } cmd ::= DROP TOPIC ifexists(Y) ids(X). { setDropDbTableInfo(pInfo, TSDB_SQL_DROP_DB, &X, &Y, TSDB_DB_TYPE_TOPIC, -1); }
cmd ::= DROP FUNCTION ids(X). { setDropFuncInfo(pInfo, TSDB_SQL_DROP_FUNCTION, &X); }
cmd ::= DROP DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_DNODE, 1, &X); } cmd ::= DROP DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_DNODE, 1, &X); }
cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_USER, 1, &X); } cmd ::= DROP USER ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_DROP_USER, 1, &X); }
...@@ -192,6 +194,7 @@ cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). ...@@ -192,6 +194,7 @@ cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z).
{ setCreateAcctSql(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);} { setCreateAcctSql(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);}
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);} cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
cmd ::= CREATE TOPIC ifnotexists(Z) ids(X) topic_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);} cmd ::= CREATE TOPIC ifnotexists(Z) ids(X) topic_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
cmd ::= CREATE FUNCTION ids(X) AS ids(Y). { setCreateFuncInfo(pInfo, TSDB_SQL_CREATE_FUNCTION, &X, &Y);}
cmd ::= CREATE USER ids(X) PASS ids(Y). { setCreateUserSql(pInfo, &X, &Y);} cmd ::= CREATE USER ids(X) PASS ids(Y). { setCreateUserSql(pInfo, &X, &Y);}
pps(Y) ::= . { Y.n = 0; } pps(Y) ::= . { Y.n = 0; }
......
...@@ -112,6 +112,9 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { ...@@ -112,6 +112,9 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv); static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
uint64_t queryHandleId = 0;
int32_t getMaximumIdleDurationSec() { int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2; return tsShellActivityTimer * 2;
} }
...@@ -6570,8 +6573,12 @@ static void calResultBufSize(SQuery* pQuery) { ...@@ -6570,8 +6573,12 @@ static void calResultBufSize(SQuery* pQuery) {
} }
} }
FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql) { SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId) {
int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfCols = pQueryMsg->numOfCols;
int16_t numOfOutput = pQueryMsg->numOfOutput; int16_t numOfOutput = pQueryMsg->numOfOutput;
...@@ -6749,8 +6756,10 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr ...@@ -6749,8 +6756,10 @@ SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr
// todo refactor // todo refactor
pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
*qId = pQInfo->qId;
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
return pQInfo; return pQInfo;
_cleanup_qinfo: _cleanup_qinfo:
......
...@@ -243,7 +243,9 @@ static SKeyword keywordTable[] = { ...@@ -243,7 +243,9 @@ static SKeyword keywordTable[] = {
{"DISTINCT", TK_DISTINCT}, {"DISTINCT", TK_DISTINCT},
{"PARTITIONS", TK_PARTITIONS}, {"PARTITIONS", TK_PARTITIONS},
{"TOPIC", TK_TOPIC}, {"TOPIC", TK_TOPIC},
{"TOPICS", TK_TOPICS} {"TOPICS", TK_TOPICS},
{"FUNCTION", TK_FUNCTION},
{"FUNCTIONS", TK_FUNCTIONS}
}; };
static const char isIdChar[] = { static const char isIdChar[] = {
......
...@@ -68,7 +68,7 @@ void freeParam(SQueryParam *param) { ...@@ -68,7 +68,7 @@ void freeParam(SQueryParam *param) {
tfree(param->prevResult); tfree(param->prevResult);
} }
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo) { int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t *qId) {
assert(pQueryMsg != NULL && tsdb != NULL); assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -158,7 +158,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -158,7 +158,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over; goto _over;
} }
(*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql); (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId);
param.sql = NULL; param.sql = NULL;
param.pExprs = NULL; param.pExprs = NULL;
...@@ -479,7 +479,7 @@ void qCleanupQueryMgmt(void* pQMgmt) { ...@@ -479,7 +479,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
qDebug("vgId:%d, queryMgmt cleanup completed", vgId); qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
} }
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) {
if (pMgmt == NULL) { if (pMgmt == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL; return NULL;
...@@ -499,8 +499,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { ...@@ -499,8 +499,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL; return NULL;
} else { } else {
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo; void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000)); (getMaximumIdleDurationSec()*1000));
pthread_mutex_unlock(&pQueryMgmt->lock); pthread_mutex_unlock(&pQueryMgmt->lock);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -247,7 +247,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -247,7 +247,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); uint64_t qId = 0;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
...@@ -259,22 +260,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -259,22 +260,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// current connect is broken // current connect is broken
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo); handle = qRegisterQInfo(pVnode->qMgmt, qId, (uint64_t)pQInfo);
if (handle == NULL) { // failed to register qhandle if (handle == NULL) { // failed to register qhandle
pRsp->code = terrno; pRsp->code = terrno;
terrno = 0; terrno = 0;
vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, vError("vgId:%d, QInfo:%"PRIu64 "-%p register qhandle failed, return to app, code:%s", pVnode->vgId, qId, (void *)pQInfo,
tstrerror(pRsp->code)); tstrerror(pRsp->code));
qDestroyQueryInfo(pQInfo); // destroy it directly qDestroyQueryInfo(pQInfo); // destroy it directly
return pRsp->code; return pRsp->code;
} else { } else {
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t)pQInfo); pRsp->qhandle = htobe64(qId);
} }
if (handle != NULL && if (handle != NULL &&
vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, vError("vgId:%d, QInfo:%"PRIu64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle,
pRead->rpcHandle); pRead->rpcHandle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
...@@ -285,7 +286,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -285,7 +286,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
} }
if (handle != NULL) { if (handle != NULL) {
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); vTrace("vgId:%d, QInfo:%"PRIu64 "-%p, dnode query msg disposed, create qhandle and returns to app", vgId, qId, *handle);
code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle); code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRsp->code = code; pRsp->code = code;
...@@ -349,7 +350,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -349,7 +350,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
pRetrieve->free = htons(pRetrieve->free); pRetrieve->free = htons(pRetrieve->free);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle); pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle, vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle,
pRetrieve->free, pRead->rpcHandle); pRetrieve->free, pRead->rpcHandle);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
...@@ -360,19 +361,19 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -360,19 +361,19 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (handle == NULL) { if (handle == NULL) {
code = terrno; code = terrno;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
} else if ((*handle) != (void *)pRetrieve->qhandle) { } else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) {
code = TSDB_CODE_QRY_INVALID_QHANDLE; code = TSDB_CODE_QRY_INVALID_QHANDLE;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle); vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet); vnodeBuildNoResultQueryRsp(pRet);
return code; return code;
} }
// kill current query and free corresponding resources. // kill current query and free corresponding resources.
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle);
qKillQuery(*handle); qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
...@@ -383,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -383,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken // register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pRead->rpcHandle); vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL; code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle); qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册