提交 0c62f023 编写于 作者: B Bomin Zhang

Merge remote-tracking branch 'origin/develop' into feature/boundary-check

......@@ -50,7 +50,7 @@ matrix:
./test-all.sh $TRAVIS_EVENT_TYPE || travis_terminate $?
cd ${TRAVIS_BUILD_DIR}/tests/pytest
./valgrind-test.sh -g 2>&1 > mem-error-out.txt
./valgrind-test.sh 2>&1 > mem-error-out.txt
sleep 1
# Color setting
......
......@@ -264,6 +264,7 @@ bool hasMoreVnodesToTry(SSqlObj *pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
int tscSetMgmtIpListFromCfg(const char *first, const char *second);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -27,6 +27,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "tscLog.h"
#include "qsqltype.h"
#define TSC_MGMT_VNODE 999
......@@ -67,7 +68,7 @@ void tscPrintMgmtIp() {
}
}
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
tscMgmtIpSet.numOfIps = pIpList->numOfIps;
tscMgmtIpSet.inUse = pIpList->inUse;
for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
......@@ -75,16 +76,6 @@ void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
}
}
void tscSetMgmtIpListFromEdge() {
if (tscMgmtIpSet.numOfIps != 1) {
tscMgmtIpSet.numOfIps = 1;
tscMgmtIpSet.inUse = 0;
taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]);
tscTrace("edge mgmt IP list:");
tscPrintMgmtIp();
}
}
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
tscMgmtIpSet = *pIpSet;
tscTrace("mgmt IP list is changed for ufp is called, numOfIps:%d inUse:%d", tscMgmtIpSet.numOfIps, tscMgmtIpSet.inUse);
......@@ -93,18 +84,6 @@ void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
}
}
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
/*
* The iplist returned by the cluster edition is the current management nodes
* and the iplist returned by the edge edition is empty
*/
if (pIpList->numOfIps != 0) {
tscSetMgmtIpListFromCluster(pIpList);
} else {
tscSetMgmtIpListFromEdge();
}
}
/*
* For each management node, try twice at least in case of poor network situation.
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
......@@ -132,7 +111,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (code == 0) {
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
SRpcIpSet * pIpList = &pRsp->ipList;
tscSetMgmtIpList(pIpList);
if (pIpList->numOfIps > 0)
tscSetMgmtIpList(pIpList);
if (pRsp->killConnection) {
tscKillConnection(pObj);
......@@ -207,7 +187,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
}
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
// tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
......@@ -235,7 +215,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
tscTrace("%p msg:%p is received from server", pSql, rpcMsg->pCont);
// tscTrace("%p msg:%s is received from server", pSql, taosMsg[rpcMsg->msgType]);
if (pSql->freed || pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
......@@ -340,10 +320,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
pRes->numOfRows += pMsg->affectedRows;
tscTrace("%p cmd:%d code:%s, inserted rows:%d, rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code),
pMsg->affectedRows, pRes->rspLen);
tscTrace("%p SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql, sqlCmd[pCmd->command],
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
} else {
tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
tscTrace("%p SQL cmd:%s, code:%s rspLen:%d", pSql, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
}
}
......@@ -426,7 +406,7 @@ int tscProcessSql(SSqlObj *pSql) {
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
}
tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
tscTrace("%p SQL cmd:%s will be processed, name:%s, type:%d", pSql, sqlCmd[pCmd->command], name, type);
if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
if (pTableMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_OTHERS;
......@@ -2225,7 +2205,8 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= tListLen(pObj->db));
strncpy(pObj->db, temp, tListLen(pObj->db));
tscSetMgmtIpList(&pConnect->ipList);
if (pConnect->ipList.numOfIps > 0)
tscSetMgmtIpList(&pConnect->ipList);
strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth;
......
......@@ -65,32 +65,18 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
terrno = TSDB_CODE_INVALID_PASS;
return NULL;
}
if (ip) {
if (tscSetMgmtIpListFromCfg(ip, NULL) < 0) return NULL;
if (port) tscMgmtIpSet.port[0] = port;
}
void *pDnodeConn = NULL;
if (tscInitRpc(user, pass, &pDnodeConn) != 0) {
terrno = TSDB_CODE_NETWORK_UNAVAIL;
return NULL;
}
tscMgmtIpSet.numOfIps = 0;
if (ip && ip[0]) {
tscMgmtIpSet.inUse = 0;
tscMgmtIpSet.numOfIps = 1;
strcpy(tscMgmtIpSet.fqdn[0], ip);
tscMgmtIpSet.port[0] = port? port: tsDnodeShellPort;
} else {
if (tsFirst[0] != 0) {
taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
tscMgmtIpSet.numOfIps++;
}
if (tsSecond[0] != 0) {
taosGetFqdnPortFromEp(tsSecond, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
tscMgmtIpSet.numOfIps++;
}
}
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) {
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
......
......@@ -23,6 +23,7 @@
#include "tutil.h"
#include "tsched.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tglobal.h"
#include "tconfig.h"
......@@ -114,14 +115,10 @@ void taos_init_imp() {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
}
tscMgmtIpSet.inUse = 0;
tscMgmtIpSet.numOfIps = 1;
taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[0], &tscMgmtIpSet.port[0]);
if (tsSecond[0] && strcmp(tsSecond, tsFirst) != 0) {
tscMgmtIpSet.numOfIps = 2;
taosGetFqdnPortFromEp(tsSecond, tscMgmtIpSet.fqdn[1], &tscMgmtIpSet.port[1]);
}
if (tscSetMgmtIpListFromCfg(tsFirst, tsSecond) < 0) {
tscError("failed to init mgmt IP list");
return;
}
tscInitMsgsFp();
int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections;
......
......@@ -2163,3 +2163,33 @@ char* strdup_throw(const char* str) {
}
return p;
}
int tscSetMgmtIpListFromCfg(const char *first, const char *second) {
tscMgmtIpSet.numOfIps = 0;
tscMgmtIpSet.inUse = 0;
if (first && first[0] != 0) {
if (strlen(first) >= TSDB_FQDN_LEN) {
terrno = TSDB_CODE_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(first, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
tscMgmtIpSet.numOfIps++;
}
if (second && second[0] != 0) {
if (strlen(second) >= TSDB_FQDN_LEN) {
terrno = TSDB_CODE_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(second, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
tscMgmtIpSet.numOfIps++;
}
if ( tscMgmtIpSet.numOfIps == 0) {
terrno = TSDB_CODE_INVALID_FQDN;
return -1;
}
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QSQLCMD_H
#define TDENGINE_QSQLCMD_H
#ifdef __cplusplus
extern "C" {
#endif
// sql type
#ifdef TSDB_SQL_C
#define TSDB_DEFINE_SQL_TYPE( name, msg ) msg,
char *sqlCmd[] = {
"null",
#else
#define TSDB_DEFINE_SQL_TYPE( name, msg ) name,
enum {
TSDB_SQL_NULL = 0,
#endif
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
// the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_ACCT, "create-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_USER, "create-user" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_ACCT, "drop-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_USER, "drop-user" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_USER, "alter-user" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DNODE, "create-dnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DNODE, "drop-dnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_DNODE, "cfg-dnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_MNODE, "cfg-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW, "show" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE, "retrieve" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" )
// SQL below is for read operation
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_READ, "read" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CONNECT, "connect" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_USE_DB, "use-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_META, "meta" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_STABLEVGROUP, "stable-vgroup" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MULTI_META, "multi-meta" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_HB, "heart-beat" )
// SQL below for client local
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" )
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_EMPTY_RESULT, "retrieve-empty-result" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RESET_CACHE, "reset-cache" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_STATUS, "serv-status" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_DB, "current-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SERV_VERSION, "serv-version" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CLI_VERSION, "cli-version" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CURRENT_USER, "current-user ")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CFG_LOCAL, "cfg-local" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MAX, "max" )
};
// create table operation type
enum TSQL_TYPE {
TSQL_CREATE_TABLE = 0x1,
TSQL_CREATE_STABLE = 0x2,
TSQL_CREATE_TABLE_FROM_STABLE = 0x3,
TSQL_CREATE_STREAM = 0x4,
};
extern char *sqlCmd[];
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QSQLCMD_H
......@@ -175,7 +175,7 @@ void taosInitGlobalCfg();
bool taosCheckGlobalCfg();
void taosSetAllDebugFlag();
bool taosCfgDynamicOptions(char *msg);
int taosGetFqdnPortFromEp(char *ep, char *fqdn, uint16_t *port);
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define TSDB_SQL_C
#include "qsqltype.h"
......@@ -61,7 +61,7 @@ int32_t tscEmbedded = 0;
*/
int64_t tsMsPerDay[] = {86400000L, 86400000000L};
char tsFirst[TSDB_FQDN_LEN] = {0};
char tsFirst[TSDB_FQDN_LEN] = {0};
char tsSecond[TSDB_FQDN_LEN] = {0};
char tsArbitrator[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port
......@@ -1252,7 +1252,7 @@ bool taosCheckGlobalCfg() {
return true;
}
int taosGetFqdnPortFromEp(char *ep, char *fqdn, uint16_t *port) {
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
*port = 0;
strcpy(fqdn, ep);
......
......@@ -13,84 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QSQLCMD_H
#define TDENGINE_QSQLCMD_H
#ifndef TDENGINE_DNODE_MAIN_H
#define TDENGINE_DNODE_MAIN_H
#ifdef __cplusplus
extern "C" {
#endif
enum _sql_type {
TSDB_SQL_SELECT = 1,
TSDB_SQL_FETCH,
TSDB_SQL_INSERT,
TSDB_SQL_MGMT, // the SQL below is for mgmt node
TSDB_SQL_CREATE_DB,
TSDB_SQL_CREATE_TABLE,
TSDB_SQL_DROP_DB,
TSDB_SQL_DROP_TABLE,
TSDB_SQL_CREATE_ACCT,
TSDB_SQL_CREATE_USER, // 10
TSDB_SQL_DROP_ACCT,
TSDB_SQL_DROP_USER,
TSDB_SQL_ALTER_USER,
TSDB_SQL_ALTER_ACCT,
TSDB_SQL_ALTER_TABLE,
TSDB_SQL_ALTER_DB,
TSDB_SQL_CREATE_MNODE,
TSDB_SQL_DROP_MNODE,
TSDB_SQL_CREATE_DNODE,
TSDB_SQL_DROP_DNODE, // 20
TSDB_SQL_CFG_DNODE,
TSDB_SQL_CFG_MNODE,
TSDB_SQL_SHOW,
TSDB_SQL_RETRIEVE,
TSDB_SQL_KILL_QUERY,
TSDB_SQL_KILL_STREAM,
TSDB_SQL_KILL_CONNECTION,
TSDB_SQL_READ, // SQL below is for read operation
TSDB_SQL_CONNECT,
TSDB_SQL_USE_DB, // 30
TSDB_SQL_META,
TSDB_SQL_STABLEVGROUP,
TSDB_SQL_MULTI_META,
TSDB_SQL_HB,
TSDB_SQL_LOCAL, // SQL below for client local
TSDB_SQL_DESCRIBE_TABLE,
TSDB_SQL_RETRIEVE_LOCALMERGE,
TSDB_SQL_TABLE_JOIN_RETRIEVE,
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_SQL_RETRIEVE_EMPTY_RESULT,
TSDB_SQL_RESET_CACHE, // 40
TSDB_SQL_SERV_STATUS,
TSDB_SQL_CURRENT_DB,
TSDB_SQL_SERV_VERSION,
TSDB_SQL_CLI_VERSION,
TSDB_SQL_CURRENT_USER,
TSDB_SQL_CFG_LOCAL,
TSDB_SQL_MAX // 47
};
// create table operation type
enum TSQL_TYPE {
TSQL_CREATE_TABLE = 0x1,
TSQL_CREATE_STABLE = 0x2,
TSQL_CREATE_TABLE_FROM_STABLE = 0x3,
TSQL_CREATE_STREAM = 0x4,
};
int32_t dnodeInitSystem();
void dnodeCleanUpSystem();
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QSQLCMD_H
#endif
......@@ -16,8 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taos.h"
#include "tglobal.h"
#include "trpc.h"
#include "tutil.h"
#include "tconfig.h"
#include "tglobal.h"
......@@ -29,112 +27,14 @@
#include "dnodeVRead.h"
#include "dnodeShell.h"
#include "dnodeVWrite.h"
#include "tgrant.h"
static int32_t dnodeInitSystem();
static int32_t dnodeInitStorage();
extern void grantParseParameter();
static void dnodeCleanupStorage();
static void dnodeCleanUpSystem();
static void dnodeSetRunStatus(SDnodeRunStatus status);
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
static void dnodeCheckDataDirOpenned(char *dir);
static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
int32_t main(int32_t argc, char *argv[]) {
// Set global configuration file
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
strcpy(configDir, argv[++i]);
} else {
printf("'-c' requires a parameter, default:%s\n", configDir);
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-V") == 0) {
#ifdef _SYNC
char *versionStr = "enterprise";
#else
char *versionStr = "community";
#endif
printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("gitinfoI: %s\n", gitinfoOfInternal);
printf("buildinfo: %s\n", buildinfo);
exit(EXIT_SUCCESS);
} else if (strcmp(argv[i], "-k") == 0) {
grantParseParameter();
exit(EXIT_SUCCESS);
}
#ifdef TAOS_MEM_CHECK
else if (strcmp(argv[i], "--alloc-random-fail") == 0) {
if ((i < argc - 1) && (argv[i + 1][0] != '-')) {
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true);
} else {
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true);
}
} else if (strcmp(argv[i], "--detect-mem-leak") == 0) {
if ((i < argc - 1) && (argv[i + 1][0] != '-')) {
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true);
} else {
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true);
}
}
#endif
}
/* Set termination handler. */
struct sigaction act = {{0}};
act.sa_flags = SA_SIGINFO;
act.sa_sigaction = signal_handler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL);
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
// Open /var/log/syslog file to record information.
openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1);
syslog(LOG_INFO, "Starting TDengine service...");
// Initialize the system
if (dnodeInitSystem() < 0) {
syslog(LOG_ERR, "Error initialize TDengine system");
closelog();
dnodeCleanUpSystem();
exit(EXIT_FAILURE);
}
syslog(LOG_INFO, "Started TDengine service successfully.");
while (1) {
sleep(1000);
}
}
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) {
taosCfgDynamicOptions("debugFlag 135");
return;
}
if (signum == SIGUSR2) {
taosCfgDynamicOptions("resetlog");
return;
}
syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service...");
// clean the system.
dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
dnodeCleanUpSystem();
// close the syslog
syslog(LOG_INFO, "Shut down TDengine service successfully");
dPrint("TDengine is shut down!");
closelog();
exit(EXIT_SUCCESS);
}
static int32_t dnodeInitSystem() {
int32_t dnodeInitSystem() {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
tscEmbedded = 1;
taosResolveCRC();
......@@ -180,7 +80,7 @@ static int32_t dnodeInitSystem() {
return 0;
}
static void dnodeCleanUpSystem() {
void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupShell();
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tgrant.h"
#include "tutil.h"
#include "tglobal.h"
#include "dnodeInt.h"
#include "dnodeMain.h"
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
int32_t main(int32_t argc, char *argv[]) {
// Set global configuration file
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
strcpy(configDir, argv[++i]);
} else {
printf("'-c' requires a parameter, default:%s\n", configDir);
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-V") == 0) {
#ifdef _SYNC
char *versionStr = "enterprise";
#else
char *versionStr = "community";
#endif
printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("gitinfoI: %s\n", gitinfoOfInternal);
printf("buildinfo: %s\n", buildinfo);
exit(EXIT_SUCCESS);
} else if (strcmp(argv[i], "-k") == 0) {
grantParseParameter();
exit(EXIT_SUCCESS);
}
#ifdef TAOS_MEM_CHECK
else if (strcmp(argv[i], "--alloc-random-fail") == 0) {
if ((i < argc - 1) && (argv[i + 1][0] != '-')) {
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, argv[++i], true);
} else {
taosSetAllocMode(TAOS_ALLOC_MODE_RANDOM_FAIL, NULL, true);
}
} else if (strcmp(argv[i], "--detect-mem-leak") == 0) {
if ((i < argc - 1) && (argv[i + 1][0] != '-')) {
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, argv[++i], true);
} else {
taosSetAllocMode(TAOS_ALLOC_MODE_DETECT_LEAK, NULL, true);
}
}
#endif
}
/* Set termination handler. */
struct sigaction act = {{0}};
act.sa_flags = SA_SIGINFO;
act.sa_sigaction = signal_handler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL);
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
// Open /var/log/syslog file to record information.
openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1);
syslog(LOG_INFO, "Starting TDengine service...");
// Initialize the system
if (dnodeInitSystem() < 0) {
syslog(LOG_ERR, "Error initialize TDengine system");
closelog();
dnodeCleanUpSystem();
exit(EXIT_FAILURE);
}
syslog(LOG_INFO, "Started TDengine service successfully.");
while (1) {
sleep(1000);
}
}
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) {
taosCfgDynamicOptions("debugFlag 135");
return;
}
if (signum == SIGUSR2) {
taosCfgDynamicOptions("resetlog");
return;
}
syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service...");
// clean the system.
dPrint("shut down signal is %d, sender PID:%d", signum, sigInfo->si_pid);
dnodeCleanUpSystem();
// close the syslog
syslog(LOG_INFO, "Shut down TDengine service successfully");
dPrint("TDengine is shut down!");
closelog();
exit(EXIT_SUCCESS);
}
......@@ -92,8 +92,6 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
char *pCont = (char *) pMsg->pCont;
void *pVnode;
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
......@@ -214,6 +212,7 @@ static void *dnodeProcessReadQueue(void *param) {
continue;
}
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg);
......
......@@ -200,6 +200,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0;
pHead->len = pWrite->contLen;
dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
} else {
pHead = (SWalHead *)item;
}
......
......@@ -218,7 +218,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_LOCALE_LEN 64
#define TSDB_TIMEZONE_LEN 64
#define TSDB_FQDN_LEN 72
#define TSDB_FQDN_LEN 256
#define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128
#define TSDB_METER_VNODE_BITS 20
......@@ -234,7 +234,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100)
#define TSDB_DEFAULT_PAYLOAD_SIZE 1024 // default payload size
#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MAX_VNODES 256
......
......@@ -170,6 +170,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QHANDLE, 0, 459, "invalid handle"
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 460, "query cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 461, "invalid ie")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 462, "invalid value")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FQDN, 0, 463, "invalid FQDN")
// others
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 500, "invalid file format")
......
......@@ -187,13 +187,13 @@ typedef struct SMsgHead {
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head
int16_t numOfRows; // total number of rows in current submit block
char data[];
uint64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head
int16_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
// Submit message for this TSDB
......@@ -236,6 +236,7 @@ typedef struct {
int16_t numOfTags;
int32_t sid;
int32_t sversion;
int32_t tversion;
int32_t tagDataLen;
int32_t sqlDataLen;
uint64_t uid;
......@@ -327,9 +328,9 @@ typedef struct {
} SMDDropTableMsg;
typedef struct {
int32_t contLen;
int32_t vgId;
int64_t uid;
int32_t contLen;
int32_t vgId;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
} SMDDropSTableMsg;
......@@ -404,9 +405,9 @@ typedef struct SColumnInfo {
} SColumnInfo;
typedef struct STableIdInfo {
int64_t uid;
int32_t tid;
TSKEY key; // last accessed ts, for subscription
uint64_t uid;
int32_t tid;
TSKEY key; // last accessed ts, for subscription
} STableIdInfo;
typedef struct STimeWindow {
......@@ -632,6 +633,7 @@ typedef struct STableMetaMsg {
uint8_t tableType;
int16_t numOfColumns;
int16_t sversion;
int16_t tversion;
int32_t sid;
uint64_t uid;
SCMVgroupInfo vgroup;
......
......@@ -48,6 +48,7 @@ typedef struct {
int contLen;
int32_t code;
void *handle;
void *ahandle; //app handle set by client, for debug purpose
} SRpcMsg;
typedef struct {
......
......@@ -41,14 +41,13 @@
// dynamic config timestamp width according to maximum time precision
extern int32_t TIMESTAMP_OUTPUT_LENGTH;
typedef struct History History;
struct History {
typedef struct SShellHistory {
char* hist[MAX_HISTORY_SIZE];
int hstart;
int hend;
};
} SShellHistory;
struct arguments {
typedef struct SShellArguments {
char* host;
char* password;
char* user;
......@@ -62,11 +61,11 @@ struct arguments {
char* commands;
int abort;
int port;
};
} SShellArguments;
/**************** Function declarations ****************/
extern void shellParseArgument(int argc, char* argv[], struct arguments* arguments);
extern TAOS* shellInit(struct arguments* args);
extern void shellParseArgument(int argc, char* argv[], SShellArguments* arguments);
extern TAOS* shellInit(SShellArguments* args);
extern void* shellLoopQuery(void* arg);
extern void taos_error(TAOS* con);
extern int regex_match(const char* s, const char* reg, int cflags);
......@@ -76,7 +75,7 @@ void shellRunCommandOnServer(TAOS* con, char command[]);
void read_history();
void write_history();
void source_file(TAOS* con, char* fptr);
void source_dir(TAOS* con, struct arguments* args);
void source_dir(TAOS* con, SShellArguments* args);
void get_history_path(char* history);
void cleanup_handler(void* arg);
void exitShell();
......@@ -89,12 +88,12 @@ int isCommentLine(char *line);
extern char PROMPT_HEADER[];
extern char CONTINUE_PROMPT[];
extern int prompt_size;
extern History history;
extern SShellHistory history;
extern struct termios oldtio;
extern void set_terminal_mode();
extern int get_old_terminal_mode(struct termios* tio);
extern void reset_terminal_mode();
extern struct arguments args;
extern TAOS_RES* result;
extern void reset_terminal_mode();
extern SShellArguments args;
extern TAOS_RES* result;
#endif
......@@ -33,12 +33,12 @@ char PROMPT_HEADER[] = "taos> ";
char CONTINUE_PROMPT[] = " -> ";
int prompt_size = 6;
TAOS_RES *result = NULL;
History history;
SShellHistory history;
/*
* FUNCTION: Initialize the shell.
*/
TAOS *shellInit(struct arguments *args) {
TAOS *shellInit(SShellArguments *args) {
printf("\n");
printf(CLIENT_VERSION, tsOsName, taos_get_client_info());
fflush(stdout);
......
......@@ -221,7 +221,7 @@ void* shellImportThreadFp(void *arg)
return NULL;
}
static void shellRunImportThreads(struct arguments* args)
static void shellRunImportThreads(SShellArguments* args)
{
pthread_attr_t thattr;
ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj));
......@@ -254,7 +254,7 @@ static void shellRunImportThreads(struct arguments* args)
free(threadObj);
}
void source_dir(TAOS* con, struct arguments* args) {
void source_dir(TAOS* con, SShellArguments* args) {
shellGetDirectoryFileList(args->dir);
int64_t start = taosGetTimestampMs();
......
......@@ -50,7 +50,7 @@ static struct argp_option options[] = {
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
struct arguments *arguments = state->input;
SShellArguments *arguments = state->input;
wordexp_t full_path;
switch (key) {
......@@ -129,7 +129,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Our argp parser. */
static struct argp argp = {options, parse_opt, args_doc, doc};
void shellParseArgument(int argc, char *argv[], struct arguments *arguments) {
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
static char verType[32] = {0};
sprintf(verType, "version: %s\n", version);
......
......@@ -62,7 +62,7 @@ int checkVersion() {
}
// Global configurations
struct arguments args = {
SShellArguments args = {
.host = NULL,
.password = NULL,
.user = NULL,
......
......@@ -67,7 +67,7 @@ static struct argp_option options[] = {
{0}};
/* Used by main to communicate with parse_opt. */
struct arguments {
typedef struct DemoArguments {
char *host;
uint16_t port;
char *user;
......@@ -87,13 +87,13 @@ struct arguments {
int num_of_DPT;
int abort;
char **arg_list;
};
} SDemoArguments;
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
struct arguments *arguments = state->input;
SDemoArguments *arguments = state->input;
wordexp_t full_path;
char **sptr;
switch (key) {
......@@ -269,7 +269,7 @@ double getCurrentTime();
void callBack(void *param, TAOS_RES *res, int code);
int main(int argc, char *argv[]) {
struct arguments arguments = {NULL, // host
SDemoArguments arguments = {NULL, // host
0, // port
"root", // user
"taosdata", // password
......
......@@ -168,7 +168,7 @@ static struct argp_option options[] = {
{0}};
/* Used by main to communicate with parse_opt. */
struct arguments {
typedef struct SDumpArguments {
// connection option
char *host;
char *user;
......@@ -193,13 +193,13 @@ struct arguments {
char **arg_list;
int arg_list_len;
bool isDumpIn;
};
} SDumpArguments;
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
struct arguments *arguments = state->input;
SDumpArguments *arguments = state->input;
wordexp_t full_path;
switch (key) {
......@@ -296,31 +296,31 @@ char *command = NULL;
char *lcommand = NULL;
char *buffer = NULL;
int taosDumpOut(struct arguments *arguments);
int taosDumpOut(SDumpArguments *arguments);
int taosDumpIn(struct arguments *arguments);
int taosDumpIn(SDumpArguments *arguments);
void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp);
int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp);
int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp);
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, struct arguments *arguments, FILE *fp);
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, SDumpArguments *arguments, FILE *fp);
void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, struct arguments *arguments,
void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, SDumpArguments *arguments,
FILE *fp);
int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp);
int32_t taosDumpTable(char *table, char *metric, SDumpArguments *arguments, FILE *fp);
int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp);
int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp);
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments);
int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments);
int taosCheckParam(struct arguments *arguments);
int taosCheckParam(SDumpArguments *arguments);
void taosFreeDbInfos();
int main(int argc, char *argv[]) {
struct arguments arguments = {
SDumpArguments arguments = {
// connection option
NULL, "root", "taosdata", 0,
// output file
......@@ -424,7 +424,7 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
return -1;
}
int taosDumpOut(struct arguments *arguments) {
int taosDumpOut(SDumpArguments *arguments) {
TAOS_ROW row;
char *temp = NULL;
FILE *fp = NULL;
......@@ -602,7 +602,7 @@ void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
fprintf(fp, "%s\n\n", buffer);
}
int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp) {
int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) {
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
......@@ -660,7 +660,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp) {
return 0;
}
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, struct arguments *arguments, FILE *fp) {
void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, SDumpArguments *arguments, FILE *fp) {
char *pstr = NULL;
pstr = buffer;
int counter = 0;
......@@ -703,7 +703,7 @@ void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, struct argume
fprintf(fp, "%s\n\n", buffer);
}
void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, struct arguments *arguments,
void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, SDumpArguments *arguments,
FILE *fp) {
char *pstr = NULL;
pstr = buffer;
......@@ -786,7 +786,7 @@ int taosGetTableDes(char *table, STableDef *tableDes) {
return count;
}
int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp) {
int32_t taosDumpTable(char *table, char *metric, SDumpArguments *arguments, FILE *fp) {
int count = 0;
STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
......@@ -828,7 +828,7 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI
return taosDumpTableData(fp, table, arguments);
}
int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp) {
int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
TAOS_ROW row = NULL;
int fd = -1;
STableRecord tableRecord;
......@@ -877,7 +877,7 @@ int32_t taosDumpMetric(char *metric, struct arguments *arguments, FILE *fp) {
return 0;
}
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments) {
int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) {
/* char temp[MAX_COMMAND_SIZE] = "\0"; */
int count = 0;
char *pstr = NULL;
......@@ -987,7 +987,7 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments) {
return 0;
}
int taosCheckParam(struct arguments *arguments) {
int taosCheckParam(SDumpArguments *arguments) {
if (arguments->all_databases && arguments->databases) {
fprintf(stderr, "conflict option --all-databases and --databases\n");
return -1;
......@@ -1072,7 +1072,7 @@ void taosReplaceCtrlChar(char *str) {
*pstr = '\0';
}
int taosDumpIn(struct arguments *arguments) {
int taosDumpIn(SDumpArguments *arguments) {
assert(arguments->isDumpIn);
int tsize = 0;
......
......@@ -77,6 +77,7 @@ typedef struct SSuperTableObj {
uint64_t uid;
int64_t createdTime;
int32_t sversion;
int32_t tversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[15];
......
......@@ -124,6 +124,8 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
mTrace("%p, msg:%s will be processed", rpcMsg->ahandle, taosMsg[rpcMsg->msgType]);
if (rpcMsg->pCont == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
return;
......
......@@ -763,6 +763,7 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
pStable->createdTime = taosGetTimestampMs();
pStable->uid = (((uint64_t) pStable->createdTime) << 16) + (sdbGetVersion() & ((1ul << 16) - 1ul));
pStable->sversion = 0;
pStable->tversion = 0;
pStable->numOfColumns = htons(pCreate->numOfColumns);
pStable->numOfTags = htons(pCreate->numOfTags);
......@@ -882,7 +883,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
}
pStable->numOfTags += ntags;
pStable->sversion++;
pStable->tversion++;
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
......@@ -909,7 +910,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
memmove(pStable->schema + pStable->numOfColumns + col, pStable->schema + pStable->numOfColumns + col + 1,
sizeof(SSchema) * (pStable->numOfTags - col - 1));
pStable->numOfTags--;
pStable->sversion++;
pStable->tversion++;
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
......@@ -1235,6 +1236,7 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16));
pMeta->uid = htobe64(pTable->uid);
pMeta->sversion = htons(pTable->sversion);
pMeta->tversion = htons(pTable->tversion);
pMeta->precision = pMsg->pDb->cfg.precision;
pMeta->numOfTags = (uint8_t)pTable->numOfTags;
pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns);
......@@ -1358,12 +1360,14 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
pCreate->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreate->numOfTags = htons(pTable->superTable->numOfTags);
pCreate->sversion = htonl(pTable->superTable->sversion);
pCreate->tversion = htonl(pTable->superTable->tversion);
pCreate->tagDataLen = htonl(tagDataLen);
pCreate->superTableUid = htobe64(pTable->superTable->uid);
} else {
pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreate->numOfTags = 0;
pCreate->sversion = htonl(pTable->sversion);
pCreate->tversion = 0;
pCreate->tagDataLen = 0;
pCreate->superTableUid = 0;
}
......@@ -1686,15 +1690,17 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
pMeta->sid = htonl(pTable->sid);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = pTable->info.type;
strncpy(pMeta->tableId, pTable->info.tableId, tListLen(pTable->info.tableId));
strncpy(pMeta->tableId, pTable->info.tableId, strlen(pTable->info.tableId));
if (pTable->info.type == TSDB_CHILD_TABLE) {
pMeta->sversion = htons(pTable->superTable->sversion);
pMeta->tversion = htons(pTable->superTable->tversion);
pMeta->numOfTags = (int8_t)pTable->superTable->numOfTags;
pMeta->numOfColumns = htons((int16_t)pTable->superTable->numOfColumns);
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable);
} else {
pMeta->sversion = htons(pTable->sversion);
pMeta->tversion = 0;
pMeta->numOfTags = 0;
pMeta->numOfColumns = htons((int16_t)pTable->numOfColumns);
pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable);
......
......@@ -49,6 +49,7 @@ typedef struct {
char encrypt:3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID
uint32_t linkUid; // for unique connection ID assigned by client
uint64_t ahandle; // ahandle assigned by client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario
......
......@@ -146,7 +146,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
rpcUnlockCache(pCache->lockedBy+hash);
pCache->total++;
tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]);
// tTrace("%p %s:%hu:%d:%d:%p added into cache, connections:%d", data, fqdn, port, connType, hash, pNode, pCache->count[hash]);
return;
}
......@@ -200,9 +200,9 @@ void *rpcGetConnFromCache(void *handle, char *fqdn, uint16_t port, int8_t connTy
rpcUnlockCache(pCache->lockedBy+hash);
if (pData) {
tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]);
//tTrace("%p %s:%hu:%d:%d:%p retrieved from cache, connections:%d", pData, fqdn, port, connType, hash, pNode, pCache->count[hash]);
} else {
tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]);
//tTrace("%s:%hu:%d:%d failed to retrieve conn from cache, connections:%d", fqdn, port, connType, hash, pCache->count[hash]);
}
return pData;
......@@ -240,8 +240,8 @@ static void rpcRemoveExpiredNodes(SConnCache *pCache, SConnHash *pNode, int hash
pNext = pNode->next;
pCache->total--;
pCache->count[hash]--;
tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode,
pCache->count[hash]);
//tTrace("%p %s:%hu:%d:%d:%p removed from cache, connections:%d", pNode->data, pNode->fqdn, pNode->port, pNode->connType, hash, pNode,
// pCache->count[hash]);
taosMemPoolFree(pCache->connHashMemPool, (char *)pNode);
pNode = pNext;
}
......
......@@ -87,6 +87,7 @@ typedef struct {
} SRpcReqContext;
typedef struct SRpcConn {
char info[50];// debug info: label + pConn + ahandle
int sid; // session ID
uint32_t ownId; // own link ID
uint32_t peerId; // peer link ID
......@@ -275,7 +276,7 @@ void *rpcOpen(const SRpcInit *pInit) {
return NULL;
}
tTrace("%s RPC is openned, numOfThreads:%d", pRpc->label, pRpc->numOfThreads);
tTrace("%s rpc is openned, threads:%d sessions:%d", pRpc->label, pRpc->numOfThreads, pInit->sessions);
return pRpc;
}
......@@ -299,7 +300,7 @@ void rpcClose(void *param) {
tfree(pRpc->connList);
pthread_mutex_destroy(&pRpc->mutex);
tTrace("%s RPC is closed", pRpc->label);
tTrace("%s rpc is closed", pRpc->label);
tfree(pRpc);
}
......@@ -375,8 +376,6 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg)
void rpcSendResponse(const SRpcMsg *pRsp) {
int msgLen = 0;
SRpcConn *pConn = (SRpcConn *)pRsp->handle;
SRpcInfo *pRpc = pConn->pRpc;
SRpcMsg rpcMsg = *pRsp;
SRpcMsg *pMsg = &rpcMsg;
......@@ -394,7 +393,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
rpcLockConn(pConn);
if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
tTrace("%s %p, connection is already released, rsp wont be sent", pRpc->label, pConn);
tTrace("%s, connection is already released, rsp wont be sent", pConn->info);
rpcUnlockConn(pConn);
return;
}
......@@ -410,7 +409,8 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
pHead->linkUid = pConn->linkUid;
pHead->port = htons(pConn->localPort);
pHead->code = htonl(pMsg->code);
pHead->ahandle = (uint64_t) pConn->ahandle;
// set pConn parameters
pConn->inType = 0;
......@@ -492,6 +492,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
uint32_t peerIp = taosGetIpFromFqdn(peerFqdn);
if (peerIp == -1) {
tError("%s, failed to resolve FQDN:%s", pRpc->label, peerFqdn);
terrno = TSDB_CODE_APP_ERROR;
return NULL;
}
......@@ -507,11 +508,7 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
if (taosOpenConn[connType]) {
void *shandle = (connType & RPC_CONN_TCP)? pRpc->tcphandle:pRpc->udphandle;
pConn->chandle = (*taosOpenConn[connType])(shandle, pConn, pConn->peerIp, pConn->peerPort);
if (pConn->chandle) {
tTrace("%s %p, rpc connection is set up, sid:%d id:%s %s:%hu connType:%d", pRpc->label,
pConn, pConn->sid, pRpc->user, peerFqdn, pConn->peerPort, pConn->connType);
} else {
tError("%s %p, failed to set up connection to %s:%hu", pRpc->label, pConn, peerFqdn, pConn->peerPort);
if (pConn->chandle == NULL) {
terrno = TSDB_CODE_NETWORK_UNAVAIL;
rpcCloseConn(pConn);
pConn = NULL;
......@@ -558,7 +555,7 @@ static void rpcCloseConn(void *thandle) {
taosFreeId(pRpc->idPool, pConn->sid);
pConn->pContext = NULL;
tTrace("%s %p, rpc connection is closed", pRpc->label, pConn);
tTrace("%s, rpc connection is closed", pConn->info);
rpcUnlockConn(pConn);
}
......@@ -620,7 +617,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
if (terrno != 0) {
tWarn("%s %p, user not there or server not ready", pRpc->label, pConn);
taosFreeId(pRpc->idPool, sid); // sid shall be released
pConn = NULL;
}
......@@ -635,8 +631,6 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
pRpc->label, pConn, sid, pConn->user, pConn->localPort);
}
return pConn;
......@@ -661,7 +655,6 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
if (pConn) {
if (pConn->linkUid != pHead->linkUid) {
tTrace("%s %p, linkUid:0x%x not matched, received:0x%x", pRpc->label, pConn, pConn->linkUid, pHead->linkUid);
terrno = TSDB_CODE_MISMATCHED_METER_ID;
pConn = NULL;
}
......@@ -678,21 +671,25 @@ static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
pConn = rpcGetConnFromCache(pRpc->pCache, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
if ( pConn == NULL || pConn->user[0] == 0) {
pConn = rpcOpenConn(pRpc, pIpSet->fqdn[pIpSet->inUse], pIpSet->port[pIpSet->inUse], pContext->connType);
}
if (pConn) {
pConn->ahandle = pContext->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
} else {
tTrace("%s %p, connection is retrieved from cache", pRpc->label, pConn);
tError("%s %p, failed to set up connection(%s)", pRpc->label, pContext->ahandle, tstrerror(terrno));
}
return pConn;
}
static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc= pConn->pRpc;
if (pConn->peerId == 0) {
pConn->peerId = pHead->sourceId;
} else {
if (pConn->peerId != pHead->sourceId) {
tTrace("%s %p, source Id is changed, old:0x%08x new:0x%08x", pRpc->label, pConn,
tTrace("%s, source Id is changed, old:0x%08x new:0x%08x", pConn->info,
pConn->peerId, pHead->sourceId);
return TSDB_CODE_INVALID_VALUE;
}
......@@ -701,17 +698,16 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pConn->inTranId == pHead->tranId) {
if (pConn->inType == pHead->msgType) {
if (pHead->code == 0) {
tTrace("%s %p, %s is retransmitted", pRpc->label, pConn, taosMsg[pHead->msgType]);
tTrace("%s, %s is retransmitted", pConn->info, taosMsg[pHead->msgType]);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
} else {
// do nothing, it is heart beat from client
}
} else if (pConn->inType == 0) {
tTrace("%s %p, %s is already processed, tranId:%d", pRpc->label, pConn,
taosMsg[pHead->msgType], pConn->inTranId);
tTrace("%s, %s is already processed, tranId:%d", pConn->info, taosMsg[pHead->msgType], pConn->inTranId);
rpcSendMsgToPeer(pConn, pConn->pRspMsg, pConn->rspMsgLen); // resend the response
} else {
tTrace("%s %p, mismatched message %s and tranId", pRpc->label, pConn, taosMsg[pHead->msgType]);
tTrace("%s, mismatched message %s and tranId", pConn->info, taosMsg[pHead->msgType]);
}
// do not reply any message
......@@ -719,7 +715,7 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
}
if (pConn->inType != 0) {
tTrace("%s %p, last session is not finished, inTranId:%d tranId:%d", pRpc->label, pConn,
tTrace("%s, last session is not finished, inTranId:%d tranId:%d", pConn->info,
pConn->inTranId, pHead->tranId);
return TSDB_CODE_LAST_SESSION_NOT_FINISHED;
}
......@@ -751,7 +747,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
if (pHead->code == TSDB_CODE_ACTION_IN_PROGRESS) {
if (pConn->tretry <= tsRpcMaxRetry) {
tTrace("%s %p, peer is still processing the transaction", pRpc->label, pConn);
tTrace("%s, peer is still processing the transaction", pConn->info);
pConn->tretry++;
rpcSendReqHead(pConn);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
......@@ -790,7 +786,15 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
pConn = rpcGetConnObj(pRpc, sid, pRecv);
if (pConn == NULL) return NULL;
if (pConn == NULL) {
tError("%s %p, failed to get connection obj(%s)", pRpc->label, pHead->ahandle, tstrerror(terrno));
return NULL;
} else {
if (rpcIsReq(pHead->msgType)) {
pConn->ahandle = (void *)pHead->ahandle;
sprintf(pConn->info, "%s %p %p", pRpc->label, pConn, pConn->ahandle);
}
}
rpcLockConn(pConn);
sid = pConn->sid;
......@@ -827,7 +831,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
static void rpcProcessBrokenLink(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc;
tTrace("%s %p, link is broken", pRpc->label, pConn);
tTrace("%s, link is broken", pConn->info);
// pConn->chandle = NULL;
if (pConn->outType) {
......@@ -838,7 +842,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn->inType) {
// if there are pending request, notify the app
tTrace("%s %p, connection is gone, notify the app", pRpc->label, pConn);
tTrace("%s, connection is gone, notify the app", pConn->info);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
......@@ -873,17 +877,17 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
pConn = rpcProcessMsgHead(pRpc, pRecv);
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) {
tTrace("%s %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x",
pRpc->label, pConn, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
tTrace("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x",
pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType], pRecv->ip, pRecv->port, terrno,
pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code);
}
int32_t code = terrno;
if (code != TSDB_CODE_ALREADY_PROCESSED) {
if (code != 0) { // parsing error
if ( rpcIsReq(pHead->msgType) ) {
if (rpcIsReq(pHead->msgType)) {
rpcSendErrorMsgToPeer(pRecv, code);
tTrace("%s %p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code);
tTrace("%s %p %p, %s is sent with error code:%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code);
}
} else { // parsing OK
rpcProcessIncomingMsg(pConn, pHead);
......@@ -925,6 +929,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.handle = pConn;
......@@ -949,14 +954,14 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pContext->redirect++;
if (pContext->redirect > TSDB_MAX_REPLICA) {
pHead->code = TSDB_CODE_NETWORK_UNAVAIL;
tWarn("%s %p, too many redirects, quit", pRpc->label, pConn);
tWarn("%s, too many redirects, quit", pConn->info);
}
}
if (pHead->code == TSDB_CODE_REDIRECT) {
pContext->numOfTry = 0;
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
tTrace("%s, redirect is received, numOfIps:%d", pConn->info, pContext->ipSet.numOfIps);
for (int i=0; i<pContext->ipSet.numOfIps; ++i)
pContext->ipSet.port[i] = htons(pContext->ipSet.port[i]);
rpcSendReqToServer(pRpc, pContext);
......@@ -1062,6 +1067,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
return;
}
pConn->ahandle = pContext->ahandle;
rpcLockConn(pConn);
// set the message header
......@@ -1075,6 +1081,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->destId = pConn->peerId;
pHead->port = 0;
pHead->linkUid = pConn->linkUid;
pHead->ahandle = (uint64_t)pConn->ahandle;
if (!pConn->secured) memcpy(pHead->user, pConn->user, tListLen(pHead->user));
// set the connection parameters
......@@ -1092,30 +1099,28 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
int writtenLen = 0;
SRpcInfo *pRpc = pConn->pRpc;
SRpcHead *pHead = (SRpcHead *)msg;
msgLen = rpcAddAuthPart(pConn, msg, msgLen);
if ( rpcIsReq(pHead->msgType)) {
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace("%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerFqdn,
pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
tTrace("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d",
pConn->info, taosMsg[pHead->msgType], pConn->peerFqdn, pConn->peerPort,
msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} else {
if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort,
tTrace("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pConn->info, taosMsg[pHead->msgType], pConn->peerIp, pConn->peerPort,
htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
}
tTrace("connection type is: %d", pConn->connType);
//tTrace("connection type is: %d", pConn->connType);
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);
if (writtenLen != msgLen) {
tError("%s %p, failed to send, dataLen:%d writtenLen:%d, reason:%s", pRpc->label, pConn,
msgLen, writtenLen, strerror(errno));
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
}
tDump(msg, msgLen);
......@@ -1130,7 +1135,7 @@ static void rpcProcessConnError(void *param, void *id) {
return;
}
tTrace("%s connection error happens", pRpc->label);
tTrace("%s %p, connection error happens", pRpc->label, pContext->ahandle);
if (pContext->numOfTry >= pContext->ipSet.numOfIps) {
rpcMsg.msgType = pContext->msgType+1;
......@@ -1156,23 +1161,21 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
rpcLockConn(pConn);
if (pConn->outType && pConn->user[0]) {
tTrace("%s %p, expected %s is not received", pRpc->label, pConn, taosMsg[(int)pConn->outType + 1]);
tTrace("%s, expected %s is not received", pConn->info, taosMsg[(int)pConn->outType + 1]);
pConn->pTimer = NULL;
pConn->retry++;
if (pConn->retry < 4) {
tTrace("%s %p, re-send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
tTrace("%s, re-send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
rpcSendMsgToPeer(pConn, pConn->pReqMsg, pConn->reqMsgLen);
taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else {
// close the connection
tTrace("%s %p, failed to send msg:%s to %s:%hu", pRpc->label, pConn,
taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
reportDisc = 1;
}
} else {
tTrace("%s %p, retry timer not processed", pRpc->label, pConn);
tTrace("%s, retry timer not processed", pConn->info);
}
rpcUnlockConn(pConn);
......@@ -1189,10 +1192,10 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->user[0]) {
tTrace("%s %p, close the connection since no activity", pRpc->label, pConn);
tTrace("%s, close the connection since no activity", pConn->info);
if (pConn->inType && pRpc->cfp) {
// if there are pending request, notify the app
tTrace("%s %p, notify the app, connection is gone", pRpc->label, pConn);
tTrace("%s, notify the app, connection is gone", pConn->info);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
......@@ -1205,7 +1208,7 @@ static void rpcProcessIdleTimer(void *param, void *tmrId) {
}
rpcCloseConn(pConn);
} else {
tTrace("%s %p, idle timer:%p not processed", pRpc->label, pConn, tmrId);
tTrace("%s, idle timer:%p not processed", pConn->info, tmrId);
}
}
......@@ -1216,11 +1219,11 @@ static void rpcProcessProgressTimer(void *param, void *tmrId) {
rpcLockConn(pConn);
if (pConn->inType && pConn->user[0]) {
tTrace("%s %p, progress timer expired, send progress", pRpc->label, pConn);
tTrace("%s, progress timer expired, send progress", pConn->info);
rpcSendQuickRsp(pConn, TSDB_CODE_ACTION_IN_PROGRESS);
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
} else {
tTrace("%s %p, progress timer:%p not processed", pRpc->label, pConn, tmrId);
tTrace("%s, progress timer:%p not processed", pConn->info, tmrId);
}
rpcUnlockConn(pConn);
......@@ -1254,7 +1257,7 @@ static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
memcpy(pCont + overhead, buf, compLen);
pHead->comp = 1;
tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
//tTrace("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
} else {
finalLen = contLen;
......@@ -1288,7 +1291,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
pNewHead->msgLen = rpcMsgLenFromCont(origLen);
rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead;
tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
//tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen);
} else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
}
......@@ -1345,7 +1348,6 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen) {
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
SRpcHead *pHead = (SRpcHead *)msg;
SRpcInfo *pRpc = pConn->pRpc;
int code = 0;
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)){
......@@ -1373,20 +1375,20 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s %p, time diff:%d is too big, msg discarded", pRpc->label, pConn, delta);
tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
code = TSDB_CODE_INVALID_TIME_STAMP;
} else {
if (rpcAuthenticateMsg(pHead, msgLen-TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
tError("%s %p, authentication failed, msg discarded", pRpc->label, pConn);
tError("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_AUTH_FAILURE;
} else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if ( !rpcIsReq(pHead->msgType) ) pConn->secured = 1; // link is secured for client
tTrace("%s %p, message is authenticated", pRpc->label, pConn);
//tTrace("%s, message is authenticated", pConn->info);
}
}
} else {
tTrace("%s %p, auth spi:%d not matched with received:%d", pRpc->label, pConn, pConn->spi, pHead->spi);
tError("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
code = TSDB_CODE_AUTH_FAILURE;
}
......
......@@ -218,7 +218,7 @@ static void *taosRecvUdpData(void *param) {
while (1) {
dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
port = ntohs(sourceAdd.sin_port);
tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen);
//tTrace("%s msg is recv from 0x%x:%hu len:%d", pConn->label, sourceAdd.sin_addr.s_addr, port, dataLen);
if (dataLen < sizeof(SRpcHead)) {
tError("%s recvfrom failed, reason:%s\n", pConn->label, strerror(errno));
......
......@@ -214,12 +214,12 @@ typedef struct {
int64_t tombSize; // unused file size
int32_t totalBlocks;
int32_t totalSubBlocks;
} SFileInfo;
} STsdbFileInfo;
typedef struct {
int fd;
char fname[128];
SFileInfo info;
STsdbFileInfo info;
} SFile;
#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1)
......@@ -350,7 +350,7 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
// TSDB repository definition
typedef struct _tsdb_repo {
typedef struct STsdbRepo {
char *rootDir;
// TSDB configuration
STsdbCfg config;
......
......@@ -12,6 +12,7 @@ python3 ./test.py $1 -f insert/binary.py
python3 ./test.py $1 -f insert/nchar.py
python3 ./test.py $1 -f insert/nchar-boundary.py
python3 ./test.py $1 -f insert/nchar-unicode.py
python3 ./test.py $1 -f insert/multi.py
python3 ./test.py $1 -f table/column_name.py
python3 ./test.py $1 -f table/column_num.py
......
......@@ -58,6 +58,14 @@ class TDTestCase:
tdSql.query('select * from tb order by ts desc')
tdLog.info('tdSql.checkRow(6)')
tdSql.checkRows(6)
tdLog.info('=============== step7')
tdLog.info("insert into tb values (now+6m, true)")
tdSql.execute("insert into tb values (now+5m, true)")
tdLog.info('select * from tb order by ts desc')
tdSql.query('select * from tb order by ts desc')
tdLog.info('tdSql.checkRow(7)')
tdSql.checkRows(7)
# convert end
# convert end
def stop(self):
......
......@@ -21,6 +21,8 @@ python3 ./test.py $1 -f insert/date.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/nchar.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/multi.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f table/column_name.py
python3 ./test.py $1 -s && sleep 1
......
# -*- coding: utf-8 -*-
import sys
import string
import random
import subprocess
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
getTableNameLen = "grep -w '#define TSDB_TABLE_NAME_LEN' ../../src/inc/taosdef.h|awk '{print $3}'"
tableNameMaxLen = int(
subprocess.check_output(
getTableNameLen, shell=True))
tdLog.info("table name max length is %d" % tableNameMaxLen)
chars = string.ascii_uppercase + string.ascii_lowercase
tb_name = ''.join(random.choices(chars, k=tableNameMaxLen))
tdLog.info('tb_name length %d' % len(tb_name))
tdLog.info('create table %s (ts timestamp, value int)' % tb_name)
tdSql.error(
'create table %s (ts timestamp, speed binary(4089))' %
tb_name)
tb_name = ''.join(random.choices(chars, k=191))
tdLog.info('tb_name length %d' % len(tb_name))
tdLog.info('create table %s (ts timestamp, value int)' % tb_name)
tdSql.execute(
'create table %s (ts timestamp, speed binary(4089))' %
tb_name)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
......@@ -223,8 +223,8 @@ class TDDnode:
self.running = 1
tdLog.debug("dnode:%d is running with %s " % (self.index, cmd))
tdLog.debug("wait 4 seconds for the dnode:%d to start." % (self.index))
time.sleep(4)
tdLog.debug("wait 5 seconds for the dnode:%d to start." % (self.index))
time.sleep(5)
def stop(self):
if self.valgrind == 0:
......@@ -233,16 +233,14 @@ class TDDnode:
toBeKilled = "valgrind.bin"
if self.running != 0:
killCmd = "ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -INT" % (
toBeKilled, self.cfgDir)
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = "kill -INT %s" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
self.running = 0
tdLog.debug("dnode:%d is stopped by kill -INT" % (self.index))
......@@ -254,15 +252,14 @@ class TDDnode:
toBeKilled = "valgrind.bin"
if self.running != 0:
killCmd = "ps -ef|grep -w %s| grep '%s' | grep -v grep | awk '{print $2}' | xargs kill -KILL" % (
toBeKilled, self.cfgDir)
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = "kill -KILL %s" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
self.running = 0
tdLog.debug("dnode:%d is stopped by kill -KILL" % (self.index))
......@@ -307,21 +304,21 @@ class TDDnodes:
self.dnodes.append(TDDnode(10))
def init(self, path):
killCmd = "ps -ef|grep -w taosd | grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = "kill -KILL %s" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
killCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = "kill -KILL %s" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
binPath = os.path.dirname(os.path.realpath(__file__))
binPath = binPath + "/../../../debug/"
......@@ -407,27 +404,27 @@ class TDDnodes:
self.dnodes[i].stop()
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
if processID:
cmd = "sudo systemctl stop taosd"
os.system(cmd)
# if os.system(cmd) != 0 :
# tdLog.exit(cmd)
killCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = "kill -KILL %s" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
killCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}' | xargs kill -KILL"
psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
killCmd = "kill -KILL %s" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True)
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
# if os.system(cmd) != 0 :
# tdLog.exit(cmd)
......
#!/bin/bash
python3 ./test.py $1 -f insert/basic.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/int.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/float.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/bigint.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/bool.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/double.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/smallint.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/tinyint.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/binary.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/date.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/nchar.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py -g -f insert/basic.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/int.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/float.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/bigint.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/bool.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/double.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/smallint.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/tinyint.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/binary.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/date.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/nchar.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f insert/multi.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py $1 -f table/column_name.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f table/column_num.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f table/db_table.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py -g -f table/column_name.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f table/column_num.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py -g -f table/db_table.py
python3 ./test.py -g -s && sleep 1
python3 ./test.py $1 -f import_merge/importDataLastSub.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py -g -f import_merge/importDataLastSub.py
python3 ./test.py -g -s && sleep 1
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册