未验证 提交 0d18e577 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2146 from taosdata/feature/show

Feature/show
...@@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql); ...@@ -26,7 +26,7 @@ void tscAddIntoSqlList(SSqlObj *pSql);
void tscRemoveFromSqlList(SSqlObj *pSql); void tscRemoveFromSqlList(SSqlObj *pSql);
void tscAddIntoStreamList(SSqlStream *pStream); void tscAddIntoStreamList(SSqlStream *pStream);
void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj); void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj);
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj); int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj);
void tscKillQuery(STscObj *pObj, uint32_t killId); void tscKillQuery(STscObj *pObj, uint32_t killId);
void tscKillStream(STscObj *pObj, uint32_t killId); void tscKillStream(STscObj *pObj, uint32_t killId);
void tscKillConnection(STscObj *pObj); void tscKillConnection(STscObj *pObj);
......
...@@ -294,6 +294,7 @@ typedef struct STscObj { ...@@ -294,6 +294,7 @@ typedef struct STscObj {
char sversion[TSDB_VERSION_LEN]; char sversion[TSDB_VERSION_LEN];
char writeAuth : 1; char writeAuth : 1;
char superAuth : 1; char superAuth : 1;
uint32_t connId;
struct SSqlObj * pHb; struct SSqlObj * pHb;
struct SSqlObj * sqlList; struct SSqlObj * sqlList;
struct SSqlStream *streamList; struct SSqlStream *streamList;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "taosmsg.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId); void tscSaveSlowQueryFp(void *handle, void *tmrId);
void *tscSlowQueryConn = NULL; void *tscSlowQueryConn = NULL;
...@@ -96,7 +97,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) { ...@@ -96,7 +97,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
} }
tscTrace("%p query time:%" PRId64 " sql:%s", pSql, pSql->res.useconds, pSql->sqlstr); tscTrace("%p query time:%" PRId64 " sql:%s", pSql, pSql->res.useconds, pSql->sqlstr);
int32_t sqlSize = TSDB_SHOW_SQL_LEN + size; int32_t sqlSize = TSDB_SLOW_QUERY_SQL_LEN + size;
char *sql = malloc(sqlSize); char *sql = malloc(sqlSize);
if (sql == NULL) { if (sql == NULL) {
...@@ -106,9 +107,9 @@ void tscSaveSlowQuery(SSqlObj *pSql) { ...@@ -106,9 +107,9 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
int len = snprintf(sql, size, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName, int len = snprintf(sql, size, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName,
pSql->pTscObj->user, pSql->stime, pSql->res.useconds); pSql->pTscObj->user, pSql->stime, pSql->res.useconds);
int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr); int sqlLen = snprintf(sql + len, TSDB_SLOW_QUERY_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SHOW_SQL_LEN - 1) { if (sqlLen > TSDB_SLOW_QUERY_SQL_LEN - 1) {
sqlLen = len + TSDB_SHOW_SQL_LEN - 1; sqlLen = len + TSDB_SLOW_QUERY_SQL_LEN - 1;
} else { } else {
sqlLen += len; sqlLen += len;
} }
...@@ -205,28 +206,28 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { ...@@ -205,28 +206,28 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
} }
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
if (pStream) { if (pStream) {
tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId); tscTrace("%p stream:%p is killed, streamId:%d", pStream->pSql, pStream, killId);
if (pStream->callback) {
pStream->callback(pStream->param);
}
taos_close_stream(pStream);
} else {
tscError("failed to kill stream, streamId:%d not exist", killId);
} }
if (pStream->callback) {
pStream->callback(pStream->param);
}
taos_close_stream(pStream);
} }
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; SCMHeartBeatMsg *pHeartbeat = pMsg;
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
SQqueryList *pQList = (SQqueryList *)pMsg; pHeartbeat->numOfQueries = 0;
pQList->numOfQueries = 0; SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
SQueryDesc *pQdesc = (SQueryDesc*)(pMsg + sizeof(SQqueryList));
// We extract the lock to tscBuildHeartBeatMsg function. // We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQqueryList);
SSqlObj *pSql = pObj->sqlList; SSqlObj *pSql = pObj->sqlList;
while (pSql) { while (pSql) {
/* /*
...@@ -240,47 +241,46 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { ...@@ -240,47 +241,46 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); strncpy(pQdesc->sql, pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; pQdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pQdesc->stime = pSql->stime; pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = pSql->queryId; pQdesc->queryId = htonl(pSql->queryId);
pQdesc->useconds = pSql->res.useconds; pQdesc->useconds = htobe64(pSql->res.useconds);
pQList->numOfQueries++; pHeartbeat->numOfQueries++;
pQdesc++; pQdesc++;
pSql = pSql->next; pSql = pSql->next;
pMsg += sizeof(SQueryDesc); if (pHeartbeat->numOfQueries >= allocedQueriesNum) break;
if (pMsg > pMax) break;
} }
SStreamList *pSList = (SStreamList *)pMsg; pHeartbeat->numOfStreams = 0;
pSList->numOfStreams = 0; SStreamDesc *pSdesc = (SStreamDesc *)pQdesc;
SStreamDesc *pSdesc = (SStreamDesc*) (pMsg + sizeof(SStreamList));
pMsg += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0; pSdesc->sql[TSDB_SHOW_SQL_LEN - 1] = 0;
pSdesc->streamId = pStream->streamId; pSdesc->streamId = htonl(pStream->streamId);
pSdesc->num = pStream->num; pSdesc->num = htobe64(pStream->num);
pSdesc->useconds = pStream->useconds; pSdesc->useconds = htobe64(pStream->useconds);
pSdesc->stime = pStream->stime - pStream->interval; pSdesc->stime = htobe64(pStream->stime - pStream->interval);
pSdesc->ctime = pStream->ctime; pSdesc->ctime = htobe64(pStream->ctime);
pSdesc->slidingTime = pStream->slidingTime; pSdesc->slidingTime = htobe64(pStream->slidingTime);
pSdesc->interval = pStream->interval; pSdesc->interval = htobe64(pStream->interval);
pSList->numOfStreams++; pHeartbeat->numOfStreams++;
pSdesc++; pSdesc++;
pStream = pStream->next; pStream = pStream->next;
pMsg += sizeof(SStreamDesc); if (pHeartbeat->numOfStreams >= allocedStreamsNum) break;
if (pMsg > pMax) break;
} }
/* pthread_mutex_unlock (&pObj->mutex); */ int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
sizeof(SCMHeartBeatMsg);
pHeartbeat->connId = htonl(pObj->connId);
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
return pMsg; return msgLen;
} }
void tscKillConnection(STscObj *pObj) { void tscKillConnection(STscObj *pObj) {
......
...@@ -93,7 +93,7 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo ...@@ -93,7 +93,7 @@ static int32_t validateArithmeticSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo
static int32_t validateDNodeConfig(tDCLSQL* pOptions); static int32_t validateDNodeConfig(tDCLSQL* pOptions);
static int32_t validateLocalConfig(tDCLSQL* pOptions); static int32_t validateLocalConfig(tDCLSQL* pOptions);
static int32_t validateColumnName(char* name); static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo); static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField); static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo); static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
...@@ -531,7 +531,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -531,7 +531,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
case TSDB_SQL_KILL_QUERY: case TSDB_SQL_KILL_QUERY:
case TSDB_SQL_KILL_STREAM: case TSDB_SQL_KILL_STREAM:
case TSDB_SQL_KILL_CONNECTION: { case TSDB_SQL_KILL_CONNECTION: {
if ((code = setKillInfo(pSql, pInfo)) != TSDB_CODE_SUCCESS) { if ((code = setKillInfo(pSql, pInfo, pInfo->type)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2229,37 +2229,45 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -2229,37 +2229,45 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
const char* msg1 = "invalid ip address"; const char* msg1 = "invalid connection ID";
const char* msg2 = "invalid port"; const char* msg2 = "invalid query ID";
const char* msg3 = "invalid stream ID";
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type; pCmd->command = pInfo->type;
SSQLToken* ip = &(pInfo->pDCLInfo->ip); SSQLToken* idStr = &(pInfo->pDCLInfo->ip);
if (ip->n > TSDB_KILL_MSG_LEN) { if (idStr->n > TSDB_KILL_MSG_LEN) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
strncpy(pCmd->payload, ip->z, ip->n); strncpy(pCmd->payload, idStr->z, idStr->n);
const char delim = ':'; const char delim = ':';
char* connIdStr = strtok(idStr->z, &delim);
char* queryIdStr = strtok(NULL, &delim);
char* ipStr = strtok(ip->z, &delim); int32_t connId = (int32_t)strtol(connIdStr, NULL, 10);
char* portStr = strtok(NULL, &delim); if (connId <= 0) {
if (!validateIpAddress(ipStr, strlen(ipStr))) {
memset(pCmd->payload, 0, strlen(pCmd->payload)); memset(pCmd->payload, 0, strlen(pCmd->payload));
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
uint16_t port = (uint16_t)strtol(portStr, NULL, 10); if (killType == TSDB_SQL_KILL_CONNECTION) {
if (port <= 0 || port > 65535) { return TSDB_CODE_SUCCESS;
memset(pCmd->payload, 0, strlen(pCmd->payload));
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
if (queryId <= 0) {
memset(pCmd->payload, 0, strlen(pCmd->payload));
if (killType == TSDB_SQL_KILL_QUERY) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} else {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -114,6 +114,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -114,6 +114,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pIpList->numOfIps > 0) if (pIpList->numOfIps > 0)
tscSetMgmtIpList(pIpList); tscSetMgmtIpList(pIpList);
pSql->pTscObj->connId = htonl(pRsp->connId);
if (pRsp->killConnection) { if (pRsp->killConnection) {
tscKillConnection(pObj); tscKillConnection(pObj);
} else { } else {
...@@ -1129,13 +1131,6 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1129,13 +1131,6 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCMKillQueryMsg); pCmd->payloadLen = sizeof(SCMKillQueryMsg);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SCMKillQueryMsg *pKill = (SCMKillQueryMsg*)pCmd->payload;
strncpy(pKill->queryId, pInfo->pDCLInfo->ip.z, pInfo->pDCLInfo->ip.n);
switch (pCmd->command) { switch (pCmd->command) {
case TSDB_SQL_KILL_QUERY: case TSDB_SQL_KILL_QUERY:
pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY; pCmd->msgType = TSDB_MSG_TYPE_CM_KILL_QUERY;
...@@ -1743,57 +1738,43 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1743,57 +1738,43 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) { int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int size = 0; SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
size += tsRpcHeadSize; pthread_mutex_lock(&pObj->mutex);
size += sizeof(SQqueryList);
int32_t numOfQueries = 2;
SSqlObj *tpSql = pObj->sqlList; SSqlObj *tpSql = pObj->sqlList;
while (tpSql) { while (tpSql) {
size += sizeof(SQueryDesc);
tpSql = tpSql->next; tpSql = tpSql->next;
numOfQueries++;
} }
size += sizeof(SStreamList); int32_t numOfStreams = 2;
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
size += sizeof(SStreamDesc);
pStream = pStream->next; pStream = pStream->next;
numOfStreams++;
} }
return size + TSDB_EXTRA_PAYLOAD_SIZE; int size = numOfQueries * sizeof(SQueryDesc) + numOfStreams * sizeof(SStreamDesc) + sizeof(SCMHeartBeatMsg) + 100;
}
int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart;
int msgLen = 0;
int size = 0;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
pthread_mutex_lock(&pObj->mutex);
size = tscEstimateHeartBeatMsgLength(pSql);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
tscError("%p failed to malloc for heartbeat msg", pSql); tscError("%p failed to malloc for heartbeat msg", pSql);
return -1; return -1;
} }
pMsg = pCmd->payload; SCMHeartBeatMsg *pHeartbeat = (SCMHeartBeatMsg *)pCmd->payload;
pStart = pMsg; pHeartbeat->numOfQueries = numOfQueries;
pHeartbeat->numOfStreams = numOfStreams;
int msgLen = tscBuildQueryStreamDesc(pHeartbeat, pObj);
pMsg = tscBuildQueryStreamDesc(pMsg, pObj);
pthread_mutex_unlock(&pObj->mutex); pthread_mutex_unlock(&pObj->mutex);
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT; pCmd->msgType = TSDB_MSG_TYPE_CM_HEARTBEAT;
assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2204,6 +2185,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2204,6 +2185,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;
pObj->superAuth = pConnect->superAuth; pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId);
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
return 0; return 0;
......
...@@ -46,7 +46,7 @@ typedef struct { ...@@ -46,7 +46,7 @@ typedef struct {
void (*cleanup)(); void (*cleanup)();
} SDnodeComponent; } SDnodeComponent;
static const SDnodeComponent SDnodeComponents[] = { static const SDnodeComponent tsDnodeComponents[] = {
{"storage", dnodeInitStorage, dnodeCleanupStorage}, {"storage", dnodeInitStorage, dnodeCleanupStorage},
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead},
{"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite},
...@@ -72,14 +72,14 @@ static int dnodeCreateDir(const char *dir) { ...@@ -72,14 +72,14 @@ static int dnodeCreateDir(const char *dir) {
static void dnodeCleanupComponents(int32_t stepId) { static void dnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) { for (int32_t i = stepId; i >= 0; i--) {
SDnodeComponents[i].cleanup(); tsDnodeComponents[i].cleanup();
} }
} }
static int32_t dnodeInitComponents() { static int32_t dnodeInitComponents() {
int32_t code = 0; int32_t code = 0;
for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) { for (int32_t i = 0; i < sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]); i++) {
if (SDnodeComponents[i].init() != 0) { if (tsDnodeComponents[i].init() != 0) {
dnodeCleanupComponents(i); dnodeCleanupComponents(i);
code = -1; code = -1;
break; break;
...@@ -133,7 +133,7 @@ int32_t dnodeInitSystem() { ...@@ -133,7 +133,7 @@ int32_t dnodeInitSystem() {
void dnodeCleanUpSystem() { void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1); dnodeCleanupComponents(sizeof(tsDnodeComponents) / sizeof(tsDnodeComponents[0]) - 1);
taos_cleanup(); taos_cleanup();
taosCloseLog(); taosCloseLog();
} }
......
...@@ -231,7 +231,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -231,7 +231,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SHELL_VNODE_BITS 24 #define TSDB_SHELL_VNODE_BITS 24
#define TSDB_SHELL_SID_MASK 0xFF #define TSDB_SHELL_SID_MASK 0xFF
#define TSDB_HTTP_TOKEN_LEN 20 #define TSDB_HTTP_TOKEN_LEN 20
#define TSDB_SHOW_SQL_LEN 512 #define TSDB_SHOW_SQL_LEN 64
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_METER_STATE_OFFLINE 0 #define TSDB_METER_STATE_OFFLINE 0
#define TSDB_METER_STATE_ONLLINE 1 #define TSDB_METER_STATE_ONLLINE 1
......
...@@ -150,6 +150,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 0x0405, "no disk perm ...@@ -150,6 +150,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 0x0405, "no disk perm
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 0x0406, "file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 0x0406, "file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 0x0407, "memory corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 0x0407, "memory corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUCH_FILE_OR_DIR, 0, 0x0408, "no such file or directory") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUCH_FILE_OR_DIR, 0, 0x0408, "no such file or directory")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_SHELL_CONNS, 0, 0x0409, "too many shell conns")
// client // client
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 0x0481, "invalid client version") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 0x0481, "invalid client version")
......
...@@ -137,6 +137,7 @@ enum _mgmt_table { ...@@ -137,6 +137,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_SCORES, TSDB_MGMT_TABLE_SCORES,
TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_STREAMTABLES,
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
}; };
...@@ -299,6 +300,9 @@ typedef struct { ...@@ -299,6 +300,9 @@ typedef struct {
char serverVersion[TSDB_VERSION_LEN]; char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth; int8_t writeAuth;
int8_t superAuth; int8_t superAuth;
int8_t reserved1;
int8_t reserved2;
int32_t connId;
SRpcIpSet ipList; SRpcIpSet ipList;
} SCMConnectRsp; } SCMConnectRsp;
...@@ -716,16 +720,10 @@ typedef struct { ...@@ -716,16 +720,10 @@ typedef struct {
} SStreamDesc; } SStreamDesc;
typedef struct { typedef struct {
int32_t numOfQueries; uint32_t connId;
} SQqueryList; int32_t numOfQueries;
int32_t numOfStreams;
typedef struct { char pData[];
int32_t numOfStreams;
} SStreamList;
typedef struct {
SQqueryList qlist;
SStreamList slist;
} SCMHeartBeatMsg; } SCMHeartBeatMsg;
typedef struct { typedef struct {
...@@ -733,6 +731,7 @@ typedef struct { ...@@ -733,6 +731,7 @@ typedef struct {
uint32_t streamId; uint32_t streamId;
uint32_t totalDnodes; uint32_t totalDnodes;
uint32_t onlineDnodes; uint32_t onlineDnodes;
uint32_t connId;
int8_t killConnection; int8_t killConnection;
SRpcIpSet ipList; SRpcIpSet ipList;
} SCMHeartBeatRsp; } SCMHeartBeatRsp;
......
...@@ -148,8 +148,8 @@ ...@@ -148,8 +148,8 @@
#define TK_SET 130 #define TK_SET 130
#define TK_KILL 131 #define TK_KILL 131
#define TK_CONNECTION 132 #define TK_CONNECTION 132
#define TK_COLON 133 #define TK_STREAM 133
#define TK_STREAM 134 #define TK_COLON 134
#define TK_ABORT 135 #define TK_ABORT 135
#define TK_AFTER 136 #define TK_AFTER 136
#define TK_ATTACH 137 #define TK_ATTACH 137
......
...@@ -182,8 +182,6 @@ typedef struct SUserObj { ...@@ -182,8 +182,6 @@ typedef struct SUserObj {
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount; int32_t refCount;
struct SAcctObj * pAcct; struct SAcctObj * pAcct;
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
} SUserObj; } SUserObj;
typedef struct { typedef struct {
......
...@@ -21,9 +21,30 @@ extern "C" { ...@@ -21,9 +21,30 @@ extern "C" {
#endif #endif
#include "mnodeDef.h" #include "mnodeDef.h"
typedef struct {
char user[TSDB_USER_LEN + 1];
int8_t killed;
uint16_t port;
uint32_t ip;
uint32_t connId;
uint64_t stime;
uint64_t lastAccess;
uint32_t queryId;
uint32_t streamId;
int32_t numOfQueries;
int32_t numOfStreams;
SStreamDesc *pStreams;
SQueryDesc * pQueries;
} SConnObj;
int32_t mnodeInitProfile(); int32_t mnodeInitProfile();
void mnodeCleanupProfile(); void mnodeCleanupProfile();
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port);
SConnObj *mnodeAccquireConn(uint32_t connId, char *user, uint32_t ip, uint16_t port);
void mnodeReleaseConn(SConnObj *pConn);
int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SCMHeartBeatMsg *pHBMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,6 +28,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon ...@@ -28,6 +28,7 @@ typedef int32_t (*SShowMetaFp)(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn); typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mnodeAddShowMetaHandle(uint8_t showType, SShowMetaFp fp);
void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); void mnodeAddShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp);
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,14 +33,54 @@ ...@@ -33,14 +33,54 @@
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeTable.h" #include "mnodeTable.h"
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeProfile.h"
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
} SMnodeComponent;
void *tsMnodeTmr; void *tsMnodeTmr;
static bool tsMgmtIsRunning = false; static bool tsMgmtIsRunning = false;
static const SMnodeComponent tsMnodeComponents[] = {
{"profile", mnodeInitProfile, mnodeCleanupProfile},
{"accts", mnodeInitAccts, mnodeCleanupAccts},
{"users", mnodeInitUsers, mnodeCleanupUsers},
{"dnodes", mnodeInitDnodes, mnodeCleanupDnodes},
{"dbs", mnodeInitDbs, mnodeCleanupDbs},
{"vgroups", mnodeInitVgroups, mnodeCleanupVgroups},
{"tables", mnodeInitTables, mnodeCleanupTables},
{"mnodes", mnodeInitMnodes, mnodeCleanupMnodes},
{"sdb", sdbInit, sdbCleanUp},
{"balance", balanceInit, balanceCleanUp},
{"grant", grantInit, grantCleanUp},
{"show", mnodeInitShow, mnodeCleanUpShow}
};
static void mnodeInitTimer(); static void mnodeInitTimer();
static void mnodeCleanupTimer(); static void mnodeCleanupTimer();
static bool mnodeNeedStart() ; static bool mnodeNeedStart() ;
static void mnodeCleanupComponents(int32_t stepId) {
for (int32_t i = stepId; i >= 0; i--) {
tsMnodeComponents[i].cleanup();
}
}
static int32_t mnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) {
if (tsMnodeComponents[i].init() != 0) {
mnodeCleanupComponents(i);
code = -1;
break;
}
}
return code;
}
int32_t mnodeStartSystem() { int32_t mnodeStartSystem() {
if (tsMgmtIsRunning) { if (tsMgmtIsRunning) {
mPrint("mnode module already started..."); mPrint("mnode module already started...");
...@@ -57,57 +97,7 @@ int32_t mnodeStartSystem() { ...@@ -57,57 +97,7 @@ int32_t mnodeStartSystem() {
dnodeAllocateMnodeRqueue(); dnodeAllocateMnodeRqueue();
dnodeAllocateMnodePqueue(); dnodeAllocateMnodePqueue();
if (mnodeInitAccts() < 0) { if (mnodeInitComponents() != 0) {
mError("failed to init accts");
return -1;
}
if (mnodeInitUsers() < 0) {
mError("failed to init users");
return -1;
}
if (mnodeInitDnodes() < 0) {
mError("failed to init dnodes");
return -1;
}
if (mnodeInitDbs() < 0) {
mError("failed to init dbs");
return -1;
}
if (mnodeInitVgroups() < 0) {
mError("failed to init vgroups");
return -1;
}
if (mnodeInitTables() < 0) {
mError("failed to init tables");
return -1;
}
if (mnodeInitMnodes() < 0) {
mError("failed to init mnodes");
return -1;
}
if (sdbInit() < 0) {
mError("failed to init sdb");
return -1;
}
if (balanceInit() < 0) {
mError("failed to init balance")
}
if (grantInit() < 0) {
mError("failed to init grant");
return -1;
}
if (mnodeInitShow() < 0) {
mError("failed to init show");
return -1; return -1;
} }
...@@ -115,7 +105,6 @@ int32_t mnodeStartSystem() { ...@@ -115,7 +105,6 @@ int32_t mnodeStartSystem() {
tsMgmtIsRunning = true; tsMgmtIsRunning = true;
mPrint("mnode is initialized successfully"); mPrint("mnode is initialized successfully");
return 0; return 0;
} }
...@@ -133,17 +122,8 @@ void mnodeCleanupSystem() { ...@@ -133,17 +122,8 @@ void mnodeCleanupSystem() {
dnodeFreeMnodeRqueue(); dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue(); dnodeFreeMnodePqueue();
mnodeCleanupTimer(); mnodeCleanupTimer();
mnodeCleanUpShow(); mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
grantCleanUp();
balanceCleanUp();
sdbCleanUp();
mnodeCleanupMnodes();
mnodeCleanupTables();
mnodeCleanupVgroups();
mnodeCleanupDbs();
mnodeCleanupDnodes();
mnodeCleanupUsers();
mnodeCleanupAccts();
mPrint("mnode is cleaned up"); mPrint("mnode is cleaned up");
} }
......
此差异已折叠。
...@@ -227,21 +227,47 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -227,21 +227,47 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
SCMHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
SRpcConnInfo connInfo;
rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
int32_t connId = htonl(pHBMsg->connId);
SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) {
pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
}
if (pConn == NULL) {
// do not close existing links, otherwise
// mError("failed to create connId, close connect");
// pHBRsp->killConnection = 1;
} else {
pHBRsp->connId = htonl(pConn->connId);
mnodeSaveQueryStreamList(pConn, pHBMsg);
if (pConn->killed != 0) {
pHBRsp->killConnection = 1;
}
if (pConn->streamId != 0) {
pHBRsp->streamId = htonl(pConn->streamId);
pConn->streamId = 0;
}
if (pConn->queryId != 0) {
pHBRsp->queryId = htonl(pConn->queryId);
pConn->queryId = 0;
}
}
pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum()); pHBRsp->onlineDnodes = htonl(mnodeGetOnlinDnodesNum());
pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum()); pHBRsp->totalDnodes = htonl(mnodeGetDnodesNum());
mnodeGetMnodeIpSetForShell(&pHBRsp->ipList); mnodeGetMnodeIpSetForShell(&pHBRsp->ipList);
/*
* TODO
* Dispose kill stream or kill query message
*/
pHBRsp->queryId = 0;
pHBRsp->streamId = 0;
pHBRsp->killConnection = 0;
pMsg->rpcRsp.rsp = pHBRsp; pMsg->rpcRsp.rsp = pHBRsp;
pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp); pMsg->rpcRsp.len = sizeof(SCMHeartBeatRsp);
mnodeReleaseConn(pConn);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -281,11 +307,19 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -281,11 +307,19 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
goto connect_over; goto connect_over;
} }
SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort);
if (pConn == NULL) {
code = terrno;
} else {
pConnectRsp->connId = htonl(pConn->connId);
mnodeReleaseConn(pConn);
}
sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
strcpy(pConnectRsp->serverVersion, version); strcpy(pConnectRsp->serverVersion, version);
pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth; pConnectRsp->superAuth = pUser->superAuth;
mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList); mnodeGetMnodeIpSetForShell(&pConnectRsp->ipList);
connect_over: connect_over:
...@@ -358,3 +392,11 @@ static void mnodeReleaseShowObj(void *pShow, bool forceRemove) { ...@@ -358,3 +392,11 @@ static void mnodeReleaseShowObj(void *pShow, bool forceRemove) {
mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false"); mTrace("%p, show is released, force:%s", pShow, forceRemove ? "true" : "false");
taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove); taosCacheRelease(tsMnodeShowCache, &pShow, forceRemove);
} }
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
\ No newline at end of file
...@@ -61,8 +61,8 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -61,8 +61,8 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg); static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *mnodeMsg);
static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg); static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg);
...@@ -568,8 +568,8 @@ int32_t mnodeInitTables() { ...@@ -568,8 +568,8 @@ int32_t mnodeInitTables() {
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mnodeRetrieveShowTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_METRIC, mnodeGetShowSuperTableMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_METRIC, mnodeRetrieveShowSuperTables);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMS, mnodeGetStreamMeta); mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeGetStreamTableMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMS, mnodeRetrieveStreams); mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_STREAMTABLES, mnodeRetrieveStreamTables);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2111,14 +2111,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void ...@@ -2111,14 +2111,6 @@ static int32_t mnodeGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
return 0; return 0;
} }
static void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow) {
if (rows < capacity) {
for (int32_t i = 0; i < numOfCols; ++i) {
memmove(data + pShow->offset[i] * rows, data + pShow->offset[i] * capacity, pShow->bytes[i] * rows);
}
}
}
static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
...@@ -2262,7 +2254,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) { ...@@ -2262,7 +2254,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
return code; return code;
} }
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetStreamTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
...@@ -2308,7 +2300,7 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p ...@@ -2308,7 +2300,7 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
return 0; return 0;
} }
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) { static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
SDbObj *pDb = mnodeGetDb(pShow->db); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
......
...@@ -183,7 +183,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { ...@@ -183,7 +183,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
snprintf(sql, SQL_LENGTH, snprintf(sql, SQL_LENGTH,
"create table if not exists %s.slowquery(ts timestamp, username " "create table if not exists %s.slowquery(ts timestamp, username "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))", "binary(%d), created_time timestamp, time bigint, sql binary(%d))",
tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SHOW_SQL_LEN); tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SLOW_QUERY_SQL_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_LOG) { } else if (cmd == MONITOR_CMD_CREATE_TB_LOG) {
snprintf(sql, SQL_LENGTH, snprintf(sql, SQL_LENGTH,
"create table if not exists %s.log(ts timestamp, level tinyint, " "create table if not exists %s.log(ts timestamp, level tinyint, "
......
此差异已折叠。
...@@ -111,6 +111,15 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz ...@@ -111,6 +111,15 @@ void *taosCachePut(SCacheObj *pCacheObj, const char *key, const void *pData, siz
*/ */
void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key); void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key);
/**
* update the expire time of data in cache
* @param pCacheObj cache object
* @param key key
* @param expireTime new expire time of data
* @return
*/
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime);
/** /**
* Add one reference count for the exist data, and assign this data for a new owner. * Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
......
...@@ -488,6 +488,35 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { ...@@ -488,6 +488,35 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
return (ptNode != NULL) ? (*ptNode)->data : NULL; return (ptNode != NULL) ? (*ptNode)->data : NULL;
} }
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, const char *key, uint64_t expireTime) {
if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) {
return NULL;
}
uint32_t keyLen = (uint32_t)strlen(key);
__cache_rd_lock(pCacheObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
if (ptNode != NULL) {
T_REF_INC(*ptNode);
(*ptNode)->expiredTime = expireTime;
}
__cache_unlock(pCacheObj);
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uTrace("key:%s expireTime is updated in cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uTrace("key:%s not in cache, retrieved failed", key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
if (pCacheObj == NULL || data == NULL) return NULL; if (pCacheObj == NULL || data == NULL) return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册