diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e1be3bf6a95a26d7b96d803b86d8a5a4f14d829e..a92bf7c22bef8cde1546b3cde4da46f9826f02e9 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3457,7 +3457,9 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { return TSDB_CODE_TSC_INVALID_OPERATION; } - strncpy(pCmd->payload, idStr->z, idStr->n); + SKillQueryMsg* msg = (SKillQueryMsg*)pCmd->payload; + + strncpy(msg->queryId, idStr->z, idStr->n); const char delim = ':'; char* connIdStr = strtok(idStr->z, &delim); @@ -3465,7 +3467,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { int32_t connId = (int32_t)strtol(connIdStr, NULL, 10); if (connId <= 0) { - memset(pCmd->payload, 0, strlen(pCmd->payload)); + memset(msg, 0, sizeof(*msg)); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -3475,7 +3477,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); if (queryId <= 0) { - memset(pCmd->payload, 0, strlen(pCmd->payload)); + memset(msg, 0, sizeof(*msg)); if (killType == TSDB_SQL_KILL_QUERY) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } else { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index fdf5ff3a2acc221c20fdb6ed2676933696aee544..896b5361435d78de1551d5bc9e0da61a19fd2e55 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -506,7 +506,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } } - if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { + if (pRes->code == TSDB_CODE_SUCCESS && pCmd->command < TSDB_SQL_MAX && tscProcessMsgRsp[pCmd->command]) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); } @@ -906,6 +906,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SArray* queryOperator = createExecOperatorPlan(&query); SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; + tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); int32_t numOfTags = query.numOfTags; @@ -1146,6 +1147,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { memcpy(pMsg, pSql->sqlstr, sqlLen); pMsg += sqlLen; + +/* + //MSG EXTEND DEMO + pQueryMsg->extend = 1; + + STLV *tlv = (STLV *)pMsg; + tlv->type = htons(TLV_TYPE_DUMMY); + tlv->len = htonl(sizeof(int16_t)); + *(int16_t *)tlv->value = htons(12345); + pMsg += sizeof(*tlv) + ntohl(tlv->len); + + tlv = (STLV *)pMsg; + tlv->len = 0; + pMsg += sizeof(*tlv); + +*/ + int32_t msgLen = (int32_t)(pMsg - pCmd->payload); tscDebug("0x%"PRIx64" msg built success, len:%d bytes", pSql->self, msgLen); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index f62e0c0df41f2fe399d0f4c1c8e661fcd0ef91b9..7676343b37d242c1d174a31959ea4be25a9d5af2 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -120,6 +120,14 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (pMsg->pCont == NULL) return; + if (pMsg->msgType >= TSDB_MSG_TYPE_MAX) { + dError("RPC %p, shell msg type:%d is not processed", pMsg->handle, pMsg->msgType); + rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; + rpcSendResponse(&rpcMsg); + rpcFreeCont(pMsg->pCont); + return; + } + SRunStatus dnodeStatus = dnodeGetRunStatus(); if (dnodeStatus == TSDB_RUN_STATUS_STOPPED) { dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 84491e0a438fdb3b5dd2905acffaf32c76b23c9b..1db66f0387b02c3d02f50eef04110b23ce2d9e64 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -230,6 +230,7 @@ typedef struct SSubmitBlk { // Submit message for this TSDB typedef struct SSubmitMsg { SMsgHead header; + int8_t extend; int32_t length; int32_t numOfBlocks; char blocks[]; @@ -243,6 +244,7 @@ typedef struct { } SShellSubmitRspBlock; typedef struct { + int8_t extend; int32_t code; // 0-success, > 0 error code int32_t numOfRows; // number of records the client is trying to write int32_t affectedRows; // number of records actually written @@ -278,6 +280,7 @@ typedef struct { } SMDCreateTableMsg; typedef struct { + int8_t extend; int32_t len; // one create table message char tableName[TSDB_TABLE_FNAME_LEN]; int8_t igExists; @@ -290,11 +293,13 @@ typedef struct { } SCreateTableMsg; typedef struct { + int8_t extend; int32_t numOfTables; int32_t contLen; } SCMCreateTableMsg; typedef struct { + int8_t extend; char name[TSDB_TABLE_FNAME_LEN]; // if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table int8_t supertable; @@ -302,6 +307,7 @@ typedef struct { } SCMDropTableMsg; typedef struct { + int8_t extend; char tableFname[TSDB_TABLE_FNAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; int16_t type; /* operation type */ @@ -314,6 +320,7 @@ typedef struct { typedef struct { SMsgHead head; + int8_t extend; int64_t uid; int32_t tid; int16_t tversion; @@ -327,6 +334,7 @@ typedef struct { } SUpdateTableTagValMsg; typedef struct { + int8_t extend; char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; char db[TSDB_TABLE_FNAME_LEN]; @@ -335,6 +343,7 @@ typedef struct { } SConnectMsg; typedef struct { + int8_t extend; char acctId[TSDB_ACCT_ID_LEN]; char serverVersion[TSDB_VERSION_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN]; @@ -361,16 +370,19 @@ typedef struct { } SAcctCfg; typedef struct { + int8_t extend; char user[TSDB_USER_LEN]; char pass[TSDB_KEY_LEN]; SAcctCfg cfg; } SCreateAcctMsg, SAlterAcctMsg; typedef struct { - char user[TSDB_USER_LEN]; + int8_t extend; + char user[TSDB_USER_LEN]; } SDropUserMsg, SDropAcctMsg; typedef struct { + int8_t extend; char user[TSDB_USER_LEN]; char pass[TSDB_KEY_LEN]; int8_t privilege; @@ -462,6 +474,7 @@ typedef struct { typedef struct { SMsgHead head; + int8_t extend; char version[TSDB_VERSION_LEN]; bool stableQuery; // super table query or not @@ -514,6 +527,7 @@ typedef struct { } SQueryTableMsg; typedef struct { + int8_t extend; int32_t code; union{uint64_t qhandle; uint64_t qId;}; // query handle } SQueryTableRsp; @@ -521,11 +535,13 @@ typedef struct { // todo: the show handle should be replaced with id typedef struct { SMsgHead header; + int8_t extend; union{uint64_t qhandle; uint64_t qId;}; // query handle uint16_t free; } SRetrieveTableMsg; typedef struct SRetrieveTableRsp { + int8_t extend; int32_t numOfRows; int8_t completed; // all results are returned to client int16_t precision; @@ -551,6 +567,7 @@ typedef struct { } SVnodeLoad; typedef struct { + int8_t extend; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; int32_t cacheBlockSize; //MB int32_t totalBlocks; @@ -577,6 +594,7 @@ typedef struct { } SCreateDbMsg, SAlterDbMsg; typedef struct { + int8_t extend; char name[TSDB_FUNC_NAME_LEN]; char path[PATH_MAX]; int32_t funcType; @@ -588,11 +606,13 @@ typedef struct { } SCreateFuncMsg; typedef struct { + int8_t extend; int32_t num; char name[]; } SRetrieveFuncMsg; typedef struct { + int8_t extend; char name[TSDB_FUNC_NAME_LEN]; int32_t funcType; int8_t resType; @@ -603,15 +623,18 @@ typedef struct { } SFunctionInfoMsg; typedef struct { + int8_t extend; int32_t num; char content[]; } SUdfFuncMsg; typedef struct { + int8_t extend; char name[TSDB_FUNC_NAME_LEN]; } SDropFuncMsg; typedef struct { + int8_t extend; char db[TSDB_TABLE_FNAME_LEN]; uint8_t ignoreNotExists; } SDropDbMsg, SUseDbMsg, SSyncDbMsg; @@ -744,12 +767,14 @@ typedef struct { } SCreateVnodeMsg, SAlterVnodeMsg; typedef struct { + int8_t extend; char tableFname[TSDB_TABLE_FNAME_LEN]; int16_t createFlag; char tags[]; } STableInfoMsg; typedef struct { + int8_t extend; uint8_t metaClone; // create local clone of the cached table meta int32_t numOfVgroups; int32_t numOfTables; @@ -758,21 +783,25 @@ typedef struct { } SMultiTableInfoMsg; typedef struct SSTableVgroupMsg { + int8_t extend; int32_t numOfTables; } SSTableVgroupMsg, SSTableVgroupRspMsg; typedef struct { + int8_t extend; int32_t vgId; int8_t numOfEps; SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; } SVgroupMsg, SVgroupInfo; typedef struct { + int8_t extend; int32_t numOfVgroups; SVgroupMsg vgroups[]; } SVgroupsMsg, SVgroupsInfo; typedef struct STableMetaMsg { + int8_t extend; int32_t contLen; char tableFname[TSDB_TABLE_FNAME_LEN]; // table id uint8_t numOfTags; @@ -792,6 +821,7 @@ typedef struct STableMetaMsg { } STableMetaMsg; typedef struct SMultiTableMeta { + int8_t extend; int32_t numOfTables; int32_t numOfVgroup; int32_t numOfUdf; @@ -814,6 +844,7 @@ typedef struct { * payloadLen is the length of payload */ typedef struct { + int8_t extend; int8_t type; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; uint16_t payloadLen; @@ -821,17 +852,20 @@ typedef struct { } SShowMsg; typedef struct { + int8_t extend; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; int32_t numOfVgroup; int32_t vgid[]; } SCompactMsg; typedef struct SShowRsp { + int8_t extend; uint64_t qhandle; STableMetaMsg tableMeta; } SShowRsp; typedef struct { + int8_t extend; char ep[TSDB_EP_LEN]; // end point, hostname:port } SCreateDnodeMsg, SDropDnodeMsg; @@ -853,6 +887,7 @@ typedef struct { } SConfigVnodeMsg; typedef struct { + int8_t extend; char ep[TSDB_EP_LEN]; // end point, hostname:port char config[64]; } SCfgDnodeMsg; @@ -884,6 +919,7 @@ typedef struct { } SStreamDesc; typedef struct { + int8_t extend; char clientVer[TSDB_VERSION_LEN]; uint32_t connId; int32_t pid; @@ -894,6 +930,7 @@ typedef struct { } SHeartBeatMsg; typedef struct { + int8_t extend; uint32_t queryId; uint32_t streamId; uint32_t totalDnodes; @@ -904,10 +941,12 @@ typedef struct { } SHeartBeatRsp; typedef struct { + int8_t extend; char queryId[TSDB_KILL_MSG_LEN + 1]; } SKillQueryMsg, SKillStreamMsg, SKillConnMsg; typedef struct { + int8_t extend; int32_t vnode; int32_t sid; uint64_t uid; @@ -932,6 +971,16 @@ typedef struct { char reserved2[64]; } SStartupStep; +typedef struct { + int16_t type; + int32_t len; + char value[]; +} STLV; + +enum { + TLV_TYPE_DUMMY = 1, +}; + #pragma pack(pop) #ifdef __cplusplus diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5f929d38961b40078afd625a6796d6647ad6fcf3..b83926414f39bec3ac2c3d1d94459d0f6d27ea37 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -8145,6 +8145,32 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { goto _cleanup; } + + +/* + //MSG EXTEND DEMO + if (pQueryMsg->extend) { + pMsg += pQueryMsg->sqlstrLen; + + STLV *tlv = NULL; + while (1) { + tlv = (STLV *)pMsg; + tlv->type = ntohs(tlv->type); + tlv->len = ntohl(tlv->len); + if (tlv->len > 0) { + *(int16_t *)tlv->value = ntohs(*(int16_t *)tlv->value); + qDebug("Got TLV,type:%d,len:%d,value:%d", tlv->type, tlv->len, *(int16_t*)tlv->value); + pMsg += sizeof(*tlv) + tlv->len; + continue; + } + + break; + } + } + +*/ + + qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, diff --git a/tests/pytest/stream/stream2.py b/tests/pytest/stream/stream2.py index 9b4eb8725c96f95196f251c55b0b773cd68e9ed5..95f5d233855bbb743eaf0390d6c145258fd8f84d 100644 --- a/tests/pytest/stream/stream2.py +++ b/tests/pytest/stream/stream2.py @@ -153,10 +153,14 @@ class TDTestCase: tdSql.checkRows(2) tdSql.checkData(0, 2, 's1') tdSql.checkData(1, 2, 's0') + tdSql.execute('kill stream %s ;' % tdSql.queryResult[0][0]) + time.sleep(5) + tdSql.query("show streams") + tdSql.checkRows(1) def stop(self): - tdSql.close() + #tdSql.close() tdLog.success("%s successfully executed" % __file__)