diff --git a/.travis.yml b/.travis.yml index da0c47ea56292ab3a9aa5a6fadde133e0aa26984..9bc576dcf9de328032037162f0c882e7ccbf4057 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,7 +50,7 @@ matrix: ./test-all.sh $TRAVIS_EVENT_TYPE || travis_terminate $? cd ${TRAVIS_BUILD_DIR}/tests/pytest - ./valgrind-test.sh -g 2>&1 > mem-error-out.txt + ./valgrind-test.sh 2>&1 > mem-error-out.txt sleep 1 # Color setting diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index dbbec01329c3b6d6c2089e19225003b61512f0fb..715d76e072cae10cc266ec9182b9fda806962e83 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -264,6 +264,7 @@ bool hasMoreVnodesToTry(SSqlObj *pSql); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); +int tscSetMgmtIpListFromCfg(const char *first, const char *second); void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 28c5ae9ca0fbba597be7890abb777a3d3734d144..7324363c9d0a9004999883f13b3c8136b6b0ad8e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -27,6 +27,7 @@ #include "ttimer.h" #include "tutil.h" #include "tscLog.h" +#include "qsqltype.h" #define TSC_MGMT_VNODE 999 @@ -67,7 +68,7 @@ void tscPrintMgmtIp() { } } -void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) { +void tscSetMgmtIpList(SRpcIpSet *pIpList) { tscMgmtIpSet.numOfIps = pIpList->numOfIps; tscMgmtIpSet.inUse = pIpList->inUse; for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) { @@ -75,16 +76,6 @@ void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) { } } -void tscSetMgmtIpListFromEdge() { - if (tscMgmtIpSet.numOfIps != 1) { - tscMgmtIpSet.numOfIps = 1; - tscMgmtIpSet.inUse = 0; - taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]); - tscTrace("edge mgmt IP list:"); - tscPrintMgmtIp(); - } -} - void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { tscMgmtIpSet = *pIpSet; tscTrace("mgmt IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse); @@ -93,18 +84,6 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { } } -void tscSetMgmtIpList(SRpcIpSet *pIpList) { - /* - * The iplist returned by the cluster edition is the current management nodes - * and the iplist returned by the edge edition is empty - */ - if (pIpList->numOfIps != 0) { - tscSetMgmtIpListFromCluster(pIpList); - } else { - tscSetMgmtIpListFromEdge(); - } -} - /* * For each management node, try twice at least in case of poor network situation. * If the client start to connect to a non-management node from the client, and the first retry may fail due to @@ -132,7 +111,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { if (code == 0) { SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp; SRpcIpSet * pIpList = &pRsp->ipList; - tscSetMgmtIpList(pIpList); + if (pIpList->numOfIps > 0) + tscSetMgmtIpList(pIpList); if (pRsp->killConnection) { tscKillConnection(pObj); @@ -207,7 +187,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); } - tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); + // tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, @@ -235,7 +215,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; STscObj *pObj = pSql->pTscObj; - tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont); + // tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]); if (pSql->freed || pObj->signature != pObj) { tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, @@ -340,10 +320,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks); pRes->numOfRows += pMsg->affectedRows; - tscTrace("%p cmd:%d code:%s, inserted rows:%d, rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), - pMsg->affectedRows, pRes->rspLen); + tscTrace("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command], + tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); } else { - tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen); + tscTrace("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); } } @@ -426,7 +406,7 @@ int tscProcessSql(SSqlObj *pSql) { assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); } - tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type); + tscTrace("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type); if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL if (pTableMetaInfo == NULL) { pSql->res.code = TSDB_CODE_OTHERS; @@ -2225,7 +2205,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) { assert(len <= tListLen(pObj->db)); strncpy(pObj->db, temp, tListLen(pObj->db)); - tscSetMgmtIpList(&pConnect->ipList); + if (pConnect->ipList.numOfIps > 0) + tscSetMgmtIpList(&pConnect->ipList); strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 87292f4fe659adf2c0d54ad08143fbcf2ba36126..d8ec104a5072cd9331e8dfa088f3cd05f0ed1e6e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -65,32 +65,18 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con terrno = TSDB_CODE_INVALID_PASS; return NULL; } - + + if (ip) { + if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL; + if (port) tscMgmtIpSet.port[0] = port; + } + void *pDnodeConn = NULL; if (tscInitRpc(user, pass, &pDnodeConn) != 0) { terrno = TSDB_CODE_NETWORK_UNAVAIL; return NULL; } - - tscMgmtIpSet.numOfIps = 0; - - if (ip && ip[0]) { - tscMgmtIpSet.inUse = 0; - tscMgmtIpSet.numOfIps = 1; - strcpy(tscMgmtIpSet.fqdn[0], ip); - tscMgmtIpSet.port[0] = port? port: tsDnodeShellPort; - } else { - if (tsFirst[0] != 0) { - taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; - } - - if (tsSecond[0] != 0) { - taosGetFqdnPortFromEp(tsSecond, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); - tscMgmtIpSet.numOfIps++; - } - } - + STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj)); if (NULL == pObj) { terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 5d8652a63140aa6039008c58ac83af9f7588ca4d..5d56fef1e9c9e3ea1a8f4ba6215fcd816b931011 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -23,6 +23,7 @@ #include "tutil.h" #include "tsched.h" #include "tscLog.h" +#include "tscUtil.h" #include "tsclient.h" #include "tglobal.h" #include "tconfig.h" @@ -114,14 +115,10 @@ void taos_init_imp() { taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); } - tscMgmtIpSet.inUse = 0; - tscMgmtIpSet.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]); - - if (tsSecond[0] && strcmp(tsSecond, tsFirst) != 0) { - tscMgmtIpSet.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tscMgmtIpSet.fqdn[1], &tscMgmtIpSet.port[1]); - } + if (tscSetMgmtIpListFromCfg(tsFirst, tsSecond) < 0) { + tscError("failed to init mgmt IP list"); + return; + } tscInitMsgsFp(); int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1d1e06d3a9a5d0bac0e4eee20f35345d65365de6..4667606aa82f2c05873a2d16d84bad1a282a53b4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2163,3 +2163,33 @@ char* strdup_throw(const char* str) { } return p; } + +int tscSetMgmtIpListFromCfg(const char *first, const char *second) { + tscMgmtIpSet.numOfIps = 0; + tscMgmtIpSet.inUse = 0; + + if (first && first[0] != 0) { + if (strlen(first) >= TSDB_FQDN_LEN) { + terrno = TSDB_CODE_INVALID_FQDN; + return -1; + } + taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); + tscMgmtIpSet.numOfIps++; + } + + if (second && second[0] != 0) { + if (strlen(second) >= TSDB_FQDN_LEN) { + terrno = TSDB_CODE_INVALID_FQDN; + return -1; + } + taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); + tscMgmtIpSet.numOfIps++; + } + + if ( tscMgmtIpSet.numOfIps == 0) { + terrno = TSDB_CODE_INVALID_FQDN; + return -1; + } + + return 0; +} diff --git a/src/common/inc/qsqltype.h b/src/common/inc/qsqltype.h new file mode 100644 index 0000000000000000000000000000000000000000..d1e5bcaa523bfceb3f89b870acabf152c0f8ebf1 --- /dev/null +++ b/src/common/inc/qsqltype.h @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_QSQLCMD_H +#define TDENGINE_QSQLCMD_H + +#ifdef __cplusplus +extern "C" { +#endif + +// sql type + +#ifdef TSDB_SQL_C +#define TSDB_DEFINE_SQL_TYPE( name, msg ) msg, +char *sqlCmd[] = { + "null", +#else +#define TSDB_DEFINE_SQL_TYPE( name, msg ) name, +enum { + TSDB_SQL_NULL = 0, +#endif + + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) + + // the SQL below is for mgmt node + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_ACCT, "create-acct" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_USER, "create-user" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_ACCT, "drop-acct" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_USER, "drop-user" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_USER, "alter-user" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DNODE, "create-dnode" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DNODE, "drop-dnode" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_DNODE, "cfg-dnode" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_MNODE, "cfg-mnode" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW, "show" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE, "retrieve" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" ) + + // SQL below is for read operation + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_READ, "read" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CONNECT, "connect" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_USE_DB, "use-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_META, "meta" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_STABLEVGROUP, "stable-vgroup" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MULTI_META, "multi-meta" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_HB, "heart-beat" ) + + // SQL below for client local + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" ) + + /* + * build empty result instead of accessing dnode to fetch result + * reset the client cache + */ + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_EMPTY_RESULT, "retrieve-empty-result" ) + + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RESET_CACHE, "reset-cache" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_STATUS, "serv-status" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_DB, "current-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_VERSION, "serv-version" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CLI_VERSION, "cli-version" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_USER, "current-user ") + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_LOCAL, "cfg-local" ) + + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MAX, "max" ) +}; + +// create table operation type +enum TSQL_TYPE { + TSQL_CREATE_TABLE = 0x1, + TSQL_CREATE_STABLE = 0x2, + TSQL_CREATE_TABLE_FROM_STABLE = 0x3, + TSQL_CREATE_STREAM = 0x4, +}; + +extern char *sqlCmd[]; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_QSQLCMD_H diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index b077f40945bc3661a214d344249d083b2f2ced2e..da8f3cd1e1971518d744cbe69ac05b187a6aa412 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -175,7 +175,7 @@ void taosInitGlobalCfg(); bool taosCheckGlobalCfg(); void taosSetAllDebugFlag(); bool taosCfgDynamicOptions(char *msg); -int taosGetFqdnPortFromEp(char *ep, char *fqdn, uint16_t *port); +int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); #ifdef __cplusplus } diff --git a/src/common/src/sqlcmdstr.c b/src/common/src/sqlcmdstr.c new file mode 100644 index 0000000000000000000000000000000000000000..8584ba79761835989ab7a3e24d88824c14d107c5 --- /dev/null +++ b/src/common/src/sqlcmdstr.c @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define TSDB_SQL_C + +#include "qsqltype.h" diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 90637265b322b6d429c2c39f6b2f2935acee10ca..18d8c9ebe2733218dda9a24eefd24274a6265f1d 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -61,7 +61,7 @@ int32_t tscEmbedded = 0; */ int64_t tsMsPerDay[] = {86400000L, 86400000000L}; -char tsFirst[TSDB_FQDN_LEN] = {0}; +char tsFirst[TSDB_FQDN_LEN] = {0}; char tsSecond[TSDB_FQDN_LEN] = {0}; char tsArbitrator[TSDB_FQDN_LEN] = {0}; char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port @@ -1252,7 +1252,7 @@ bool taosCheckGlobalCfg() { return true; } -int taosGetFqdnPortFromEp(char *ep, char *fqdn, uint16_t *port) { +int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) { *port = 0; strcpy(fqdn, ep); diff --git a/src/dnode/inc/dnodeMain.h b/src/dnode/inc/dnodeMain.h new file mode 100644 index 0000000000000000000000000000000000000000..df7698ffc3246fbb419c554123b98ed74dbef9a7 --- /dev/null +++ b/src/dnode/inc/dnodeMain.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_MAIN_H +#define TDENGINE_DNODE_MAIN_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitSystem(); +void dnodeCleanUpSystem(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 7949a629663455e528449c748bbd992106c4e43d..2f693c61fbf5df441b57a130f7beab1cc77cd81e 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -16,8 +16,6 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taos.h" -#include "tglobal.h" -#include "trpc.h" #include "tutil.h" #include "tconfig.h" #include "tglobal.h" @@ -29,112 +27,14 @@ #include "dnodeVRead.h" #include "dnodeShell.h" #include "dnodeVWrite.h" -#include "tgrant.h" -static int32_t dnodeInitSystem(); static int32_t dnodeInitStorage(); -extern void grantParseParameter(); static void dnodeCleanupStorage(); -static void dnodeCleanUpSystem(); static void dnodeSetRunStatus(SDnodeRunStatus status); -static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); static void dnodeCheckDataDirOpenned(char *dir); static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; -int32_t main(int32_t argc, char *argv[]) { - // Set global configuration file - for (int32_t i = 1; i < argc; ++i) { - if (strcmp(argv[i], "-c") == 0) { - if (i < argc - 1) { - strcpy(configDir, argv[++i]); - } else { - printf("'-c' requires a parameter, default:%s\n", configDir); - exit(EXIT_FAILURE); - } - } else if (strcmp(argv[i], "-V") == 0) { -#ifdef _SYNC - char *versionStr = "enterprise"; -#else - char *versionStr = "community"; -#endif - printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version); - printf("gitinfo: %s\n", gitinfo); - printf("gitinfoI: %s\n", gitinfoOfInternal); - printf("buildinfo: %s\n", buildinfo); - exit(EXIT_SUCCESS); - } else if (strcmp(argv[i], "-k") == 0) { - grantParseParameter(); - exit(EXIT_SUCCESS); - } -#ifdef TAOS_MEM_CHECK - else if (strcmp(argv[i], "--alloc-random-fail") == 0) { - if ((i < argc - 1) && (argv[i + 1][0] != '-')) { - taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true); - } else { - taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true); - } - } else if (strcmp(argv[i], "--detect-mem-leak") == 0) { - if ((i < argc - 1) && (argv[i + 1][0] != '-')) { - taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true); - } else { - taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true); - } - } -#endif - } - - /* Set termination handler. */ - struct sigaction act = {{0}}; - act.sa_flags = SA_SIGINFO; - act.sa_sigaction = signal_handler; - sigaction(SIGTERM, &act, NULL); - sigaction(SIGHUP, &act, NULL); - sigaction(SIGINT, &act, NULL); - sigaction(SIGUSR1, &act, NULL); - sigaction(SIGUSR2, &act, NULL); - - // Open /var/log/syslog file to record information. - openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1); - syslog(LOG_INFO, "Starting TDengine service..."); - - // Initialize the system - if (dnodeInitSystem() < 0) { - syslog(LOG_ERR, "Error initialize TDengine system"); - closelog(); - - dnodeCleanUpSystem(); - exit(EXIT_FAILURE); - } - - syslog(LOG_INFO, "Started TDengine service successfully."); - - while (1) { - sleep(1000); - } -} - -static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { - if (signum == SIGUSR1) { - taosCfgDynamicOptions("debugFlag 135"); - return; - } - if (signum == SIGUSR2) { - taosCfgDynamicOptions("resetlog"); - return; - } - syslog(LOG_INFO, "Shut down signal is %d", signum); - syslog(LOG_INFO, "Shutting down TDengine service..."); - // clean the system. - dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); - dnodeCleanUpSystem(); - // close the syslog - syslog(LOG_INFO, "Shut down TDengine service successfully"); - dPrint("TDengine is shut down!"); - closelog(); - exit(EXIT_SUCCESS); -} - -static int32_t dnodeInitSystem() { +int32_t dnodeInitSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); tscEmbedded = 1; taosResolveCRC(); @@ -180,7 +80,7 @@ static int32_t dnodeInitSystem() { return 0; } -static void dnodeCleanUpSystem() { +void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeCleanupShell(); diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c new file mode 100644 index 0000000000000000000000000000000000000000..683328db29e4a4e072549d91e5e9cc7da9e6e53d --- /dev/null +++ b/src/dnode/src/dnodeSystem.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tgrant.h" +#include "tutil.h" +#include "tglobal.h" +#include "dnodeInt.h" +#include "dnodeMain.h" + +static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); + +int32_t main(int32_t argc, char *argv[]) { + // Set global configuration file + for (int32_t i = 1; i < argc; ++i) { + if (strcmp(argv[i], "-c") == 0) { + if (i < argc - 1) { + strcpy(configDir, argv[++i]); + } else { + printf("'-c' requires a parameter, default:%s\n", configDir); + exit(EXIT_FAILURE); + } + } else if (strcmp(argv[i], "-V") == 0) { +#ifdef _SYNC + char *versionStr = "enterprise"; +#else + char *versionStr = "community"; +#endif + printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version); + printf("gitinfo: %s\n", gitinfo); + printf("gitinfoI: %s\n", gitinfoOfInternal); + printf("buildinfo: %s\n", buildinfo); + exit(EXIT_SUCCESS); + } else if (strcmp(argv[i], "-k") == 0) { + grantParseParameter(); + exit(EXIT_SUCCESS); + } +#ifdef TAOS_MEM_CHECK + else if (strcmp(argv[i], "--alloc-random-fail") == 0) { + if ((i < argc - 1) && (argv[i + 1][0] != '-')) { + taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true); + } else { + taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true); + } + } else if (strcmp(argv[i], "--detect-mem-leak") == 0) { + if ((i < argc - 1) && (argv[i + 1][0] != '-')) { + taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true); + } else { + taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true); + } + } +#endif + } + + /* Set termination handler. */ + struct sigaction act = {{0}}; + act.sa_flags = SA_SIGINFO; + act.sa_sigaction = signal_handler; + sigaction(SIGTERM, &act, NULL); + sigaction(SIGHUP, &act, NULL); + sigaction(SIGINT, &act, NULL); + sigaction(SIGUSR1, &act, NULL); + sigaction(SIGUSR2, &act, NULL); + + // Open /var/log/syslog file to record information. + openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1); + syslog(LOG_INFO, "Starting TDengine service..."); + + // Initialize the system + if (dnodeInitSystem() < 0) { + syslog(LOG_ERR, "Error initialize TDengine system"); + closelog(); + + dnodeCleanUpSystem(); + exit(EXIT_FAILURE); + } + + syslog(LOG_INFO, "Started TDengine service successfully."); + + while (1) { + sleep(1000); + } +} + +static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { + if (signum == SIGUSR1) { + taosCfgDynamicOptions("debugFlag 135"); + return; + } + if (signum == SIGUSR2) { + taosCfgDynamicOptions("resetlog"); + return; + } + syslog(LOG_INFO, "Shut down signal is %d", signum); + syslog(LOG_INFO, "Shutting down TDengine service..."); + // clean the system. + dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid); + dnodeCleanUpSystem(); + // close the syslog + syslog(LOG_INFO, "Shut down TDengine service successfully"); + dPrint("TDengine is shut down!"); + closelog(); + exit(EXIT_SUCCESS); +} diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index aaf71838bfe2cc3916f10c579bd89e34acb89553..aa8cd997858c7ad27a02d751b00e765fc176d327 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -92,8 +92,6 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { char *pCont = (char *) pMsg->pCont; void *pVnode; - dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); - while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); @@ -214,6 +212,7 @@ static void *dnodeProcessReadQueue(void *param) { continue; } + dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 5de4c16c500258fe5f3d5d7588bc8782530d8791..879082f223e27537f8f0dbd8a4e6cc53fce5117b 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -200,6 +200,7 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; pHead->len = pWrite->contLen; + dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); } else { pHead = (SWalHead *)item; } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 61e82720b123a1249c6bfeefa99366b99a58a8a7..ba015d7bbfbae0d3a386359bcd918316888ac307 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -218,7 +218,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_LOCALE_LEN 64 #define TSDB_TIMEZONE_LEN 64 -#define TSDB_FQDN_LEN 72 +#define TSDB_FQDN_LEN 256 #define TSDB_IPv4ADDR_LEN 16 #define TSDB_FILENAME_LEN 128 #define TSDB_METER_VNODE_BITS 20 @@ -234,7 +234,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE #define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100) -#define TSDB_DEFAULT_PAYLOAD_SIZE 1024 // default payload size +#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MAX_VNODES 256 diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index f9bc1404fe2895ac26b43ec08ffb0a8bc21df67d..1390d66113ac9bd5f184947dd27ec4ebf4baaac6 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -170,6 +170,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QHANDLE, 0, 459, "invalid handle" TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 460, "query cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 461, "invalid ie") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FQDN, 0, 463, "invalid FQDN") // others TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index e58bcf52371084ad254ce9d69c1ab87d99382bfa..27862ef4894a3ff9914b95fda110789f18205cfc 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -187,13 +187,13 @@ typedef struct SMsgHead { // Submit message for one table typedef struct SSubmitBlk { - int64_t uid; // table unique id - int32_t tid; // table id - int32_t padding; // TODO just for padding here - int32_t sversion; // data schema version - int32_t len; // data part length, not including the SSubmitBlk head - int16_t numOfRows; // total number of rows in current submit block - char data[]; + uint64_t uid; // table unique id + int32_t tid; // table id + int32_t padding; // TODO just for padding here + int32_t sversion; // data schema version + int32_t len; // data part length, not including the SSubmitBlk head + int16_t numOfRows; // total number of rows in current submit block + char data[]; } SSubmitBlk; // Submit message for this TSDB @@ -236,6 +236,7 @@ typedef struct { int16_t numOfTags; int32_t sid; int32_t sversion; + int32_t tversion; int32_t tagDataLen; int32_t sqlDataLen; uint64_t uid; @@ -327,9 +328,9 @@ typedef struct { } SMDDropTableMsg; typedef struct { - int32_t contLen; - int32_t vgId; - int64_t uid; + int32_t contLen; + int32_t vgId; + uint64_t uid; char tableId[TSDB_TABLE_ID_LEN + 1]; } SMDDropSTableMsg; @@ -404,9 +405,9 @@ typedef struct SColumnInfo { } SColumnInfo; typedef struct STableIdInfo { - int64_t uid; - int32_t tid; - TSKEY key; // last accessed ts, for subscription + uint64_t uid; + int32_t tid; + TSKEY key; // last accessed ts, for subscription } STableIdInfo; typedef struct STimeWindow { @@ -632,6 +633,7 @@ typedef struct STableMetaMsg { uint8_t tableType; int16_t numOfColumns; int16_t sversion; + int16_t tversion; int32_t sid; uint64_t uid; SCMVgroupInfo vgroup; diff --git a/src/inc/trpc.h b/src/inc/trpc.h index eff210433f7d7bcc2f4a5ad1d12bd88ed59581be..16223b813a1a738f6b0213e8d6c4f62cd0c2290c 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -48,6 +48,7 @@ typedef struct { int contLen; int32_t code; void *handle; + void *ahandle; //app handle set by client, for debug purpose } SRpcMsg; typedef struct { diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 381c6f1dbfcc9f4351919b7602a26be206248f0b..549b0ef9775c073847c9aa7da8054509894cad3a 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -41,14 +41,13 @@ // dynamic config timestamp width according to maximum time precision extern int32_t TIMESTAMP_OUTPUT_LENGTH; -typedef struct History History; -struct History { +typedef struct SShellHistory { char* hist[MAX_HISTORY_SIZE]; int hstart; int hend; -}; +} SShellHistory; -struct arguments { +typedef struct SShellArguments { char* host; char* password; char* user; @@ -62,11 +61,11 @@ struct arguments { char* commands; int abort; int port; -}; +} SShellArguments; /**************** Function declarations ****************/ -extern void shellParseArgument(int argc, char* argv[], struct arguments* arguments); -extern TAOS* shellInit(struct arguments* args); +extern void shellParseArgument(int argc, char* argv[], SShellArguments* arguments); +extern TAOS* shellInit(SShellArguments* args); extern void* shellLoopQuery(void* arg); extern void taos_error(TAOS* con); extern int regex_match(const char* s, const char* reg, int cflags); @@ -76,7 +75,7 @@ void shellRunCommandOnServer(TAOS* con, char command[]); void read_history(); void write_history(); void source_file(TAOS* con, char* fptr); -void source_dir(TAOS* con, struct arguments* args); +void source_dir(TAOS* con, SShellArguments* args); void get_history_path(char* history); void cleanup_handler(void* arg); void exitShell(); @@ -89,12 +88,12 @@ int isCommentLine(char *line); extern char PROMPT_HEADER[]; extern char CONTINUE_PROMPT[]; extern int prompt_size; -extern History history; +extern SShellHistory history; extern struct termios oldtio; extern void set_terminal_mode(); extern int get_old_terminal_mode(struct termios* tio); -extern void reset_terminal_mode(); -extern struct arguments args; -extern TAOS_RES* result; +extern void reset_terminal_mode(); +extern SShellArguments args; +extern TAOS_RES* result; #endif diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 69475371dcf1fee1ae1296eec9b344cdc92290a5..dbb05f6a359fc501d7561444dc032ffa37462877 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -33,12 +33,12 @@ char PROMPT_HEADER[] = "taos> "; char CONTINUE_PROMPT[] = " -> "; int prompt_size = 6; TAOS_RES *result = NULL; -History history; +SShellHistory history; /* * FUNCTION: Initialize the shell. */ -TAOS *shellInit(struct arguments *args) { +TAOS *shellInit(SShellArguments *args) { printf("\n"); printf(CLIENT_VERSION, tsOsName, taos_get_client_info()); fflush(stdout); diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index b29b96379b5def3ddb60e58400b58c721dc27b7b..2cbd07db4bdee34a52edd1573dd0aa923aec35ad 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -221,7 +221,7 @@ void* shellImportThreadFp(void *arg) return NULL; } -static void shellRunImportThreads(struct arguments* args) +static void shellRunImportThreads(SShellArguments* args) { pthread_attr_t thattr; ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj)); @@ -254,7 +254,7 @@ static void shellRunImportThreads(struct arguments* args) free(threadObj); } -void source_dir(TAOS* con, struct arguments* args) { +void source_dir(TAOS* con, SShellArguments* args) { shellGetDirectoryFileList(args->dir); int64_t start = taosGetTimestampMs(); diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index f5a1145cf8994c01dded1685b9e8e70406e51013..da2bd94814da4dfd2b83318e61a0dfaffdf4116d 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -50,7 +50,7 @@ static struct argp_option options[] = { static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Get the input argument from argp_parse, which we know is a pointer to our arguments structure. */ - struct arguments *arguments = state->input; + SShellArguments *arguments = state->input; wordexp_t full_path; switch (key) { @@ -129,7 +129,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Our argp parser. */ static struct argp argp = {options, parse_opt, args_doc, doc}; -void shellParseArgument(int argc, char *argv[], struct arguments *arguments) { +void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { static char verType[32] = {0}; sprintf(verType, "version: %s\n", version); diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index b39fba285ff09a5e83632e17299565fcd1b4a943..f8010b84cd24d3c0282b0366bb38315463428a71 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -62,7 +62,7 @@ int checkVersion() { } // Global configurations -struct arguments args = { +SShellArguments args = { .host = NULL, .password = NULL, .user = NULL, diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 33b76cea2deaa8b54d844b0002fbf648d1822599..3cacafa50501cfd0cf677c92327df499bff7a220 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -67,7 +67,7 @@ static struct argp_option options[] = { {0}}; /* Used by main to communicate with parse_opt. */ -struct arguments { +typedef struct DemoArguments { char *host; uint16_t port; char *user; @@ -87,13 +87,13 @@ struct arguments { int num_of_DPT; int abort; char **arg_list; -}; +} SDemoArguments; /* Parse a single option. */ static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Get the input argument from argp_parse, which we know is a pointer to our arguments structure. */ - struct arguments *arguments = state->input; + SDemoArguments *arguments = state->input; wordexp_t full_path; char **sptr; switch (key) { @@ -269,7 +269,7 @@ double getCurrentTime(); void callBack(void *param, TAOS_RES *res, int code); int main(int argc, char *argv[]) { - struct arguments arguments = {NULL, // host + SDemoArguments arguments = {NULL, // host 0, // port "root", // user "taosdata", // password diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index ed98a9b92c517b4387b8144cf5a8082e91384ad2..adba0911369731624934cacf9be350c4e1972745 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -168,7 +168,7 @@ static struct argp_option options[] = { {0}}; /* Used by main to communicate with parse_opt. */ -struct arguments { +typedef struct SDumpArguments { // connection option char *host; char *user; @@ -193,13 +193,13 @@ struct arguments { char **arg_list; int arg_list_len; bool isDumpIn; -}; +} SDumpArguments; /* Parse a single option. */ static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Get the input argument from argp_parse, which we know is a pointer to our arguments structure. */ - struct arguments *arguments = state->input; + SDumpArguments *arguments = state->input; wordexp_t full_path; switch (key) { @@ -296,31 +296,31 @@ char *command = NULL; char *lcommand = NULL; char *buffer = NULL; -int taosDumpOut(struct arguments *arguments); +int taosDumpOut(SDumpArguments *arguments); -int taosDumpIn(struct arguments *arguments); +int taosDumpIn(SDumpArguments *arguments); void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp); -int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp); +int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp); -void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, struct arguments *arguments, FILE *fp); +void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, SDumpArguments *arguments, FILE *fp); -void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, struct arguments *arguments, +void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, SDumpArguments *arguments, FILE *fp); -int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp); +int32_t taosDumpTable(char *table, char *metric, SDumpArguments *arguments, FILE *fp); -int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp); +int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp); -int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments); +int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments); -int taosCheckParam(struct arguments *arguments); +int taosCheckParam(SDumpArguments *arguments); void taosFreeDbInfos(); int main(int argc, char *argv[]) { - struct arguments arguments = { + SDumpArguments arguments = { // connection option NULL, "root", "taosdata", 0, // output file @@ -424,7 +424,7 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) { return -1; } -int taosDumpOut(struct arguments *arguments) { +int taosDumpOut(SDumpArguments *arguments) { TAOS_ROW row; char *temp = NULL; FILE *fp = NULL; @@ -602,7 +602,7 @@ void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) { fprintf(fp, "%s\n\n", buffer); } -int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp) { +int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) { TAOS_ROW row; int fd = -1; STableRecord tableRecord; @@ -660,7 +660,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp) { return 0; } -void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, struct arguments *arguments, FILE *fp) { +void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, SDumpArguments *arguments, FILE *fp) { char *pstr = NULL; pstr = buffer; int counter = 0; @@ -703,7 +703,7 @@ void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, struct argume fprintf(fp, "%s\n\n", buffer); } -void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, struct arguments *arguments, +void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, SDumpArguments *arguments, FILE *fp) { char *pstr = NULL; pstr = buffer; @@ -786,7 +786,7 @@ int taosGetTableDes(char *table, STableDef *tableDes) { return count; } -int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp) { +int32_t taosDumpTable(char *table, char *metric, SDumpArguments *arguments, FILE *fp) { int count = 0; STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); @@ -828,7 +828,7 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI return taosDumpTableData(fp, table, arguments); } -int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp) { +int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) { TAOS_ROW row = NULL; int fd = -1; STableRecord tableRecord; @@ -877,7 +877,7 @@ int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp) { return 0; } -int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments) { +int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) { /* char temp[MAX_COMMAND_SIZE] = "\0"; */ int count = 0; char *pstr = NULL; @@ -987,7 +987,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments) { return 0; } -int taosCheckParam(struct arguments *arguments) { +int taosCheckParam(SDumpArguments *arguments) { if (arguments->all_databases && arguments->databases) { fprintf(stderr, "conflict option --all-databases and --databases\n"); return -1; @@ -1072,7 +1072,7 @@ void taosReplaceCtrlChar(char *str) { *pstr = '\0'; } -int taosDumpIn(struct arguments *arguments) { +int taosDumpIn(SDumpArguments *arguments) { assert(arguments->isDumpIn); int tsize = 0; diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index 894ee870999864b7e2e6a414b6db063a8f106f79..7c1fd6c5abfd1de6cd1475dcdf3fffacc313e9bb 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -77,6 +77,7 @@ typedef struct SSuperTableObj { uint64_t uid; int64_t createdTime; int32_t sversion; + int32_t tversion; int32_t numOfColumns; int32_t numOfTags; int8_t reserved[15]; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 804572b9ffbf52259a333c0e45bf44a5a1c0f160..d8bcf6724236d6e7bfa1e4372797a78ab95aa1e3 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -124,6 +124,8 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { + mTrace("%p, msg:%s will be processed", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]); + if (rpcMsg->pCont == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); return; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 50f24eef9f6266d6d503114ec8d3aec20ef3e280..de8f2f0ab0eef21fd0485fca9d4b2d07824681b5 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -763,6 +763,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { pStable->createdTime = taosGetTimestampMs(); pStable->uid = (((uint64_t) pStable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul)); pStable->sversion = 0; + pStable->tversion = 0; pStable->numOfColumns = htons(pCreate->numOfColumns); pStable->numOfTags = htons(pCreate->numOfTags); @@ -882,7 +883,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i } pStable->numOfTags += ntags; - pStable->sversion++; + pStable->tversion++; SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -909,7 +910,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { memmove(pStable->schema + pStable->numOfColumns + col, pStable->schema + pStable->numOfColumns + col + 1, sizeof(SSchema) * (pStable->numOfTags - col - 1)); pStable->numOfTags--; - pStable->sversion++; + pStable->tversion++; SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -1235,6 +1236,7 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)); pMeta->uid = htobe64(pTable->uid); pMeta->sversion = htons(pTable->sversion); + pMeta->tversion = htons(pTable->tversion); pMeta->precision = pMsg->pDb->cfg.precision; pMeta->numOfTags = (uint8_t)pTable->numOfTags; pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns); @@ -1358,12 +1360,14 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb pCreate->numOfColumns = htons(pTable->superTable->numOfColumns); pCreate->numOfTags = htons(pTable->superTable->numOfTags); pCreate->sversion = htonl(pTable->superTable->sversion); + pCreate->tversion = htonl(pTable->superTable->tversion); pCreate->tagDataLen = htonl(tagDataLen); pCreate->superTableUid = htobe64(pTable->superTable->uid); } else { pCreate->numOfColumns = htons(pTable->numOfColumns); pCreate->numOfTags = 0; pCreate->sversion = htonl(pTable->sversion); + pCreate->tversion = 0; pCreate->tagDataLen = 0; pCreate->superTableUid = 0; } @@ -1686,15 +1690,17 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { pMeta->sid = htonl(pTable->sid); pMeta->precision = pDb->cfg.precision; pMeta->tableType = pTable->info.type; - strncpy(pMeta->tableId, pTable->info.tableId, tListLen(pTable->info.tableId)); + strncpy(pMeta->tableId, pTable->info.tableId, strlen(pTable->info.tableId)); if (pTable->info.type == TSDB_CHILD_TABLE) { pMeta->sversion = htons(pTable->superTable->sversion); + pMeta->tversion = htons(pTable->superTable->tversion); pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags; pMeta->numOfColumns = htons((int16_t)pTable->superTable->numOfColumns); pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable); } else { pMeta->sversion = htons(pTable->sversion); + pMeta->tversion = 0; pMeta->numOfTags = 0; pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns); pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable); diff --git a/src/query/inc/qsqltype.h b/src/query/inc/qsqltype.h deleted file mode 100644 index 4087be49eeac3042f04013b9e929fce30d323cea..0000000000000000000000000000000000000000 --- a/src/query/inc/qsqltype.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_QSQLCMD_H -#define TDENGINE_QSQLCMD_H - -#ifdef __cplusplus -extern "C" { -#endif - -enum _sql_type { - TSDB_SQL_SELECT = 1, - TSDB_SQL_FETCH, - TSDB_SQL_INSERT, - - TSDB_SQL_MGMT, // the SQL below is for mgmt node - TSDB_SQL_CREATE_DB, - TSDB_SQL_CREATE_TABLE, - TSDB_SQL_DROP_DB, - TSDB_SQL_DROP_TABLE, - TSDB_SQL_CREATE_ACCT, - TSDB_SQL_CREATE_USER, // 10 - TSDB_SQL_DROP_ACCT, - TSDB_SQL_DROP_USER, - TSDB_SQL_ALTER_USER, - TSDB_SQL_ALTER_ACCT, - TSDB_SQL_ALTER_TABLE, - TSDB_SQL_ALTER_DB, - TSDB_SQL_CREATE_MNODE, - TSDB_SQL_DROP_MNODE, - TSDB_SQL_CREATE_DNODE, - TSDB_SQL_DROP_DNODE, // 20 - TSDB_SQL_CFG_DNODE, - TSDB_SQL_CFG_MNODE, - TSDB_SQL_SHOW, - TSDB_SQL_RETRIEVE, - TSDB_SQL_KILL_QUERY, - TSDB_SQL_KILL_STREAM, - TSDB_SQL_KILL_CONNECTION, - - TSDB_SQL_READ, // SQL below is for read operation - TSDB_SQL_CONNECT, - TSDB_SQL_USE_DB, // 30 - TSDB_SQL_META, - TSDB_SQL_STABLEVGROUP, - TSDB_SQL_MULTI_META, - TSDB_SQL_HB, - - TSDB_SQL_LOCAL, // SQL below for client local - TSDB_SQL_DESCRIBE_TABLE, - TSDB_SQL_RETRIEVE_LOCALMERGE, - TSDB_SQL_TABLE_JOIN_RETRIEVE, - - /* - * build empty result instead of accessing dnode to fetch result - * reset the client cache - */ - TSDB_SQL_RETRIEVE_EMPTY_RESULT, - - TSDB_SQL_RESET_CACHE, // 40 - TSDB_SQL_SERV_STATUS, - TSDB_SQL_CURRENT_DB, - TSDB_SQL_SERV_VERSION, - TSDB_SQL_CLI_VERSION, - TSDB_SQL_CURRENT_USER, - TSDB_SQL_CFG_LOCAL, - - TSDB_SQL_MAX // 47 -}; - - -// create table operation type -enum TSQL_TYPE { - TSQL_CREATE_TABLE = 0x1, - TSQL_CREATE_STABLE = 0x2, - TSQL_CREATE_TABLE_FROM_STABLE = 0x3, - TSQL_CREATE_STREAM = 0x4, -}; - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_QSQLCMD_H diff --git a/src/rpc/inc/rpcHead.h b/src/rpc/inc/rpcHead.h index 8b5410a596ae64a27ebecc9e3591fa90de044323..520edadc7dd072849720cad53c7f6f4ba605a06c 100644 --- a/src/rpc/inc/rpcHead.h +++ b/src/rpc/inc/rpcHead.h @@ -49,6 +49,7 @@ typedef struct { char encrypt:3; // encrypt algorithm, 0: no encryption uint16_t tranId; // transcation ID uint32_t linkUid; // for unique connection ID assigned by client + uint64_t ahandle; // ahandle assigned by client uint32_t sourceId; // source ID, an index for connection list uint32_t destId; // destination ID, an index for connection list uint32_t destIp; // destination IP address, for NAT scenario diff --git a/src/rpc/src/rpcCache.c b/src/rpc/src/rpcCache.c index edbb9b3e12be6dc24acfbb60fff1f91fe8e1c898..7a96571ab9d1a220e98d6d48a817fcc251097374 100644 --- a/src/rpc/src/rpcCache.c +++ b/src/rpc/src/rpcCache.c @@ -146,7 +146,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in rpcUnlockCache(pCache->lockedBy+hash); pCache->total++; - tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); + // tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]); return; } @@ -200,9 +200,9 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy rpcUnlockCache(pCache->lockedBy+hash); if (pData) { - tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); + //tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]); } else { - tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); + //tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]); } return pData; @@ -240,8 +240,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash pNext = pNode->next; pCache->total--; pCache->count[hash]--; - tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, - pCache->count[hash]); + //tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode, + // pCache->count[hash]); taosMemPoolFree(pCache->connHashMemPool, (char *)pNode); pNode = pNext; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index ff96306beaf08cec37465041d85338a74a1c3f19..445e542387961cde730943f02a56c2d9fe81d610 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -87,6 +87,7 @@ typedef struct { } SRpcReqContext; typedef struct SRpcConn { + char info[50];// debug info: label + pConn + ahandle int sid; // session ID uint32_t ownId; // own link ID uint32_t peerId; // peer link ID @@ -275,7 +276,7 @@ void *rpcOpen(const SRpcInit *pInit) { return NULL; } - tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads); + tTrace("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions); return pRpc; } @@ -299,7 +300,7 @@ void rpcClose(void *param) { tfree(pRpc->connList); pthread_mutex_destroy(&pRpc->mutex); - tTrace("%s RPC is closed", pRpc->label); + tTrace("%s rpc is closed", pRpc->label); tfree(pRpc); } @@ -375,8 +376,6 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) void rpcSendResponse(const SRpcMsg *pRsp) { int msgLen = 0; SRpcConn *pConn = (SRpcConn *)pRsp->handle; - SRpcInfo *pRpc = pConn->pRpc; - SRpcMsg rpcMsg = *pRsp; SRpcMsg *pMsg = &rpcMsg; @@ -394,7 +393,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { rpcLockConn(pConn); if ( pConn->inType == 0 || pConn->user[0] == 0 ) { - tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn); + tTrace("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); return; } @@ -410,7 +409,8 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pHead->linkUid = pConn->linkUid; pHead->port = htons(pConn->localPort); pHead->code = htonl(pMsg->code); - + pHead->ahandle = (uint64_t) pConn->ahandle; + // set pConn parameters pConn->inType = 0; @@ -492,6 +492,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, uint32_t peerIp = taosGetIpFromFqdn(peerFqdn); if (peerIp == -1) { tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn); + terrno = TSDB_CODE_APP_ERROR; return NULL; } @@ -507,11 +508,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, if (taosOpenConn[connType]) { void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle; pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort); - if (pConn->chandle) { - tTrace("%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d", pRpc->label, - pConn, pConn->sid, pRpc->user, peerFqdn, pConn->peerPort, pConn->connType); - } else { - tError("%s %p, failed to set up connection to %s:%hu", pRpc->label, pConn, peerFqdn, pConn->peerPort); + if (pConn->chandle == NULL) { terrno = TSDB_CODE_NETWORK_UNAVAIL; rpcCloseConn(pConn); pConn = NULL; @@ -558,7 +555,7 @@ static void rpcCloseConn(void *thandle) { taosFreeId(pRpc->idPool, pConn->sid); pConn->pContext = NULL; - tTrace("%s %p, rpc connection is closed", pRpc->label, pConn); + tTrace("%s, rpc connection is closed", pConn->info); rpcUnlockConn(pConn); } @@ -620,7 +617,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } if (terrno != 0) { - tWarn("%s %p, user not there or server not ready", pRpc->label, pConn); taosFreeId(pRpc->idPool, sid); // sid shall be released pConn = NULL; } @@ -635,8 +631,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { } taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); - tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", - pRpc->label, pConn, sid, pConn->user, pConn->localPort); } return pConn; @@ -661,7 +655,6 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { if (pConn) { if (pConn->linkUid != pHead->linkUid) { - tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid); terrno = TSDB_CODE_MISMATCHED_METER_ID; pConn = NULL; } @@ -678,21 +671,25 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); if ( pConn == NULL || pConn->user[0] == 0) { pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType); + } + + if (pConn) { + pConn->ahandle = pContext->ahandle; + sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); } else { - tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn); + tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno)); } return pConn; } static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { - SRpcInfo *pRpc= pConn->pRpc; if (pConn->peerId == 0) { pConn->peerId = pHead->sourceId; } else { if (pConn->peerId != pHead->sourceId) { - tTrace("%s %p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn, + tTrace("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info, pConn->peerId, pHead->sourceId); return TSDB_CODE_INVALID_VALUE; } @@ -701,17 +698,16 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { if (pConn->inTranId == pHead->tranId) { if (pConn->inType == pHead->msgType) { if (pHead->code == 0) { - tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); } else { // do nothing, it is heart beat from client } } else if (pConn->inType == 0) { - tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn, - taosMsg[pHead->msgType], pConn->inTranId); + tTrace("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId); rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response } else { - tTrace("%s %p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]); + tTrace("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]); } // do not reply any message @@ -719,7 +715,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { } if (pConn->inType != 0) { - tTrace("%s %p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn, + tTrace("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info, pConn->inTranId, pHead->tranId); return TSDB_CODE_LAST_SESSION_NOT_FINISHED; } @@ -751,7 +747,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) { if (pConn->tretry <= tsRpcMaxRetry) { - tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn); + tTrace("%s, peer is still processing the transaction", pConn->info); pConn->tretry++; rpcSendReqHead(pConn); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); @@ -790,7 +786,15 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { } pConn = rpcGetConnObj(pRpc, sid, pRecv); - if (pConn == NULL) return NULL; + if (pConn == NULL) { + tError("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno)); + return NULL; + } else { + if (rpcIsReq(pHead->msgType)) { + pConn->ahandle = (void *)pHead->ahandle; + sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle); + } + } rpcLockConn(pConn); sid = pConn->sid; @@ -827,7 +831,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { static void rpcProcessBrokenLink(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; - tTrace("%s %p, link is broken", pRpc->label, pConn); + tTrace("%s, link is broken", pConn->info); // pConn->chandle = NULL; if (pConn->outType) { @@ -838,7 +842,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) { // if there are pending request, notify the app - tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn); + tTrace("%s, connection is gone, notify the app", pConn->info); /* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; @@ -873,17 +877,17 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) { - tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", - pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, + tTrace("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", + pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); } int32_t code = terrno; if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != 0) { // parsing error - if ( rpcIsReq(pHead->msgType) ) { + if (rpcIsReq(pHead->msgType)) { rpcSendErrorMsgToPeer(pRecv, code); - tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); + tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // parsing OK rpcProcessIncomingMsg(pConn, pHead); @@ -925,6 +929,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; + rpcMsg.ahandle = pConn->ahandle; if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; @@ -949,14 +954,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pContext->redirect++; if (pContext->redirect > TSDB_MAX_REPLICA) { pHead->code = TSDB_CODE_NETWORK_UNAVAIL; - tWarn("%s %p, too many redirects, quit", pRpc->label, pConn); + tWarn("%s, too many redirects, quit", pConn->info); } } if (pHead->code == TSDB_CODE_REDIRECT) { pContext->numOfTry = 0; memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); - tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); + tTrace("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps); for (int i=0; iipSet.numOfIps; ++i) pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]); rpcSendReqToServer(pRpc, pContext); @@ -1062,6 +1067,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { return; } + pConn->ahandle = pContext->ahandle; rpcLockConn(pConn); // set the message header @@ -1075,6 +1081,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { pHead->destId = pConn->peerId; pHead->port = 0; pHead->linkUid = pConn->linkUid; + pHead->ahandle = (uint64_t)pConn->ahandle; if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user)); // set the connection parameters @@ -1092,30 +1099,28 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { int writtenLen = 0; - SRpcInfo *pRpc = pConn->pRpc; SRpcHead *pHead = (SRpcHead *)msg; msgLen = rpcAddAuthPart(pConn, msg, msgLen); if ( rpcIsReq(pHead->msgType)) { if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn, - pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + tTrace("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", + pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort, + msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) - tTrace( "%s %p, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", - pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, + tTrace("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", + pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } - tTrace("connection type is: %d", pConn->connType); + //tTrace("connection type is: %d", pConn->connType); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); if (writtenLen != msgLen) { - tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn, - msgLen, writtenLen, strerror(errno)); + tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); } tDump(msg, msgLen); @@ -1130,7 +1135,7 @@ static void rpcProcessConnError(void *param, void *id) { return; } - tTrace("%s connection error happens", pRpc->label); + tTrace("%s %p, connection error happens", pRpc->label, pContext->ahandle); if (pContext->numOfTry >= pContext->ipSet.numOfIps) { rpcMsg.msgType = pContext->msgType+1; @@ -1156,23 +1161,21 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { rpcLockConn(pConn); if (pConn->outType && pConn->user[0]) { - tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]); + tTrace("%s, expected %s is not received", pConn->info, taosMsg[(int)pConn->outType + 1]); pConn->pTimer = NULL; pConn->retry++; if (pConn->retry < 4) { - tTrace("%s %p, re-send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); + tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen); taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { // close the connection - tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn, - taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); + tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); reportDisc = 1; } } else { - tTrace("%s %p, retry timer not processed", pRpc->label, pConn); + tTrace("%s, retry timer not processed", pConn->info); } rpcUnlockConn(pConn); @@ -1189,10 +1192,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcInfo *pRpc = pConn->pRpc; if (pConn->user[0]) { - tTrace("%s %p, close the connection since no activity", pRpc->label, pConn); + tTrace("%s, close the connection since no activity", pConn->info); if (pConn->inType && pRpc->cfp) { // if there are pending request, notify the app - tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn); + tTrace("%s, notify the app, connection is gone", pConn->info); /* SRpcMsg rpcMsg; rpcMsg.pCont = NULL; @@ -1205,7 +1208,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) { } rpcCloseConn(pConn); } else { - tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s, idle timer:%p not processed", pConn->info, tmrId); } } @@ -1216,11 +1219,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) { rpcLockConn(pConn); if (pConn->inType && pConn->user[0]) { - tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn); + tTrace("%s, progress timer expired, send progress", pConn->info); rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); } else { - tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId); + tTrace("%s, progress timer:%p not processed", pConn->info, tmrId); } rpcUnlockConn(pConn); @@ -1254,7 +1257,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { memcpy(pCont + overhead, buf, compLen); pHead->comp = 1; - tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); + //tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen); finalLen = compLen + overhead; } else { finalLen = contLen; @@ -1288,7 +1291,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { pNewHead->msgLen = rpcMsgLenFromCont(origLen); rpcFreeMsg(pHead); // free the compressed message buffer pHead = pNewHead; - tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); + //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); } @@ -1345,7 +1348,6 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) { static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { SRpcHead *pHead = (SRpcHead *)msg; - SRpcInfo *pRpc = pConn->pRpc; int code = 0; if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){ @@ -1373,20 +1375,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) { delta = (int32_t)htonl(pDigest->timeStamp); delta -= (int32_t)taosGetTimestampSec(); if (abs(delta) > 900) { - tWarn("%s %p, time diff:%d is too big, msg discarded", pRpc->label, pConn, delta); + tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); code = TSDB_CODE_INVALID_TIME_STAMP; } else { if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - tError("%s %p, authentication failed, msg discarded", pRpc->label, pConn); + tError("%s, authentication failed, msg discarded", pConn->info); code = TSDB_CODE_AUTH_FAILURE; } else { pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client - tTrace("%s %p, message is authenticated", pRpc->label, pConn); + //tTrace("%s, message is authenticated", pConn->info); } } } else { - tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi); + tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); code = TSDB_CODE_AUTH_FAILURE; } diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index c551f6b1db1acfe847ce51ef70b1f883ed66cb05..677187e3b999c3e55301eea3efc3bf5b8a19395e 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -218,7 +218,7 @@ static void *taosRecvUdpData(void *param) { while (1) { dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen); port = ntohs(sourceAdd.sin_port); - tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); + //tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen); if (dataLen < sizeof(SRpcHead)) { tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno)); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 0a1444c2f0d6c74283788515445bf31567a71655..c5a6ed54c3ed4678d9a4a7a9476560720cf79691 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -214,12 +214,12 @@ typedef struct { int64_t tombSize; // unused file size int32_t totalBlocks; int32_t totalSubBlocks; -} SFileInfo; +} STsdbFileInfo; typedef struct { int fd; char fname[128]; - SFileInfo info; + STsdbFileInfo info; } SFile; #define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1) @@ -350,7 +350,7 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid); void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); // TSDB repository definition -typedef struct _tsdb_repo { +typedef struct STsdbRepo { char *rootDir; // TSDB configuration STsdbCfg config; diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index ec772fd4d1f1468147a47f9cc728532d75e18298..8634a523f3d5b2a1a8a49180acd5a7dab3785cfb 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -12,6 +12,7 @@ python3 ./test.py $1 -f insert/binary.py python3 ./test.py $1 -f insert/nchar.py python3 ./test.py $1 -f insert/nchar-boundary.py python3 ./test.py $1 -f insert/nchar-unicode.py +python3 ./test.py $1 -f insert/multi.py python3 ./test.py $1 -f table/column_name.py python3 ./test.py $1 -f table/column_num.py diff --git a/tests/pytest/insert/bool.py b/tests/pytest/insert/bool.py index 062563f4ab84c9e09ea3ed08ced60fac53b1add8..c175afd8b556f0a9e82c23c4a6d62eacf45a0f70 100644 --- a/tests/pytest/insert/bool.py +++ b/tests/pytest/insert/bool.py @@ -58,6 +58,14 @@ class TDTestCase: tdSql.query('select * from tb order by ts desc') tdLog.info('tdSql.checkRow(6)') tdSql.checkRows(6) + tdLog.info('=============== step7') + tdLog.info("insert into tb values (now+6m, true)") + tdSql.execute("insert into tb values (now+5m, true)") + tdLog.info('select * from tb order by ts desc') + tdSql.query('select * from tb order by ts desc') + tdLog.info('tdSql.checkRow(7)') + tdSql.checkRows(7) +# convert end # convert end def stop(self): diff --git a/tests/pytest/smoketest.sh b/tests/pytest/smoketest.sh index c0878354e032707f85596abae078ef7080035dae..6314b3f83bce23ee2f513271e768ae7b6c6668f1 100755 --- a/tests/pytest/smoketest.sh +++ b/tests/pytest/smoketest.sh @@ -21,6 +21,8 @@ python3 ./test.py $1 -f insert/date.py python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -f insert/nchar.py python3 ./test.py $1 -s && sleep 1 +python3 ./test.py $1 -f insert/multi.py +python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -f table/column_name.py python3 ./test.py $1 -s && sleep 1 diff --git a/tests/pytest/table/tablename-boundary.py b/tests/pytest/table/tablename-boundary.py new file mode 100644 index 0000000000000000000000000000000000000000..335073065c5a5b6af82469c967bac1df623d5e5a --- /dev/null +++ b/tests/pytest/table/tablename-boundary.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +import sys +import string +import random +import subprocess +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def run(self): + tdSql.prepare() + + getTableNameLen = "grep -w '#define TSDB_TABLE_NAME_LEN' ../../src/inc/taosdef.h|awk '{print $3}'" + tableNameMaxLen = int( + subprocess.check_output( + getTableNameLen, shell=True)) + tdLog.info("table name max length is %d" % tableNameMaxLen) + chars = string.ascii_uppercase + string.ascii_lowercase + tb_name = ''.join(random.choices(chars, k=tableNameMaxLen)) + tdLog.info('tb_name length %d' % len(tb_name)) + tdLog.info('create table %s (ts timestamp, value int)' % tb_name) + tdSql.error( + 'create table %s (ts timestamp, speed binary(4089))' % + tb_name) + + tb_name = ''.join(random.choices(chars, k=191)) + tdLog.info('tb_name length %d' % len(tb_name)) + tdLog.info('create table %s (ts timestamp, value int)' % tb_name) + tdSql.execute( + 'create table %s (ts timestamp, speed binary(4089))' % + tb_name) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 9f7466a7e7b6c75cbafc274c131cff4e0e571b04..727016adb320e917c4d68b3ed1486577e423b1f9 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -223,8 +223,8 @@ class TDDnode: self.running = 1 tdLog.debug("dnode:%d is running with %s " % (self.index, cmd)) - tdLog.debug("wait 4 seconds for the dnode:%d to start." % (self.index)) - time.sleep(4) + tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index)) + time.sleep(5) def stop(self): if self.valgrind == 0: @@ -233,16 +233,14 @@ class TDDnode: toBeKilled = "valgrind.bin" if self.running != 0: - killCmd = "ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -INT" % ( - toBeKilled, self.cfgDir) - psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -INT %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") self.running = 0 tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index)) @@ -254,15 +252,14 @@ class TDDnode: toBeKilled = "valgrind.bin" if self.running != 0: - killCmd = "ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -KILL" % ( - toBeKilled, self.cfgDir) psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") self.running = 0 tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index)) @@ -307,21 +304,21 @@ class TDDnodes: self.dnodes.append(TDDnode(10)) def init(self, path): - killCmd = "ps -ef|grep -w taosd | grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - killCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") binPath = os.path.dirname(os.path.realpath(__file__)) binPath = binPath + "/../../../debug/" @@ -407,27 +404,27 @@ class TDDnodes: self.dnodes[i].stop() psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") if processID: cmd = "sudo systemctl stop taosd" os.system(cmd) # if os.system(cmd) != 0 : # tdLog.exit(cmd) - killCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") - killCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL" psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'" - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): + killCmd = "kill -KILL %s" % processID os.system(killCmd) time.sleep(1) - processID = subprocess.check_output(psCmd, shell=True) + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") # if os.system(cmd) != 0 : # tdLog.exit(cmd) diff --git a/tests/pytest/valgrind-test.sh b/tests/pytest/valgrind-test.sh index 77c2b321df1988eab3cafe9483a660c84cd9ac93..0b5dfc0fa3dbb00d4c7a4e276ea63d7944d92f6d 100755 --- a/tests/pytest/valgrind-test.sh +++ b/tests/pytest/valgrind-test.sh @@ -1,33 +1,35 @@ #!/bin/bash -python3 ./test.py $1 -f insert/basic.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/int.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/float.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/bigint.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/bool.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/double.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/smallint.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/tinyint.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/binary.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/date.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f insert/nchar.py -python3 ./test.py $1 -s && sleep 1 +python3 ./test.py -g -f insert/basic.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/int.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/float.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/bigint.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/bool.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/double.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/smallint.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/tinyint.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/binary.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/date.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/nchar.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f insert/multi.py +python3 ./test.py -g -s && sleep 1 -python3 ./test.py $1 -f table/column_name.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f table/column_num.py -python3 ./test.py $1 -s && sleep 1 -python3 ./test.py $1 -f table/db_table.py -python3 ./test.py $1 -s && sleep 1 +python3 ./test.py -g -f table/column_name.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f table/column_num.py +python3 ./test.py -g -s && sleep 1 +python3 ./test.py -g -f table/db_table.py +python3 ./test.py -g -s && sleep 1 -python3 ./test.py $1 -f import_merge/importDataLastSub.py -python3 ./test.py $1 -s && sleep 1 +python3 ./test.py -g -f import_merge/importDataLastSub.py +python3 ./test.py -g -s && sleep 1