未验证 提交 e7452e15 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8951 from taosdata/feature/TD-11544

[TD-11544][Enhancement] compatibility between old version client and new version server
...@@ -3457,7 +3457,9 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -3457,7 +3457,9 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
strncpy(pCmd->payload, idStr->z, idStr->n); SKillQueryMsg* msg = (SKillQueryMsg*)pCmd->payload;
strncpy(msg->queryId, idStr->z, idStr->n);
const char delim = ':'; const char delim = ':';
char* connIdStr = strtok(idStr->z, &delim); char* connIdStr = strtok(idStr->z, &delim);
...@@ -3465,7 +3467,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -3465,7 +3467,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
int32_t connId = (int32_t)strtol(connIdStr, NULL, 10); int32_t connId = (int32_t)strtol(connIdStr, NULL, 10);
if (connId <= 0) { if (connId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload)); memset(msg, 0, sizeof(*msg));
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
...@@ -3475,7 +3477,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) { ...@@ -3475,7 +3477,7 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
if (queryId <= 0) { if (queryId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload)); memset(msg, 0, sizeof(*msg));
if (killType == TSDB_SQL_KILL_QUERY) { if (killType == TSDB_SQL_KILL_QUERY) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} else { } else {
......
...@@ -506,7 +506,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -506,7 +506,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
} }
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { if (pRes->code == TSDB_CODE_SUCCESS && pCmd->command < TSDB_SQL_MAX && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -906,6 +906,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -906,6 +906,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SArray* queryOperator = createExecOperatorPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query);
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
int32_t numOfTags = query.numOfTags; int32_t numOfTags = query.numOfTags;
...@@ -1146,6 +1147,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1146,6 +1147,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
memcpy(pMsg, pSql->sqlstr, sqlLen); memcpy(pMsg, pSql->sqlstr, sqlLen);
pMsg += sqlLen; pMsg += sqlLen;
/*
//MSG EXTEND DEMO
pQueryMsg->extend = 1;
STLV *tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_DUMMY);
tlv->len = htonl(sizeof(int16_t));
*(int16_t *)tlv->value = htons(12345);
pMsg += sizeof(*tlv) + ntohl(tlv->len);
tlv = (STLV *)pMsg;
tlv->len = 0;
pMsg += sizeof(*tlv);
*/
int32_t msgLen = (int32_t)(pMsg - pCmd->payload); int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
tscDebug("0x%"PRIx64" msg built success, len:%d bytes", pSql->self, msgLen); tscDebug("0x%"PRIx64" msg built success, len:%d bytes", pSql->self, msgLen);
......
...@@ -120,6 +120,14 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -120,6 +120,14 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
if (pMsg->msgType >= TSDB_MSG_TYPE_MAX) {
dError("RPC %p, shell msg type:%d is not processed", pMsg->handle, pMsg->msgType);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return;
}
SRunStatus dnodeStatus = dnodeGetRunStatus(); SRunStatus dnodeStatus = dnodeGetRunStatus();
if (dnodeStatus == TSDB_RUN_STATUS_STOPPED) { if (dnodeStatus == TSDB_RUN_STATUS_STOPPED) {
dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]);
......
...@@ -230,6 +230,7 @@ typedef struct SSubmitBlk { ...@@ -230,6 +230,7 @@ typedef struct SSubmitBlk {
// Submit message for this TSDB // Submit message for this TSDB
typedef struct SSubmitMsg { typedef struct SSubmitMsg {
SMsgHead header; SMsgHead header;
int8_t extend;
int32_t length; int32_t length;
int32_t numOfBlocks; int32_t numOfBlocks;
char blocks[]; char blocks[];
...@@ -243,6 +244,7 @@ typedef struct { ...@@ -243,6 +244,7 @@ typedef struct {
} SShellSubmitRspBlock; } SShellSubmitRspBlock;
typedef struct { typedef struct {
int8_t extend;
int32_t code; // 0-success, > 0 error code int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written int32_t affectedRows; // number of records actually written
...@@ -278,6 +280,7 @@ typedef struct { ...@@ -278,6 +280,7 @@ typedef struct {
} SMDCreateTableMsg; } SMDCreateTableMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t len; // one create table message int32_t len; // one create table message
char tableName[TSDB_TABLE_FNAME_LEN]; char tableName[TSDB_TABLE_FNAME_LEN];
int8_t igExists; int8_t igExists;
...@@ -290,11 +293,13 @@ typedef struct { ...@@ -290,11 +293,13 @@ typedef struct {
} SCreateTableMsg; } SCreateTableMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t numOfTables; int32_t numOfTables;
int32_t contLen; int32_t contLen;
} SCMCreateTableMsg; } SCMCreateTableMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
// if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table // if user specify DROP STABLE, this flag will be set. And an error will be returned if it is not a super table
int8_t supertable; int8_t supertable;
...@@ -302,6 +307,7 @@ typedef struct { ...@@ -302,6 +307,7 @@ typedef struct {
} SCMDropTableMsg; } SCMDropTableMsg;
typedef struct { typedef struct {
int8_t extend;
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int16_t type; /* operation type */ int16_t type; /* operation type */
...@@ -314,6 +320,7 @@ typedef struct { ...@@ -314,6 +320,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int8_t extend;
int64_t uid; int64_t uid;
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
...@@ -327,6 +334,7 @@ typedef struct { ...@@ -327,6 +334,7 @@ typedef struct {
} SUpdateTableTagValMsg; } SUpdateTableTagValMsg;
typedef struct { typedef struct {
int8_t extend;
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
...@@ -335,6 +343,7 @@ typedef struct { ...@@ -335,6 +343,7 @@ typedef struct {
} SConnectMsg; } SConnectMsg;
typedef struct { typedef struct {
int8_t extend;
char acctId[TSDB_ACCT_ID_LEN]; char acctId[TSDB_ACCT_ID_LEN];
char serverVersion[TSDB_VERSION_LEN]; char serverVersion[TSDB_VERSION_LEN];
char clusterId[TSDB_CLUSTER_ID_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN];
...@@ -361,16 +370,19 @@ typedef struct { ...@@ -361,16 +370,19 @@ typedef struct {
} SAcctCfg; } SAcctCfg;
typedef struct { typedef struct {
int8_t extend;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
SAcctCfg cfg; SAcctCfg cfg;
} SCreateAcctMsg, SAlterAcctMsg; } SCreateAcctMsg, SAlterAcctMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; int8_t extend;
char user[TSDB_USER_LEN];
} SDropUserMsg, SDropAcctMsg; } SDropUserMsg, SDropAcctMsg;
typedef struct { typedef struct {
int8_t extend;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
int8_t privilege; int8_t privilege;
...@@ -462,6 +474,7 @@ typedef struct { ...@@ -462,6 +474,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int8_t extend;
char version[TSDB_VERSION_LEN]; char version[TSDB_VERSION_LEN];
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
...@@ -514,6 +527,7 @@ typedef struct { ...@@ -514,6 +527,7 @@ typedef struct {
} SQueryTableMsg; } SQueryTableMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t code; int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle union{uint64_t qhandle; uint64_t qId;}; // query handle
} SQueryTableRsp; } SQueryTableRsp;
...@@ -521,11 +535,13 @@ typedef struct { ...@@ -521,11 +535,13 @@ typedef struct {
// todo: the show handle should be replaced with id // todo: the show handle should be replaced with id
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
int8_t extend;
union{uint64_t qhandle; uint64_t qId;}; // query handle union{uint64_t qhandle; uint64_t qId;}; // query handle
uint16_t free; uint16_t free;
} SRetrieveTableMsg; } SRetrieveTableMsg;
typedef struct SRetrieveTableRsp { typedef struct SRetrieveTableRsp {
int8_t extend;
int32_t numOfRows; int32_t numOfRows;
int8_t completed; // all results are returned to client int8_t completed; // all results are returned to client
int16_t precision; int16_t precision;
...@@ -551,6 +567,7 @@ typedef struct { ...@@ -551,6 +567,7 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t cacheBlockSize; //MB int32_t cacheBlockSize; //MB
int32_t totalBlocks; int32_t totalBlocks;
...@@ -577,6 +594,7 @@ typedef struct { ...@@ -577,6 +594,7 @@ typedef struct {
} SCreateDbMsg, SAlterDbMsg; } SCreateDbMsg, SAlterDbMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
char path[PATH_MAX]; char path[PATH_MAX];
int32_t funcType; int32_t funcType;
...@@ -588,11 +606,13 @@ typedef struct { ...@@ -588,11 +606,13 @@ typedef struct {
} SCreateFuncMsg; } SCreateFuncMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t num; int32_t num;
char name[]; char name[];
} SRetrieveFuncMsg; } SRetrieveFuncMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int32_t funcType; int32_t funcType;
int8_t resType; int8_t resType;
...@@ -603,15 +623,18 @@ typedef struct { ...@@ -603,15 +623,18 @@ typedef struct {
} SFunctionInfoMsg; } SFunctionInfoMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t num; int32_t num;
char content[]; char content[];
} SUdfFuncMsg; } SUdfFuncMsg;
typedef struct { typedef struct {
int8_t extend;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
} SDropFuncMsg; } SDropFuncMsg;
typedef struct { typedef struct {
int8_t extend;
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists; uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg, SSyncDbMsg; } SDropDbMsg, SUseDbMsg, SSyncDbMsg;
...@@ -744,12 +767,14 @@ typedef struct { ...@@ -744,12 +767,14 @@ typedef struct {
} SCreateVnodeMsg, SAlterVnodeMsg; } SCreateVnodeMsg, SAlterVnodeMsg;
typedef struct { typedef struct {
int8_t extend;
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
int16_t createFlag; int16_t createFlag;
char tags[]; char tags[];
} STableInfoMsg; } STableInfoMsg;
typedef struct { typedef struct {
int8_t extend;
uint8_t metaClone; // create local clone of the cached table meta uint8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups; int32_t numOfVgroups;
int32_t numOfTables; int32_t numOfTables;
...@@ -758,21 +783,25 @@ typedef struct { ...@@ -758,21 +783,25 @@ typedef struct {
} SMultiTableInfoMsg; } SMultiTableInfoMsg;
typedef struct SSTableVgroupMsg { typedef struct SSTableVgroupMsg {
int8_t extend;
int32_t numOfTables; int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg; } SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t vgId; int32_t vgId;
int8_t numOfEps; int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupMsg, SVgroupInfo; } SVgroupMsg, SVgroupInfo;
typedef struct { typedef struct {
int8_t extend;
int32_t numOfVgroups; int32_t numOfVgroups;
SVgroupMsg vgroups[]; SVgroupMsg vgroups[];
} SVgroupsMsg, SVgroupsInfo; } SVgroupsMsg, SVgroupsInfo;
typedef struct STableMetaMsg { typedef struct STableMetaMsg {
int8_t extend;
int32_t contLen; int32_t contLen;
char tableFname[TSDB_TABLE_FNAME_LEN]; // table id char tableFname[TSDB_TABLE_FNAME_LEN]; // table id
uint8_t numOfTags; uint8_t numOfTags;
...@@ -792,6 +821,7 @@ typedef struct STableMetaMsg { ...@@ -792,6 +821,7 @@ typedef struct STableMetaMsg {
} STableMetaMsg; } STableMetaMsg;
typedef struct SMultiTableMeta { typedef struct SMultiTableMeta {
int8_t extend;
int32_t numOfTables; int32_t numOfTables;
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t numOfUdf; int32_t numOfUdf;
...@@ -814,6 +844,7 @@ typedef struct { ...@@ -814,6 +844,7 @@ typedef struct {
* payloadLen is the length of payload * payloadLen is the length of payload
*/ */
typedef struct { typedef struct {
int8_t extend;
int8_t type; int8_t type;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
uint16_t payloadLen; uint16_t payloadLen;
...@@ -821,17 +852,20 @@ typedef struct { ...@@ -821,17 +852,20 @@ typedef struct {
} SShowMsg; } SShowMsg;
typedef struct { typedef struct {
int8_t extend;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t vgid[]; int32_t vgid[];
} SCompactMsg; } SCompactMsg;
typedef struct SShowRsp { typedef struct SShowRsp {
int8_t extend;
uint64_t qhandle; uint64_t qhandle;
STableMetaMsg tableMeta; STableMetaMsg tableMeta;
} SShowRsp; } SShowRsp;
typedef struct { typedef struct {
int8_t extend;
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
} SCreateDnodeMsg, SDropDnodeMsg; } SCreateDnodeMsg, SDropDnodeMsg;
...@@ -853,6 +887,7 @@ typedef struct { ...@@ -853,6 +887,7 @@ typedef struct {
} SConfigVnodeMsg; } SConfigVnodeMsg;
typedef struct { typedef struct {
int8_t extend;
char ep[TSDB_EP_LEN]; // end point, hostname:port char ep[TSDB_EP_LEN]; // end point, hostname:port
char config[64]; char config[64];
} SCfgDnodeMsg; } SCfgDnodeMsg;
...@@ -884,6 +919,7 @@ typedef struct { ...@@ -884,6 +919,7 @@ typedef struct {
} SStreamDesc; } SStreamDesc;
typedef struct { typedef struct {
int8_t extend;
char clientVer[TSDB_VERSION_LEN]; char clientVer[TSDB_VERSION_LEN];
uint32_t connId; uint32_t connId;
int32_t pid; int32_t pid;
...@@ -894,6 +930,7 @@ typedef struct { ...@@ -894,6 +930,7 @@ typedef struct {
} SHeartBeatMsg; } SHeartBeatMsg;
typedef struct { typedef struct {
int8_t extend;
uint32_t queryId; uint32_t queryId;
uint32_t streamId; uint32_t streamId;
uint32_t totalDnodes; uint32_t totalDnodes;
...@@ -904,10 +941,12 @@ typedef struct { ...@@ -904,10 +941,12 @@ typedef struct {
} SHeartBeatRsp; } SHeartBeatRsp;
typedef struct { typedef struct {
int8_t extend;
char queryId[TSDB_KILL_MSG_LEN + 1]; char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnMsg; } SKillQueryMsg, SKillStreamMsg, SKillConnMsg;
typedef struct { typedef struct {
int8_t extend;
int32_t vnode; int32_t vnode;
int32_t sid; int32_t sid;
uint64_t uid; uint64_t uid;
...@@ -932,6 +971,16 @@ typedef struct { ...@@ -932,6 +971,16 @@ typedef struct {
char reserved2[64]; char reserved2[64];
} SStartupStep; } SStartupStep;
typedef struct {
int16_t type;
int32_t len;
char value[];
} STLV;
enum {
TLV_TYPE_DUMMY = 1,
};
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -8145,6 +8145,32 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -8145,6 +8145,32 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
goto _cleanup; goto _cleanup;
} }
/*
//MSG EXTEND DEMO
if (pQueryMsg->extend) {
pMsg += pQueryMsg->sqlstrLen;
STLV *tlv = NULL;
while (1) {
tlv = (STLV *)pMsg;
tlv->type = ntohs(tlv->type);
tlv->len = ntohl(tlv->len);
if (tlv->len > 0) {
*(int16_t *)tlv->value = ntohs(*(int16_t *)tlv->value);
qDebug("Got TLV,type:%d,len:%d,value:%d", tlv->type, tlv->len, *(int16_t*)tlv->value);
pMsg += sizeof(*tlv) + tlv->len;
continue;
}
break;
}
}
*/
qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
......
...@@ -153,10 +153,14 @@ class TDTestCase: ...@@ -153,10 +153,14 @@ class TDTestCase:
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 2, 's1') tdSql.checkData(0, 2, 's1')
tdSql.checkData(1, 2, 's0') tdSql.checkData(1, 2, 's0')
tdSql.execute('kill stream %s ;' % tdSql.queryResult[0][0])
time.sleep(5)
tdSql.query("show streams")
tdSql.checkRows(1)
def stop(self): def stop(self):
tdSql.close() #tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册