提交 ddbf242f 编写于 作者: S slguan

process show message

上级 5b969523
......@@ -1205,7 +1205,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(pQueryInfo->type);
pSql->cmd.payloadLen = pMsg - pStart;
pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE;
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
return TSDB_CODE_SUCCESS;
}
......@@ -1918,7 +1918,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowTableMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE;
int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for show msg", pSql);
return -1;
......@@ -1951,7 +1951,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
pShowMsg->payloadLen = htons(pPattern->n);
}
pMsg += (sizeof(SShowTableMsg) + pPattern->n);
pMsg += (sizeof(SShowMsg) + pPattern->n);
} else {
SSQLToken *pIpAddr = &pShowInfo->prefix;
assert(pIpAddr->n > 0 && pIpAddr->type > 0);
......@@ -1959,7 +1959,7 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
pShowMsg->payloadLen = htons(pIpAddr->n);
pMsg += (sizeof(SShowTableMsg) + pIpAddr->n);
pMsg += (sizeof(SShowMsg) + pIpAddr->n);
}
pCmd->payloadLen = pMsg - pStart;
......@@ -2216,7 +2216,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE;
pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
return TSDB_CODE_SUCCESS;
}
......
......@@ -59,7 +59,7 @@ void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, voi
if (msgType == TSDB_MSG_TYPE_DNODE_QUERY) {
dnodeProcessQueryRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE) {
} else if (msgType == TSDB_MSG_TYPE_RETRIEVE) {
dnodeProcessRetrieveRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_DNODE_SUBMIT) {
dnodeProcessShellSubmitRequest(pCont, contLen, handle);
......
......@@ -302,6 +302,7 @@ typedef struct {
typedef struct {
char type;
char db[TSDB_DB_NAME_LEN];
void * pNode;
short numOfColumns;
int rowSize;
......@@ -314,7 +315,6 @@ typedef struct {
char payload[]; /* payload for wildcard match in show tables */
} SShowObj;
//mgmtSystem
int32_t mgmtStartSystem();
void mgmtCleanUpSystem();
......
......@@ -35,8 +35,8 @@ extern "C" {
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_DNODE_RETRIEVE 7
#define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_RETRIEVE 7
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
......@@ -269,10 +269,6 @@ typedef struct {
int8_t data[];
} SDCreateTableMsg;
typedef struct {
char db[TSDB_DB_NAME_LEN];
} SShowTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
......@@ -738,7 +734,8 @@ typedef struct {
* payloadLen is the length of payload
*/
typedef struct {
int8_t type;
int8_t type;
char db[TSDB_DB_NAME_LEN];
uint16_t payloadLen;
char payload[];
} SShowMsg;
......@@ -749,7 +746,7 @@ typedef struct {
} SShowRsp;
typedef struct {
char ip[20];
char ip[32];
} SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg;
typedef struct {
......
......@@ -25,6 +25,10 @@ extern "C" {
int mgmtGetConnsMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn);
bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle);
#ifdef __cplusplus
}
#endif
......
......@@ -141,3 +141,13 @@ int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) {
pShow->numOfReads += numOfRows;
return numOfRows;
}
bool mgmtCheckQhandle(uint64_t qhandle) {
return true;
}
void mgmtSaveQhandle(void *qhandle) {
}
void mgmtFreeQhandle(void *qhandle) {
}
\ No newline at end of file
......@@ -910,150 +910,111 @@ static void mgmtInitShowMsgFp() {
}
int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) {
SShowMsg * pShowMsg = (SShowMsg *)pCont;
STaosRsp * pRsp;
char * pStart;
int32_t code = 0;
SShowRsp *pShowRsp;
SShowObj * pShow = NULL;
SShowMsg *pShowMsg = pCont;
if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) {
if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_SHOW_RSP) != 0) {
return 0;
if (mgmtCheckRedirectMsg(ahandle) != 0) {
return TSDB_CODE_REDIRECT;
}
}
int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS +
TSDB_EXTRA_PAYLOAD_SIZE;
pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_SHOW_RSP, size);
if (pStart == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_SHOW_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0;
int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SShowRsp *pShowRsp = rpcMallocCont(size);
if (pShowRsp == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pMsg = pStart;
pRsp = (STaosRsp *)pMsg;
pMsg = (char *)pRsp->more;
int32_t code;
if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) {
code = -1;
code = TSDB_CODE_INVALID_MSG_TYPE;
} else {
pShow = (SShowObj *)calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen));
SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen));
pShow->signature = pShow;
pShow->type = pShowMsg->type;
pShow->type = pShowMsg->type;
strcpy(pShow->db, pShow->db);
mTrace("pShow:%p is allocated", pShow);
// set the table name query condition
pShow->payloadLen = htons(pShowMsg->payloadLen);
memcpy(pShow->payload, pShowMsg->payload, pShow->payloadLen);
pShowRsp = (SShowRsp *)pMsg;
pShowRsp->qhandle = (uint64_t)pShow; // qhandle;
pConn->qhandle = pShowRsp->qhandle;
code = (*mgmtGetMetaFp[(uint8_t)pShowMsg->type])(&pShowRsp->meterMeta, pShow, pConn);
mgmtSaveQhandle(pShow);
pShowRsp->qhandle = htobe64((uint64_t) pShow);
code = (*mgmtGetMetaFp[(uint8_t) pShowMsg->type])(&pShowRsp->meterMeta, pShow, ahandle);
if (code == 0) {
pMsg += sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
size = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
} else {
mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type, taosMsg[(uint8_t)pShowMsg->type], code);
mError("pShow:%p, type:%d %s, failed to get Meta, code:%d", pShow, pShowMsg->type,
taosMsg[(uint8_t) pShowMsg->type], code);
free(pShow);
}
}
pRsp->code = code;
msgLen = pMsg - pStart;
taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
return msgLen;
rpcSendResponse(ahandle, code, pShowRsp, size);
return code;
}
int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) {
// SRetrieveMeterMsg *pRetrieve;
// SRetrieveMeterRsp *pRsp;
// int32_t rowsToRead = 0, size = 0, rowsRead = 0;
// char * pStart;
// int32_t code = 0;
// SShowObj * pShow;
//
// pRetrieve = (SRetrieveMeterMsg *)pMsg;
//
// /*
// * in case of server restart, apps may hold qhandle created by server before
// * restart, which is actually invalid, therefore, signature check is required.
// */
// if (pRetrieve->qhandle != pConn->qhandle) {
// mError("retrieve:%p, qhandle:%p is not matched with saved:%p", pRetrieve, pRetrieve->qhandle, pConn->qhandle);
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
// return -1;
// }
//
// pShow = (SShowObj *)pRetrieve->qhandle;
// if (pShow->signature != (void *)pShow) {
// mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature);
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
// return -1;
// } else {
// if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
// rowsToRead = pShow->numOfRows - pShow->numOfReads;
// }
//
// /* return no more than 100 meters in one round trip */
// if (rowsToRead > 100) rowsToRead = 100;
//
// /*
// * the actual number of table may be larger than the value of pShow->numOfRows, if a query is
// * issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
// */
// if (rowsToRead < 0) rowsToRead = 0;
// size = pShow->rowSize * rowsToRead;
// }
//
// pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, size + 100);
// if (pStart == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
//
// pMsg = pStart;
//
// STaosRsp *pTaosRsp = (STaosRsp *)pStart;
// pTaosRsp->code = code;
// pMsg = pTaosRsp->more;
//
// if (code == 0) {
// pRsp = (SRetrieveMeterRsp *)pMsg;
// pMsg = pRsp->data;
//
// // if free flag is set, client wants to clean the resources
// if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
// rowsRead = (*mgmtRetrieveFp[(uint8_t)pShow->type])(pShow, pRsp->data, rowsToRead, pConn);
//
// if (rowsRead < 0) {
// rowsRead = 0;
// pTaosRsp->code = TSDB_CODE_ACTION_IN_PROGRESS;
// }
//
// pRsp->numOfRows = htonl(rowsRead);
// pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision
// pMsg += size;
// }
//
// msgLen = pMsg - pStart;
// taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
//
// if (rowsToRead == 0) {
// uintptr_t oldSign = (uintptr_t)atomic_val_compare_exchange_ptr(&pShow->signature, pShow, 0);
// if (oldSign != (uintptr_t)pShow) {
// return msgLen;
// }
// // pShow->signature = 0;
// mTrace("pShow:%p is released", pShow);
// tfree(pShow);
// }
//
// return msgLen;
return 0;
int32_t rowsToRead = 0;
int32_t size = 0;
int32_t rowsRead = 0;
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
/*
* in case of server restart, apps may hold qhandle created by server before
* restart, which is actually invalid, therefore, signature check is required.
*/
if (!mgmtCheckQhandle(pRetrieve->qhandle)) {
mError("retrieve:%p, qhandle:%p is invalid", pRetrieve, pRetrieve->qhandle);
rpcSendResponse(ahandle, TSDB_CODE_INVALID_QHANDLE, NULL, 0);
return TSDB_CODE_INVALID_QHANDLE;
}
SShowObj *pShow = (SShowObj *)pRetrieve->qhandle;
if (pShow->signature != (void *)pShow) {
mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature);
rpcSendResponse(ahandle, TSDB_CODE_MEMORY_CORRUPTED, NULL, 0);
return TSDB_CODE_MEMORY_CORRUPTED;
} else {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsToRead = pShow->numOfRows - pShow->numOfReads;
}
/* return no more than 100 meters in one round trip */
if (rowsToRead > 100) rowsToRead = 100;
/*
* the actual number of table may be larger than the value of pShow->numOfRows, if a query is
* issued during a continuous create table operation. Therefore, rowToRead may be less than 0.
*/
if (rowsToRead < 0) rowsToRead = 0;
size = pShow->rowSize * rowsToRead;
}
size += 100;
SRetrieveMeterRsp *pRsp = rpcMallocCont(size);
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE)
rowsRead = (*mgmtRetrieveFp[(uint8_t) pShow->type])(pShow, pRsp->data, rowsToRead, ahandle);
if (rowsRead < 0) {
rowsRead = 0; // TSDB_CODE_ACTION_IN_PROGRESS;
rpcFreeCont(pRsp);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
pRsp->numOfRows = htonl(rowsRead);
pRsp->precision = htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pRsp, size);
if (rowsToRead == 0) {
mgmtFreeQhandle(pShow);
}
return TSDB_CODE_SUCCESS;
}
int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
......@@ -1364,7 +1325,7 @@ static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
if ((type == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_RETRIEVE ||
type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
type == TSDB_MSG_TYPE_CONNECT) {
return true;
......@@ -1419,7 +1380,7 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessMetricMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiMeterMetaMsg;
......
......@@ -338,7 +338,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in
// connection type is application specific.
// for TDengine, all the query, show commands shall have TCP connection
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE ||
type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
type == TSDB_MSG_TYPE_SHOW )
pContext->connType = RPC_CONN_TCPC;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册