提交 234113ef 编写于 作者: S slguan

connection from shell to dnode

上级 bddda543
......@@ -180,14 +180,16 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsVnodeShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
} else {
pSql->ipList->port = tsMgmtShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
}
......@@ -295,8 +297,14 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
* The actual inserted number of points is the first number.
*/
if (type == TSDB_MSG_TYPE_SUBMIT_RSP) {
pRes->numOfRows += *(int32_t *)pRes->pRsp;
SShellSubmitRspMsg *pMsg = pRes->pRsp;
pMsg->code = htonl(pMsg->code);
pMsg->numOfRows = htonl(pMsg->numOfRows);
pMsg->affectedRows = htonl(pMsg->affectedRows);
pMsg->failedRows = htonl(pMsg->failedRows);
pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
pRes->numOfRows += pMsg->affectedRows;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
*(int32_t *)pRes->pRsp, pRes->rspLen);
} else {
......@@ -512,6 +520,8 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code;
}
// temp
pSql->ipList = &tscMgmtIpList;
// if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
// pSql->index = pMeterMetaInfo->pMeterMeta->index;
// } else { // it must be the parent SSqlObj for super table query
......@@ -1194,11 +1204,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
*((uint64_t *)pMsg) = pSql->res.qhandle;
SRetrieveTableMsg *pRetrieveMsg = (SShellSubmitMsg *)pMsg;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pMsg += sizeof(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
*((uint16_t *)pMsg) = htons(pQueryInfo->type);
pRetrieveMsg->free = htons(pQueryInfo->type);
pMsg += sizeof(pQueryInfo->type);
pSql->cmd.payloadLen = pMsg - pStart;
......@@ -1246,6 +1257,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
htons(pShellMsg->vnode));
pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
return TSDB_CODE_SUCCESS;
}
......@@ -1644,8 +1657,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert(msgLen + minMsgSize() <= size);
memmove(pSql->cmd.payload, pStart, pSql->cmd.payloadLen - tsRpcHeadSize);
return TSDB_CODE_SUCCESS;
}
......@@ -3007,7 +3018,10 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
int tscProcessQueryRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
pRes->qhandle = *((uint64_t *)pRes->pRsp);
SQueryTableRsp *pQuery = (SRetrieveTableRsp *)pRes->pRsp;
pQuery->qhandle = htobe64(pQuery->qhandle);
pRes->qhandle = pQuery->qhandle;
pRes->data = NULL;
tscResetForNextRetrieve(pRes);
return 0;
......
......@@ -25,16 +25,6 @@ extern "C" {
#include "taosdef.h"
#include "taosmsg.h"
/*
* Clear query information associated with this connection
*/
void dnodeFreeQInfo(void *pConn);
/*
* Clear all query informations
*/
void dnodeFreeQInfos();
/*
* handle query message, and the result is returned by callback function
*/
......@@ -49,7 +39,7 @@ void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieve
/*
* Fill retrieve result according to query info
*/
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *retrievalRsp);
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve);
/*
* Get the size of retrieve result according to query info
......
......@@ -56,7 +56,7 @@ int32_t dnodeDropVnode(int32_t vnode);
* Get the vnode object that has been opened
*/
//tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int vid);
void* dnodeGetVnode(int32_t vnode);
/*
* get the status of vnode
......
......@@ -121,7 +121,7 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void
//rpcFreeCont(pCont);
}
void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDCreateTableMsg *pTable = pCont;
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
......@@ -152,7 +152,7 @@ void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDAlterStreamMsg *pStream = pCont;
pStream->uid = htobe64(pStream->uid);
pStream->stime = htobe64(pStream->stime);
......@@ -164,7 +164,7 @@ void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDRemoveTableMsg *pTable = pCont;
pTable->sid = htonl(pTable->sid);
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
......@@ -179,7 +179,7 @@ void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
......@@ -195,7 +195,7 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void
}
}
void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
......@@ -212,14 +212,14 @@ void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void
}
}
void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont;
int32_t code = dnodeCreateVnode(pVnode);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(pVnode->vnode);
......@@ -227,13 +227,17 @@ void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType,
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
static void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
int32_t code = tsCfgDynamicOptions(pCfg->config);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
static void dnodeProcessDropStableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) {
dnodeSendRspToMnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0);
}
void dnodeSendVnodeCfgMsg(int32_t vnode) {
SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg));
if (cfg == NULL) {
......@@ -254,13 +258,14 @@ void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) {
dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg));
}
void dnodeInitProcessShellMsg() {
static void dnodeInitProcessShellMsg() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessDropStableRequest;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
}
\ No newline at end of file
......@@ -22,33 +22,32 @@
#include "dnodeRead.h"
#include "dnodeSystem.h"
void dnodeFreeQInfo(void *pConn) {}
void dnodeFreeQInfos() {}
void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) {
void *pQInfo = NULL;
int code = TSDB_CODE_SUCCESS;
callback(code, pConn, pQInfo);
dTrace("conn:%p, query msg is disposed", pConn);
void *pQInfo = 100;
callback(TSDB_CODE_SUCCESS, pQInfo, pConn);
}
static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
//SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *)pSched->msg;
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
SRetrieveTableMsg *pRetrieve = pSched->msg;
void *pConn = pSched->ahandle;
dTrace("conn:%p, retrieve msg is disposed, qhandle:%" PRId64, pConn, pRetrieve->qhandle);
//examples
int32_t code = TSDB_CODE_INVALID_QHANDLE;
void *pQInfo = NULL; //get from pConn
(*callback)(code, pQInfo, pConn);
int32_t code = TSDB_CODE_SUCCESS;
void *pQInfo = (void*)pRetrieve->qhandle;
//TODO build response here
(*callback)(code, pQInfo, pConn);
free(pSched->msg);
}
void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) {
int8_t *msg = malloc(sizeof(SRetrieveTableMsg));
dTrace("conn:%p, retrieve msg is received", pConn);
void *msg = malloc(sizeof(SRetrieveTableMsg));
memcpy(msg, pRetrieve, sizeof(SRetrieveTableMsg));
SSchedMsg schedMsg;
......@@ -59,12 +58,15 @@ void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieve
taosScheduleTask(tsQueryQhandle, &schedMsg);
}
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *retrievalRsp) {
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve) {
dTrace("qInfo:%p, data is retrieved");
pRetrieve->numOfRows = 0;
return 0;
}
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {
return 0;
dTrace("qInfo:%p, contLen is 100");
return 100;
}
......@@ -32,42 +32,16 @@
#include "dnodeVnodeMgmt.h"
#include "dnodeWrite.h"
static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn);
static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn);
static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn);
static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void *tsDnodeShellServer = NULL;
static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) {
assert(handle != NULL);
if (pCont == NULL || contLen == 0) {
dnodeFreeQInfo(handle);
dTrace("conn:%p, free query info", handle);
return;
}
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0);
dTrace("conn:%p, query msg is ignored since dnode not running", handle);
return;
}
dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]);
if (msgType == TSDB_MSG_TYPE_QUERY) {
dnodeProcessQueryRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_RETRIEVE) {
dnodeProcessRetrieveRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
dnodeProcessShellSubmitRequest(pCont, contLen, handle);
} else {
dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]);
}
}
int32_t dnodeInitShell() {
int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
......@@ -85,6 +59,7 @@ int32_t dnodeInitShell() {
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.afp = dnodeRetrieveUserAuthInfo;
tsDnodeShellServer = rpcOpen(&rpcInit);
if (tsDnodeShellServer == NULL) {
......@@ -100,35 +75,69 @@ void dnodeCleanupShell() {
if (tsDnodeShellServer) {
rpcClose(tsDnodeShellServer);
}
}
SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
info.httpReqNum = httpGetReqCount();
info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
}
return info;
}
static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0);
dTrace("query msg is ignored since dnode not running");
return;
}
dnodeFreeQInfos();
dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]);
if (msgType == TSDB_MSG_TYPE_QUERY) {
dnodeProcessQueryMsg(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_RETRIEVE) {
dnodeProcessRetrieveMsg(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
dnodeProcessSubmitMsg(pCont, contLen, handle);
} else {
dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]);
}
//TODO free may be cause segmentfault
// rpcFreeCont(pCont);
}
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return TSDB_CODE_SUCCESS;
}
void dnodeProcessQueryRequestCb(int code, void *pQInfo, void *pConn) {
static void dnodeProcessQueryMsgCb(int32_t code, void *pQInfo, void *pConn) {
dTrace("conn:%p, query is returned, code:%d", pConn, code);
int32_t contLen = sizeof(SQueryTableRsp);
SQueryTableRsp *queryRsp = (SQueryTableRsp *) rpcMallocCont(contLen);
if (queryRsp == NULL) {
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
dTrace("conn:%p, query data, code:%d pQInfo:%p", pConn, code, pQInfo);
queryRsp->code = htonl(code);
queryRsp->qhandle = (uint64_t) (pQInfo);
queryRsp->qhandle = htobe64((uint64_t) (pQInfo));
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, queryRsp, contLen);
}
static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn) {
static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn) {
atomic_fetch_add_32(&tsDnodeQueryReqNum, 1);
dTrace("conn:%p, start to query data", pConn);
SQueryTableMsg *pQuery = (SQueryTableMsg *) pCont;
dnodeQueryData(pQuery, pConn, dnodeProcessQueryRequestCb);
dnodeQueryData(pQuery, pConn, dnodeProcessQueryMsgCb);
}
void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) {
dTrace("conn:%p, retrieve data, code:%d", pConn, code);
void dnodeProcessRetrieveMsgCb(int32_t code, void *pQInfo, void *pConn) {
dTrace("conn:%p, retrieve is returned, code:%d", pConn, code);
assert(pConn != NULL);
if (code != TSDB_CODE_SUCCESS) {
......@@ -138,48 +147,49 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) {
assert(pQInfo != NULL);
int32_t contLen = dnodeGetRetrieveDataSize(pQInfo);
SRetrieveTableRsp *retrieveRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen);
if (retrieveRsp == NULL) {
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) rpcMallocCont(contLen);
if (pRetrieve == NULL) {
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0);
return;
}
code = dnodeGetRetrieveData(pQInfo, retrieveRsp);
code = dnodeGetRetrieveData(pQInfo, pRetrieve);
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(pConn, TSDB_CODE_INVALID_QHANDLE, 0, 0);
}
retrieveRsp->numOfRows = htonl(retrieveRsp->numOfRows);
retrieveRsp->precision = htons(retrieveRsp->precision);
retrieveRsp->offset = htobe64(retrieveRsp->offset);
retrieveRsp->useconds = htobe64(retrieveRsp->useconds);
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
pRetrieve->precision = htons(pRetrieve->precision);
pRetrieve->offset = htobe64(pRetrieve->offset);
pRetrieve->useconds = htobe64(pRetrieve->useconds);
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, retrieveRsp, contLen);
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, pRetrieve, contLen);
}
static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn) {
dTrace("conn:%p, start to retrieve data", pConn);
static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn) {
SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) pCont;
dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveRequestCb);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free);
dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveMsgCb);
}
void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) {
void dnodeProcessSubmitMsgCb(SShellSubmitRspMsg *result, void *pConn) {
assert(result != NULL);
dTrace("conn:%p, submit is returned, code:%d", pConn, result->code);
if (result->code != 0) {
rpcSendResponse(pConn, result->code, 0, 0);
rpcSendResponse(pConn, result->code, NULL, 0);
return;
}
int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen);
if (submitRsp == NULL) {
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0);
rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows);
memcpy(submitRsp, result, contLen);
for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) {
......@@ -203,19 +213,9 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) {
rpcSendResponse(pConn, TSDB_CODE_SUCCESS, submitRsp, contLen);
}
static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont;
dnodeWriteData(pSubmit, pConn, dnodeProcessShellSubmitRequestCb);
static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn) {
atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1);
}
SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
info.httpReqNum = httpGetReqCount();
info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
}
return info;
}
\ No newline at end of file
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont;
dnodeWriteData(pSubmit, pConn, dnodeProcessSubmitMsgCb);
}
......@@ -20,36 +20,42 @@
#include "dnodeVnodeMgmt.h"
int32_t dnodeOpenVnodes() {
return 0;
dPrint("open all vnodes");
return TSDB_CODE_SUCCESS;
}
int32_t dnodeCleanupVnodes() {
return 0;
dPrint("clean all vnodes");
return TSDB_CODE_SUCCESS;
}
bool dnodeCheckVnodeExist(int32_t vnode) {
dPrint("vnode:%d, check vnode exist", vnode);
return true;
}
int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) {
dPrint("vnode:%d, is created", htonl(pVnode->vnode));
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t dnodeDropVnode(int32_t vnode) {
dPrint("vnode:%d, is dropped", vnode);
return 0;
return TSDB_CODE_SUCCESS;
}
void* dnodeGetVnode(int vid) {
void* dnodeGetVnode(int32_t vnode) {
dPrint("vnode:%d, get vnode");
return NULL;
}
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
dPrint("vnode:%d, get vnode status");
return TSDB_VN_STATUS_MASTER;
}
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
dPrint("vnode:%d, sid:%d, check table exist");
return true;
}
......@@ -53,6 +53,8 @@ extern "C" {
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22
#define TSDB_MSG_TYPE_SDB_FORWARD 23
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24
#define TSDB_MSG_TYPE_DROP_STABLE 25
#define TSDB_MSG_TYPE_DROP_STABLE_RSP 26
#define TSDB_MSG_TYPE_CONNECT 31
#define TSDB_MSG_TYPE_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CREATE_ACCT 33
......@@ -261,7 +263,7 @@ typedef struct {
int16_t numOfColumns;
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int16_t reserved[16];
SSchema schema[];
char schema[];
} SCreateTableMsg;
typedef struct {
......
......@@ -86,7 +86,7 @@ int main(int argc, char* argv[]) {
{
printf("=== this a test for debug usage\n");
void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
taos_query(taos, "insert into d1.t14 values(now, 1)");
taos_query(taos, "select * from d1.t6");
while (1) {
sleep(1000);
}
......
......@@ -276,7 +276,7 @@ void mgmtCleanUpChildTables() {
static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) {
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
int32_t contLen = sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
if (pCreateTable == NULL) {
......@@ -311,7 +311,7 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
pSchema++;
}
memcpy(pCreateTable + sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
memcpy(pCreateTable + sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
return pCreateTable;
}
......
......@@ -184,6 +184,10 @@ static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contL
mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code);
}
static void mgmtProcessDropStableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
mTrace("drop stable rsp received, thandle:%p code:%d", thandle, code);
}
static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code);
if (thandle == NULL) return;
......@@ -241,6 +245,8 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p
mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) {
mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DROP_STABLE) {
mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
} else {
......
......@@ -293,7 +293,7 @@ void mgmtCleanUpNormalTables() {
static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
int32_t totalCols = pTable->numOfColumns;
int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
int32_t contLen = sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
if (pCreateTable == NULL) {
......@@ -327,7 +327,7 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
pSchema++;
}
memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
memcpy(pCreateTable + sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
return pCreateTable;
}
......
......@@ -439,6 +439,8 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
pInfo->clientIp = pConn->peerIp;
pInfo->clientPort = pConn->peerPort;
pInfo->serverIp = pConn->destIp;
assert(pConn->user[0]);
strcpy(pInfo->user, pConn->user);
}
......
......@@ -41,8 +41,8 @@ char *taosMsg[] = {
"sync-rsp",
"forward",
"forward-rsp",
"",
"",
"drop-stable",
"drop-stable-rsp",
"",
"",
"",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册