From 234113ef04253330ee5aef2283437774ce66adec Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 29 Feb 2020 15:15:09 +0800 Subject: [PATCH] connection from shell to dnode --- src/client/src/tscServer.c | 36 +++++--- src/dnode/inc/dnodeRead.h | 12 +-- src/dnode/inc/dnodeVnodeMgmt.h | 2 +- src/dnode/src/dnodeMgmt.c | 25 +++--- src/dnode/src/dnodeRead.c | 32 +++---- src/dnode/src/dnodeShell.c | 146 ++++++++++++++++---------------- src/dnode/src/dnodeVnodeMgmt.c | 16 ++-- src/inc/taosmsg.h | 4 +- src/kit/shell/src/shellMain.c | 2 +- src/mnode/src/mgmtChildTable.c | 4 +- src/mnode/src/mgmtDnodeInt.c | 6 ++ src/mnode/src/mgmtNormalTable.c | 4 +- src/rpc/src/rpcMain.c | 2 + src/util/src/tstring.c | 4 +- 14 files changed, 161 insertions(+), 134 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 93a68a4999..10db2c4787 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -180,14 +180,16 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); - - memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); - pSql->ipList->ip[0] = inet_addr("192.168.0.1"); if (pSql->cmd.command < TSDB_SQL_MGMT) { + pSql->ipList->port = tsVnodeShellPort; + tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); + memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); } else { + pSql->ipList->port = tsMgmtShellPort; + tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); + memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); } @@ -295,8 +297,14 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, * The actual inserted number of points is the first number. */ if (type == TSDB_MSG_TYPE_SUBMIT_RSP) { - pRes->numOfRows += *(int32_t *)pRes->pRsp; - + SShellSubmitRspMsg *pMsg = pRes->pRsp; + pMsg->code = htonl(pMsg->code); + pMsg->numOfRows = htonl(pMsg->numOfRows); + pMsg->affectedRows = htonl(pMsg->affectedRows); + pMsg->failedRows = htonl(pMsg->failedRows); + pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks); + + pRes->numOfRows += pMsg->affectedRows; tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, *(int32_t *)pRes->pRsp, pRes->rspLen); } else { @@ -512,6 +520,8 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } + // temp + pSql->ipList = &tscMgmtIpList; // if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pSql->index = pMeterMetaInfo->pMeterMeta->index; // } else { // it must be the parent SSqlObj for super table query @@ -1194,11 +1204,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pStart = pSql->cmd.payload + tsRpcHeadSize; pMsg = pStart; - *((uint64_t *)pMsg) = pSql->res.qhandle; + SRetrieveTableMsg *pRetrieveMsg = (SShellSubmitMsg *)pMsg; + pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); pMsg += sizeof(pSql->res.qhandle); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - *((uint16_t *)pMsg) = htons(pQueryInfo->type); + pRetrieveMsg->free = htons(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type); pSql->cmd.payloadLen = pMsg - pStart; @@ -1246,6 +1257,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), htons(pShellMsg->vnode)); + pSql->cmd.payloadLen = sizeof(SShellSubmitMsg); + return TSDB_CODE_SUCCESS; } @@ -1644,8 +1657,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { assert(msgLen + minMsgSize() <= size); - memmove(pSql->cmd.payload, pStart, pSql->cmd.payloadLen - tsRpcHeadSize); - return TSDB_CODE_SUCCESS; } @@ -3007,7 +3018,10 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) { int tscProcessQueryRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; - pRes->qhandle = *((uint64_t *)pRes->pRsp); + SQueryTableRsp *pQuery = (SRetrieveTableRsp *)pRes->pRsp; + pQuery->qhandle = htobe64(pQuery->qhandle); + pRes->qhandle = pQuery->qhandle; + pRes->data = NULL; tscResetForNextRetrieve(pRes); return 0; diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index 7ca9b24f62..ce73ecac85 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -25,16 +25,6 @@ extern "C" { #include "taosdef.h" #include "taosmsg.h" -/* - * Clear query information associated with this connection - */ -void dnodeFreeQInfo(void *pConn); - -/* - * Clear all query informations - */ -void dnodeFreeQInfos(); - /* * handle query message, and the result is returned by callback function */ @@ -49,7 +39,7 @@ void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieve /* * Fill retrieve result according to query info */ -int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *retrievalRsp); +int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve); /* * Get the size of retrieve result according to query info diff --git a/src/dnode/inc/dnodeVnodeMgmt.h b/src/dnode/inc/dnodeVnodeMgmt.h index 3cc705499f..504439fc7e 100644 --- a/src/dnode/inc/dnodeVnodeMgmt.h +++ b/src/dnode/inc/dnodeVnodeMgmt.h @@ -56,7 +56,7 @@ int32_t dnodeDropVnode(int32_t vnode); * Get the vnode object that has been opened */ //tsdb_repo_t* dnodeGetVnode(int vid); -void* dnodeGetVnode(int vid); +void* dnodeGetVnode(int32_t vnode); /* * get the status of vnode diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index f5b108b810..e1e7df07af 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -121,7 +121,7 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void //rpcFreeCont(pCont); } -void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SDCreateTableMsg *pTable = pCont; pTable->numOfColumns = htons(pTable->numOfColumns); pTable->numOfTags = htons(pTable->numOfTags); @@ -152,7 +152,7 @@ void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SDAlterStreamMsg *pStream = pCont; pStream->uid = htobe64(pStream->uid); pStream->stime = htobe64(pStream->stime); @@ -164,7 +164,7 @@ void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SDRemoveTableMsg *pTable = pCont; pTable->sid = htonl(pTable->sid); pTable->numOfVPeers = htonl(pTable->numOfVPeers); @@ -179,7 +179,7 @@ void dnodeProcessRemoveTableRequest(void *pCont, int32_t contLen, int8_t msgType dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { int32_t code = htonl(*((int32_t *) pCont)); if (code == TSDB_CODE_SUCCESS) { @@ -195,7 +195,7 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void } } -void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { int32_t code = htonl(*((int32_t *) pCont)); if (code == TSDB_CODE_SUCCESS) { @@ -212,14 +212,14 @@ void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void } } -void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont; int32_t code = dnodeCreateVnode(pVnode); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont; int32_t vnode = htonl(pVnode->vnode); @@ -227,13 +227,17 @@ void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { +static void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; int32_t code = tsCfgDynamicOptions(pCfg->config); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } +static void dnodeProcessDropStableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { + dnodeSendRspToMnode(pConn, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); +} + void dnodeSendVnodeCfgMsg(int32_t vnode) { SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg)); if (cfg == NULL) { @@ -254,13 +258,14 @@ void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) { dnodeSendMsgToMnode(TSDB_MSG_TYPE_TABLE_CFG, cfg, sizeof(STableCfgMsg)); } -void dnodeInitProcessShellMsg() { +static void dnodeInitProcessShellMsg() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessDropStableRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp; } \ No newline at end of file diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 55c3ac8ba0..827e599806 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -22,33 +22,32 @@ #include "dnodeRead.h" #include "dnodeSystem.h" -void dnodeFreeQInfo(void *pConn) {} - -void dnodeFreeQInfos() {} - void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) { - void *pQInfo = NULL; - int code = TSDB_CODE_SUCCESS; - callback(code, pConn, pQInfo); + dTrace("conn:%p, query msg is disposed", pConn); + void *pQInfo = 100; + callback(TSDB_CODE_SUCCESS, pQInfo, pConn); } static void dnodeExecuteRetrieveData(SSchedMsg *pSched) { - //SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *)pSched->msg; SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; + SRetrieveTableMsg *pRetrieve = pSched->msg; void *pConn = pSched->ahandle; + dTrace("conn:%p, retrieve msg is disposed, qhandle:%" PRId64, pConn, pRetrieve->qhandle); + //examples - int32_t code = TSDB_CODE_INVALID_QHANDLE; - void *pQInfo = NULL; //get from pConn - (*callback)(code, pQInfo, pConn); + int32_t code = TSDB_CODE_SUCCESS; + void *pQInfo = (void*)pRetrieve->qhandle; - //TODO build response here + (*callback)(code, pQInfo, pConn); free(pSched->msg); } void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) { - int8_t *msg = malloc(sizeof(SRetrieveTableMsg)); + dTrace("conn:%p, retrieve msg is received", pConn); + + void *msg = malloc(sizeof(SRetrieveTableMsg)); memcpy(msg, pRetrieve, sizeof(SRetrieveTableMsg)); SSchedMsg schedMsg; @@ -59,12 +58,15 @@ void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieve taosScheduleTask(tsQueryQhandle, &schedMsg); } -int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *retrievalRsp) { +int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve) { + dTrace("qInfo:%p, data is retrieved"); + pRetrieve->numOfRows = 0; return 0; } int32_t dnodeGetRetrieveDataSize(void *pQInfo) { - return 0; + dTrace("qInfo:%p, contLen is 100"); + return 100; } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 246fc39a0c..d1a58c65e9 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -32,42 +32,16 @@ #include "dnodeVnodeMgmt.h" #include "dnodeWrite.h" -static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn); -static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn); -static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn); +static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn); +static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn); +static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn); +static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code); +static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static void *tsDnodeShellServer = NULL; static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; -void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) { - assert(handle != NULL); - - if (pCont == NULL || contLen == 0) { - dnodeFreeQInfo(handle); - dTrace("conn:%p, free query info", handle); - return; - } - - if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0); - dTrace("conn:%p, query msg is ignored since dnode not running", handle); - return; - } - - dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]); - - if (msgType == TSDB_MSG_TYPE_QUERY) { - dnodeProcessQueryRequest(pCont, contLen, handle); - } else if (msgType == TSDB_MSG_TYPE_RETRIEVE) { - dnodeProcessRetrieveRequest(pCont, contLen, handle); - } else if (msgType == TSDB_MSG_TYPE_SUBMIT) { - dnodeProcessShellSubmitRequest(pCont, contLen, handle); - } else { - dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]); - } -} - int32_t dnodeInitShell() { int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); @@ -85,6 +59,7 @@ int32_t dnodeInitShell() { rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 2000; + rpcInit.afp = dnodeRetrieveUserAuthInfo; tsDnodeShellServer = rpcOpen(&rpcInit); if (tsDnodeShellServer == NULL) { @@ -100,35 +75,69 @@ void dnodeCleanupShell() { if (tsDnodeShellServer) { rpcClose(tsDnodeShellServer); } +} + +SDnodeStatisInfo dnodeGetStatisInfo() { + SDnodeStatisInfo info = {0}; + if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { + info.httpReqNum = httpGetReqCount(); + info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0); + info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0); + } + + return info; +} + +static void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) { + if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { + rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0); + dTrace("query msg is ignored since dnode not running"); + return; + } - dnodeFreeQInfos(); + dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]); + + if (msgType == TSDB_MSG_TYPE_QUERY) { + dnodeProcessQueryMsg(pCont, contLen, handle); + } else if (msgType == TSDB_MSG_TYPE_RETRIEVE) { + dnodeProcessRetrieveMsg(pCont, contLen, handle); + } else if (msgType == TSDB_MSG_TYPE_SUBMIT) { + dnodeProcessSubmitMsg(pCont, contLen, handle); + } else { + dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]); + } + + //TODO free may be cause segmentfault + // rpcFreeCont(pCont); +} + +static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + return TSDB_CODE_SUCCESS; } -void dnodeProcessQueryRequestCb(int code, void *pQInfo, void *pConn) { +static void dnodeProcessQueryMsgCb(int32_t code, void *pQInfo, void *pConn) { + dTrace("conn:%p, query is returned, code:%d", pConn, code); + int32_t contLen = sizeof(SQueryTableRsp); SQueryTableRsp *queryRsp = (SQueryTableRsp *) rpcMallocCont(contLen); if (queryRsp == NULL) { + rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); return; } - dTrace("conn:%p, query data, code:%d pQInfo:%p", pConn, code, pQInfo); - queryRsp->code = htonl(code); - queryRsp->qhandle = (uint64_t) (pQInfo); - + queryRsp->qhandle = htobe64((uint64_t) (pQInfo)); rpcSendResponse(pConn, TSDB_CODE_SUCCESS, queryRsp, contLen); } -static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn) { +static void dnodeProcessQueryMsg(void *pCont, int32_t contLen, void *pConn) { atomic_fetch_add_32(&tsDnodeQueryReqNum, 1); - dTrace("conn:%p, start to query data", pConn); - SQueryTableMsg *pQuery = (SQueryTableMsg *) pCont; - dnodeQueryData(pQuery, pConn, dnodeProcessQueryRequestCb); + dnodeQueryData(pQuery, pConn, dnodeProcessQueryMsgCb); } -void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) { - dTrace("conn:%p, retrieve data, code:%d", pConn, code); +void dnodeProcessRetrieveMsgCb(int32_t code, void *pQInfo, void *pConn) { + dTrace("conn:%p, retrieve is returned, code:%d", pConn, code); assert(pConn != NULL); if (code != TSDB_CODE_SUCCESS) { @@ -138,48 +147,49 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) { assert(pQInfo != NULL); int32_t contLen = dnodeGetRetrieveDataSize(pQInfo); - SRetrieveTableRsp *retrieveRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen); - if (retrieveRsp == NULL) { + SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) rpcMallocCont(contLen); + if (pRetrieve == NULL) { rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0); return; } - code = dnodeGetRetrieveData(pQInfo, retrieveRsp); + code = dnodeGetRetrieveData(pQInfo, pRetrieve); if (code != TSDB_CODE_SUCCESS) { rpcSendResponse(pConn, TSDB_CODE_INVALID_QHANDLE, 0, 0); } - retrieveRsp->numOfRows = htonl(retrieveRsp->numOfRows); - retrieveRsp->precision = htons(retrieveRsp->precision); - retrieveRsp->offset = htobe64(retrieveRsp->offset); - retrieveRsp->useconds = htobe64(retrieveRsp->useconds); + pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); + pRetrieve->precision = htons(pRetrieve->precision); + pRetrieve->offset = htobe64(pRetrieve->offset); + pRetrieve->useconds = htobe64(pRetrieve->useconds); - rpcSendResponse(pConn, TSDB_CODE_SUCCESS, retrieveRsp, contLen); + rpcSendResponse(pConn, TSDB_CODE_SUCCESS, pRetrieve, contLen); } -static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn) { - dTrace("conn:%p, start to retrieve data", pConn); - +static void dnodeProcessRetrieveMsg(void *pCont, int32_t contLen, void *pConn) { SRetrieveTableMsg *pRetrieve = (SRetrieveTableMsg *) pCont; - dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveRequestCb); + pRetrieve->qhandle = htobe64(pRetrieve->qhandle); + pRetrieve->free = htons(pRetrieve->free); + + dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveMsgCb); } -void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) { +void dnodeProcessSubmitMsgCb(SShellSubmitRspMsg *result, void *pConn) { assert(result != NULL); + dTrace("conn:%p, submit is returned, code:%d", pConn, result->code); if (result->code != 0) { - rpcSendResponse(pConn, result->code, 0, 0); + rpcSendResponse(pConn, result->code, NULL, 0); return; } int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen); if (submitRsp == NULL) { - rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0); + rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); return; } - dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows); memcpy(submitRsp, result, contLen); for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) { @@ -203,19 +213,9 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) { rpcSendResponse(pConn, TSDB_CODE_SUCCESS, submitRsp, contLen); } -static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn) { - SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont; - dnodeWriteData(pSubmit, pConn, dnodeProcessShellSubmitRequestCb); +static void dnodeProcessSubmitMsg(void *pCont, int32_t contLen, void *pConn) { atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1); -} - -SDnodeStatisInfo dnodeGetStatisInfo() { - SDnodeStatisInfo info = {0}; - if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { - info.httpReqNum = httpGetReqCount(); - info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0); - info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0); - } - return info; -} \ No newline at end of file + SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont; + dnodeWriteData(pSubmit, pConn, dnodeProcessSubmitMsgCb); +} diff --git a/src/dnode/src/dnodeVnodeMgmt.c b/src/dnode/src/dnodeVnodeMgmt.c index de05919c6a..cf43f87aaa 100644 --- a/src/dnode/src/dnodeVnodeMgmt.c +++ b/src/dnode/src/dnodeVnodeMgmt.c @@ -20,36 +20,42 @@ #include "dnodeVnodeMgmt.h" int32_t dnodeOpenVnodes() { - return 0; + dPrint("open all vnodes"); + return TSDB_CODE_SUCCESS; } int32_t dnodeCleanupVnodes() { - return 0; + dPrint("clean all vnodes"); + return TSDB_CODE_SUCCESS; } bool dnodeCheckVnodeExist(int32_t vnode) { + dPrint("vnode:%d, check vnode exist", vnode); return true; } int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) { dPrint("vnode:%d, is created", htonl(pVnode->vnode)); - return 0; + return TSDB_CODE_SUCCESS; } int32_t dnodeDropVnode(int32_t vnode) { dPrint("vnode:%d, is dropped", vnode); - return 0; + return TSDB_CODE_SUCCESS; } -void* dnodeGetVnode(int vid) { +void* dnodeGetVnode(int32_t vnode) { + dPrint("vnode:%d, get vnode"); return NULL; } EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { + dPrint("vnode:%d, get vnode status"); return TSDB_VN_STATUS_MASTER; } bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { + dPrint("vnode:%d, sid:%d, check table exist"); return true; } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index daf49cc50a..cb2a286e64 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -53,6 +53,8 @@ extern "C" { #define TSDB_MSG_TYPE_SDB_SYNC_RSP 22 #define TSDB_MSG_TYPE_SDB_FORWARD 23 #define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24 +#define TSDB_MSG_TYPE_DROP_STABLE 25 +#define TSDB_MSG_TYPE_DROP_STABLE_RSP 26 #define TSDB_MSG_TYPE_CONNECT 31 #define TSDB_MSG_TYPE_CONNECT_RSP 32 #define TSDB_MSG_TYPE_CREATE_ACCT 33 @@ -261,7 +263,7 @@ typedef struct { int16_t numOfColumns; int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string int16_t reserved[16]; - SSchema schema[]; + char schema[]; } SCreateTableMsg; typedef struct { diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index eba047df10..6b184b53b6 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -86,7 +86,7 @@ int main(int argc, char* argv[]) { { printf("=== this a test for debug usage\n"); void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0); - taos_query(taos, "insert into d1.t14 values(now, 1)"); + taos_query(taos, "select * from d1.t6"); while (1) { sleep(1000); } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 48de49601d..e2b141fb64 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -276,7 +276,7 @@ void mgmtCleanUpChildTables() { static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) { int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags; - int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; + int32_t contLen = sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen; SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen); if (pCreateTable == NULL) { @@ -311,7 +311,7 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou pSchema++; } - memcpy(pCreateTable + sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen); + memcpy(pCreateTable + sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen); return pCreateTable; } diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 821a0585a4..2169d6731d 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -184,6 +184,10 @@ static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contL mTrace("free vnode rsp received, thandle:%p code:%d", thandle, code); } +static void mgmtProcessDropStableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { + mTrace("drop stable rsp received, thandle:%p code:%d", thandle, code); +} + static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) { mTrace("create vnode rsp received, thandle:%p code:%d", thandle, code); if (thandle == NULL) return; @@ -241,6 +245,8 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) { mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code); + } else if (msgType == TSDB_MSG_TYPE_DROP_STABLE) { + mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { } else { diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index eff640c209..aaabe0bcac 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -293,7 +293,7 @@ void mgmtCleanUpNormalTables() { static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { int32_t totalCols = pTable->numOfColumns; - int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; + int32_t contLen = sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen); if (pCreateTable == NULL) { @@ -327,7 +327,7 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr pSchema++; } - memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); + memcpy(pCreateTable + sizeof(SDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); return pCreateTable; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 84ae74f3f6..93cd1cbe79 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -439,6 +439,8 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { pInfo->clientIp = pConn->peerIp; pInfo->clientPort = pConn->peerPort; pInfo->serverIp = pConn->destIp; + + assert(pConn->user[0]); strcpy(pInfo->user, pConn->user); } diff --git a/src/util/src/tstring.c b/src/util/src/tstring.c index c9c06ea6e7..a5ab7fbf67 100644 --- a/src/util/src/tstring.c +++ b/src/util/src/tstring.c @@ -41,8 +41,8 @@ char *taosMsg[] = { "sync-rsp", "forward", "forward-rsp", - "", - "", + "drop-stable", + "drop-stable-rsp", "", "", "", -- GitLab