From d43c0ab26a7efce69465df0007b00256828908e0 Mon Sep 17 00:00:00 2001 From: slguan Date: Sat, 22 Feb 2020 17:09:38 +0800 Subject: [PATCH] client can send msg to server --- src/client/src/tscServer.c | 137 ++++++++++-------------- src/client/src/tscSql.c | 12 ++- src/client/src/tscSystem.c | 30 ++++-- src/dnode/src/dnodeModule.c | 4 - src/dnode/src/dnodeShell.c | 2 +- src/inc/trpc.h | 2 +- src/kit/shell/src/shellMain.c | 4 + src/mnode/src/mgmtShell.c | 22 ++-- src/plugins/monitor/src/monitorSystem.c | 3 +- src/rpc/src/rpcMain.c | 1 + src/sdb/src/sdbEngine.c | 4 + 11 files changed, 110 insertions(+), 111 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e12841a2e7..b44750050c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -177,47 +177,28 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { } int tscSendMsgToServer(SSqlObj *pSql) { - uint8_t code = TSDB_CODE_NETWORK_UNAVAIL; - - /* - * the total length of message - * rpc header + actual message body + digest - * - * the pSql object may be released automatically during insert procedure, in which the access of - * message body by using "if (pHeader->msgType & 1)" may cause the segment fault. - * - */ - - // the memory will be released by taosProcessResponse, so no memory leak here - char *pStart = rpcMallocCont(pSql->cmd.payloadLen); - if (NULL == pStart) { + char *pMsg = rpcMallocCont(pSql->cmd.payloadLen); + if (NULL == pMsg) { tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_CLI_OUT_OF_MEMORY; } - tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - - if (pStart) { - /* - * this SQL object may be released by other thread due to the completion of this query even before the log - * is dumped to log file. So the signature needs to be kept in a local variable. - */ - uint64_t signature = (uint64_t) pSql->signature; - //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart); + tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); - if (pSql->cmd.command < TSDB_SQL_MGMT) { - rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); - } else { - rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); - } + memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); - tscTrace("%p send msg code:%d sig:%p", pSql, code, signature); + pSql->ipList->ip[0] = inet_addr("192.168.0.1"); + if (pSql->cmd.command < TSDB_SQL_MGMT) { + rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); + } else { + rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); } - return code; + return TSDB_CODE_SUCCESS; } void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) { + tscPrint("response is received, pCont:%p, code:%d", pCont, code); SSqlObj *pSql = (SSqlObj *)ahandle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); @@ -237,50 +218,49 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, return; } - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); - if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID || - code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE || - code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION || - code == TSDB_CODE_TABLE_ID_MISMATCH) { - /* - * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized, - * the virtual node may have not create table till now, so try again by using the new metermeta. - * 2. this requested table may have been removed by other client, so we need to renew the - * metermeta here. - * - * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been - * removed. So, renew metermeta and try again. - * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore. - */ - if (pCmd->command == TSDB_SQL_CONNECT) { - code = TSDB_CODE_NETWORK_UNAVAIL; - rpcFreeCont(pCont); - return; - } else if (pCmd->command == TSDB_SQL_HB) { - code = TSDB_CODE_NOT_READY; - rpcFreeCont(pCont); - return; - } else { - tscTrace("%p it shall renew meter meta, code:%d", pSql, code); + if (pCont == NULL) { + code = TSDB_CODE_NETWORK_UNAVAIL; + } else { + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); + if (code == TSDB_CODE_NOT_ACTIVE_TABLE || code == TSDB_CODE_INVALID_TABLE_ID || + code == TSDB_CODE_INVALID_VNODE_ID || code == TSDB_CODE_NOT_ACTIVE_VNODE || + code == TSDB_CODE_NETWORK_UNAVAIL || code == TSDB_CODE_NOT_ACTIVE_SESSION || + code == TSDB_CODE_TABLE_ID_MISMATCH) { + /* + * not_active_table: 1. the virtual node may fail to create table, since the procedure of create table is asynchronized, + * the virtual node may have not create table till now, so try again by using the new metermeta. + * 2. this requested table may have been removed by other client, so we need to renew the + * metermeta here. + * + * not_active_vnode: current vnode is move to other node due to node balance procedure or virtual node have been + * removed. So, renew metermeta and try again. + * not_active_session: db has been move to other node, the vnode does not exist on this dnode anymore. + */ + if (pCmd->command == TSDB_SQL_CONNECT) { + code = TSDB_CODE_NETWORK_UNAVAIL; + rpcFreeCont(pCont); + return; + } else if (pCmd->command == TSDB_SQL_HB) { + code = TSDB_CODE_NOT_READY; + rpcFreeCont(pCont); + return; + } else { + tscTrace("%p it shall renew meter meta, code:%d", pSql, code); - pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; - pSql->res.code = (uint8_t)code; // keep the previous error code + pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; + pSql->res.code = (uint8_t) code; // keep the previous error code - code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); + code = tscRenewMeterMeta(pSql, pMeterMetaInfo->name); - if (pMeterMetaInfo->pMeterMeta) { - tscSendMsgToServer(pSql); - rpcFreeCont(pCont); - return; + if (pMeterMetaInfo->pMeterMeta) { + tscSendMsgToServer(pSql); + rpcFreeCont(pCont); + return; + } } } } - if (code != TSDB_CODE_SUCCESS){ // for other error set and return to invoker - rpcFreeCont(pCont); - return; - } - pSql->retry = 0; if (pSql->fp == NULL) tsem_wait(&pSql->emptyRspSem); @@ -2359,27 +2339,24 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) { int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, 0); } int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SCMConnectMsg *pConnect; - char * pMsg, *pStart; - - SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - pMsg = pCmd->payload + tsRpcHeadSize; - pStart = pMsg; + SSqlCmd *pCmd = &pSql->cmd; + pCmd->msgType = TSDB_MSG_TYPE_CONNECT; + pCmd->payloadLen = sizeof(SCMConnectMsg); - pConnect = (SCMConnectMsg *)pMsg; + if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) { + tscError("%p failed to malloc for query msg", pSql); + return TSDB_CODE_CLI_OUT_OF_MEMORY; + } + + SCMConnectMsg *pConnect = (SCMConnectMsg*)pCmd->payload; char *db; // ugly code to move the space db = strstr(pObj->db, TS_PATH_DELIMITER); db = (db == NULL) ? pObj->db : db + 1; strcpy(pConnect->db, db); - strcpy(pConnect->clientVersion, version); - - pMsg += sizeof(SCMConnectMsg); - - pCmd->payloadLen = pMsg - pStart; - pCmd->msgType = TSDB_MSG_TYPE_CONNECT; + strcpy(pConnect->msgVersion, ""); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 976cd64ba8..c432fb177c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -64,11 +64,17 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const } if (ip && ip[0]) { - tscMgmtIpList.numOfIps = 2; tscMgmtIpList.index = 0; + tscMgmtIpList.port = tsMgmtShellPort; + tscMgmtIpList.numOfIps = 1; tscMgmtIpList.ip[0] = inet_addr(ip); - tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); - if (tsSecondIp[0]) { + + if (tsMasterIp[0] && strcmp(ip, tsMasterIp) != 0) { + tscMgmtIpList.numOfIps = 2; + tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); + } + + if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) { tscMgmtIpList.numOfIps = 3; tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index f411698bb7..fadcc426fe 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -101,7 +101,7 @@ void taos_init_imp() { tscMgmtIpList.numOfIps = 1; tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); - if (tsSecondIp[0]) { + if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) { tscMgmtIpList.numOfIps = 2; tscMgmtIpList.ip[1] = inet_addr(tsSecondIp); } @@ -125,13 +125,13 @@ void taos_init_imp() { } memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localIp = tsLocalIp; + rpcInit.localIp = "0.0.0.0";//tsLocalIp; rpcInit.localPort = 0; rpcInit.label = "TSC-vnode"; rpcInit.numOfThreads = tscNumOfThreads; - rpcInit.afp = tscProcessMsgFromServer; + rpcInit.cfp = tscProcessMsgFromServer; rpcInit.sessions = tsMaxVnodeConnections; - rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); + rpcInit.connType = TAOS_CONN_CLIENT; pVnodeConn = rpcOpen(&rpcInit); if (pVnodeConn == NULL) { tscError("failed to init connection to vnode"); @@ -139,13 +139,21 @@ void taos_init_imp() { } memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localIp = tsLocalIp; + rpcInit.localIp = "0.0.0.0";//tsLocalIp; rpcInit.localPort = 0; rpcInit.label = "TSC-mgmt"; rpcInit.numOfThreads = 1; - rpcInit.afp = tscProcessMsgFromServer; + rpcInit.cfp = tscProcessMsgFromServer; rpcInit.sessions = tsMaxMgmtConnections; - rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.idleTime = 2000; + rpcInit.user = "root"; + rpcInit.ckey = "key"; + + char secret[32] = {0}; + taosEncryptPass((uint8_t *)"taosdata", strlen("taosdata"), secret); + rpcInit.secret = secret; + pTscMgmtConn = rpcOpen(&rpcInit); if (pTscMgmtConn == NULL) { tscError("failed to init connection to mgmt"); @@ -319,10 +327,10 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { assert(cfg != NULL); if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { - if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { - tscError("only 'tcp' or 'udp' allowed for configuring the socket type"); - return -1; - } +// if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { +// tscError("only 'tcp' or 'udp' allowed for configuring the socket type"); +// return -1; +// } strncpy(tsSocketType, pStr, tListLen(tsSocketType)); cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 70754cc6eb..96d0db5f6a 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -121,10 +121,6 @@ void dnodeStartModulesImp() { } } } - - if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) { - (*tsModule[TSDB_MOD_MGMT].cleanUpFp)(); - } } void (*dnodeStartModules)() = dnodeStartModulesImp; diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 164bc80a35..c7747f112e 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -85,7 +85,7 @@ int32_t dnodeInitShell() { rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; - rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); + rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 2000; tsDnodeShellServer = rpcOpen(&rpcInit); diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 121bb18382..4cf94a993b 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -39,7 +39,7 @@ typedef struct { uint32_t clientIp; uint16_t clientPort; uint32_t serverIp; - char *user; + char user[TSDB_USER_LEN]; } SRpcConnInfo; typedef struct { diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index a7b7e8383b..5083eb8ee1 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -81,6 +81,10 @@ struct arguments args = { */ int main(int argc, char* argv[]) { /*setlocale(LC_ALL, "en_US.UTF-8"); */ + // + void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0); + printf("ok\n"); + // if (!checkVersion()) { exit(EXIT_FAILURE); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 1c3158fc2f..6b80421ceb 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -87,15 +87,17 @@ int32_t mgmtInitShell() { int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; if (numOfThreads < 1) numOfThreads = 1; - memset(&rpcInit, 0, sizeof(rpcInit)); + //TODO + numOfThreads = 1; - rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;; rpcInit.localPort = tsMgmtShellPort; rpcInit.label = "MND-shell"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = mgmtProcessMsgFromShell; rpcInit.sessions = tsMaxShellConns; - rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); + rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 2000; rpcInit.afp = mgmtRetriveUserAuthInfo; @@ -1237,17 +1239,17 @@ int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secr return TSDB_CODE_INVALID_USER; } - *spi = 1; + *spi = 0; *encrypt = 0; *ckey = 0; memcpy(secret, pUser->pass, TSDB_KEY_LEN); return TSDB_CODE_SUCCESS; } -static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *ahandle) { +static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *thandle) { SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont; SRpcConnInfo connInfo; - rpcGetConnInfo(ahandle, &connInfo); + rpcGetConnInfo(thandle, &connInfo); int32_t code; SUserObj *pUser = mgmtGetUser(connInfo.user); @@ -1311,17 +1313,15 @@ static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *ahandle connect_over: if (code != TSDB_CODE_SUCCESS) { mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, code, NULL, 0); + rpcSendResponse(thandle, code, NULL, 0); } else { mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); - rpcSendResponse(ahandle, code, pConnectRsp, sizeof(pConnectRsp)); + rpcSendResponse(thandle, code, pConnectRsp, sizeof(SCMConnectRsp)); } - rpcFreeCont(pCont); return code; } - /** * check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one. */ @@ -1354,6 +1354,7 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a if (sdbGetRunStatus() != SDB_STATUS_SERVING) { mTrace("shell msg is ignored since SDB is not ready"); rpcSendResponse(ahandle, TSDB_CODE_NOT_READY, NULL, 0); + rpcFreeCont(pCont); return; } @@ -1366,6 +1367,7 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a mError("%s from shell is not processed", taosMsg[(int8_t)type]); } } + rpcFreeCont(pCont); } void mgmtInitProcessShellMsg() { diff --git a/src/plugins/monitor/src/monitorSystem.c b/src/plugins/monitor/src/monitorSystem.c index f8653e4320..5a42d66493 100644 --- a/src/plugins/monitor/src/monitorSystem.c +++ b/src/plugins/monitor/src/monitorSystem.c @@ -216,7 +216,8 @@ void monitorInitDatabaseCb(void *param, TAOS_RES *result, int code) { if (-code == TSDB_CODE_TABLE_ALREADY_EXIST || -code == TSDB_CODE_DB_ALREADY_EXIST || code >= 0) { monitorTrace("monitor:%p, sql success, code:%d, %s", monitor->conn, code, monitor->sql); if (monitor->cmdIndex == MONITOR_CMD_CREATE_TB_LOG) { - taosLogFp = monitorSaveLog; + //TODO + //taosLogFp = monitorSaveLog; taosLogSqlFp = monitorExecuteSQL; taosLogAcctFp = monitorSaveAcctLog; monitorLPrint("dnode:%s is started", tsPrivateIp); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 8ce625f6fd..caa1ea5c75 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -796,6 +796,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { + SRpcInfo *pRpc = pConn->pRpc; pHead = rpcDecompressRpcMsg(pHead); diff --git a/src/sdb/src/sdbEngine.c b/src/sdb/src/sdbEngine.c index 96ec56ae1a..4b000a30eb 100644 --- a/src/sdb/src/sdbEngine.c +++ b/src/sdb/src/sdbEngine.c @@ -59,6 +59,10 @@ int64_t sdbGetVersion() { }; int32_t sdbGetRunStatus() { + if (!tsIsCluster) { + return SDB_STATUS_SERVING; + } + if (sdbInited == NULL) { return SDB_STATUS_OFFLINE; } -- GitLab