未验证 提交 7be9936c 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #1254 from taosdata/refact/slguan

Refact/slguan
......@@ -324,14 +324,12 @@ typedef struct _sql_obj {
short vnode;
int64_t stime;
uint32_t queryId;
void * thandle;
SRpcIpSet ipSet;
void * pStream;
void * pSubscription;
char * sqlstr;
char retry;
char maxRetry;
uint8_t index;
SRpcIpSet *ipList;
char freed : 4;
char listed : 4;
tsem_t rspSem;
......@@ -373,14 +371,16 @@ typedef struct _sstream {
struct _sstream *prev, *next;
} SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret);
// tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle);
int tscProcessSql(SSqlObj *pSql);
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
......
......@@ -318,13 +318,6 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
int cmd = pCmd->command;
int code = pRes->code ? -pRes->code : pRes->numOfRows;
if ((tscKeepConn[cmd] == 0 || (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS)) &&
pSql->pStream == NULL) {
if (pSql->thandle) taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pTscObj->user);
pSql->thandle = NULL;
}
// in case of async insert, restore the user specified callback function
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
......@@ -454,8 +447,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p failed to renew meterMeta", pSql);
tsem_post(&pSql->rspSem);
} else {
tscTrace("%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->thandle, pSql->retry);
tscTrace("%p renew meterMeta successfully, command:%d, code:%d, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL);
......
......@@ -451,7 +451,6 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->numOfTotalInCurrentClause = 0;
pRes->qhandle = 0;
pSql->thandle = NULL;
tscDoQuery(pSql);
......
......@@ -145,7 +145,7 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) {
if (pSql == NULL) return;
tscTrace("%p query is killed, queryId:%d thandle:%p", pSql, killId, pSql->thandle);
tscTrace("%p query is killed, queryId:%d", pSql, killId);
taos_stop_query(pSql);
}
......@@ -209,16 +209,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
}
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
SQList *pQList = (SQList *)pMsg;
SCMQqueryList *pQList = (SCMQqueryList *)pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SQDesc *pQdesc = pQList->qdesc;
SCMQueryDesc *pQdesc = pQList->qdesc;
pQList->numOfQueries = 0;
// We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQList);
pMsg += sizeof(SCMQqueryList);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
/*
......@@ -239,15 +239,15 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pQList->numOfQueries++;
pQdesc++;
pSql = pSql->next;
pMsg += sizeof(SQDesc);
pMsg += sizeof(SCMQueryDesc);
if (pMsg > pMax) break;
}
SSList *pSList = (SSList *)pMsg;
SSDesc *pSdesc = pSList->sdesc;
SCMStreamList *pSList = (SCMStreamList *)pMsg;
SCMStreamDesc *pSdesc = pSList->sdesc;
pSList->numOfStreams = 0;
pMsg += sizeof(SSList);
pMsg += sizeof(SCMStreamList);
SSqlStream *pStream = pObj->streamList;
while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
......@@ -265,7 +265,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pSList->numOfStreams++;
pSdesc++;
pStream = pStream->next;
pMsg += sizeof(SSDesc);
pMsg += sizeof(SCMStreamDesc);
if (pMsg > pMax) break;
}
......
此差异已折叠。
......@@ -36,11 +36,6 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
taos_init();
if (pTscMgmtConn == NULL || pVnodeConn == NULL) {
globalCode = TSDB_CODE_APP_ERROR;
return NULL;
}
if (user == NULL) {
globalCode = TSDB_CODE_INVALID_ACCT;
return NULL;
......@@ -63,15 +58,30 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
}
if (tscInitRpc(user, pass) != 0) {
globalCode = TSDB_CODE_NETWORK_UNAVAIL;
return NULL;
}
if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 3;
tscMgmtIpList.ip[0] = inet_addr(ip);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(ip);
if (tsMasterIp[0] && strcmp(ip, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
}
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 3;
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
}
}
tscMgmtIpList.port = port ? port : tsMgmtShellPort;
pObj = (STscObj *)malloc(sizeof(STscObj));
if (NULL == pObj) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -208,7 +218,6 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qhandle = 0;
pSql->thandle = NULL;
if (pRes->code == TSDB_CODE_SUCCESS) {
tscDoQuery(pSql);
......@@ -713,7 +722,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
/* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (pSql->fp != NULL) {
pSql->thandle = NULL;
tscFreeSqlObj(pSql);
tscTrace("%p Async SqlObj is freed by app", pSql);
} else if (keepCmd) {
......@@ -774,7 +782,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
*
* Then this object will be reused and no free operation is required.
*/
pSql->thandle = NULL;
if (keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed by app while sql command is kept", pSql);
......@@ -785,7 +792,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
}
} else {
// if no free resource msg is sent to vnode, we free this object immediately.
pSql->thandle = NULL;
if (pSql->fp) {
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
......@@ -899,11 +905,6 @@ void taos_stop_query(TAOS_RES *res) {
return;
}
if (pSql->thandle == NULL) {
tscTrace("%p no connection, abort cancel", res);
return;
}
//taosStopRpcConn(pSql->thandle);
tscTrace("%p query is cancelled", res);
}
......@@ -1147,7 +1148,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qhandle = 0;
pSql->thandle = NULL;
free(str);
if (pRes->code != TSDB_CODE_SUCCESS) {
......
......@@ -382,7 +382,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pRes->numOfRows = 1;
pRes->numOfTotal = 0;
pRes->qhandle = 0;
pSql->thandle = NULL;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
......
......@@ -45,6 +45,7 @@ int tsInsertHeadSize;
extern int tscEmbedded;
int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tscMutex;
extern int tsTscEnableRecordSql;
extern int tsNumOfLogLines;
......@@ -56,11 +57,64 @@ void tscCheckDiskUsage(void *para, void *unused) {
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
int32_t tscInitRpc(const char *user, const char *secret) {
SRpcInit rpcInit;
char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
pthread_mutex_lock(&tscMutex);
if (pVnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = user;
rpcInit.ckey = "key";
rpcInit.secret = secretEncrypt;
pVnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode");
pthread_mutex_unlock(&tscMutex);
return -1;
}
}
if (pTscMgmtConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 2000;
rpcInit.user = "root";
rpcInit.ckey = "key";
rpcInit.secret = secretEncrypt;
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
pthread_mutex_unlock(&tscMutex);
return -1;
}
}
pthread_mutex_unlock(&tscMutex);
return 0;
}
void taos_init_imp() {
char temp[128];
struct stat dirstat;
SRpcInit rpcInit;
pthread_mutex_init(&tscMutex, NULL);
srand(taosGetTimestampSec());
deltaToUtcInitOnce();
......@@ -101,7 +155,7 @@ void taos_init_imp() {
tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) {
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
}
......@@ -124,34 +178,6 @@ void taos_init_imp() {
return;
}
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.afp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
pVnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode");
return;
}
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.afp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
return;
}
tscTmr = taosTmrInit(tsMaxMgmtConnections * 2, 200, 60000, "TSC");
if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
......@@ -319,10 +345,10 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
assert(cfg != NULL);
if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
return -1;
}
// if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
// tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
// return -1;
// }
strncpy(tsSocketType, pStr, tListLen(tsSocketType));
cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
......
......@@ -121,10 +121,6 @@ void dnodeStartModulesImp() {
}
}
}
if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
}
void (*dnodeStartModules)() = dnodeStartModulesImp;
......@@ -85,7 +85,7 @@ int32_t dnodeInitShell() {
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dnodeProcessMsgFromShell;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 2000;
tsDnodeShellServer = rpcOpen(&rpcInit);
......
......@@ -244,6 +244,8 @@ typedef struct _db_obj {
void * vgTimer;
} SDbObj;
struct _acctObj;
typedef struct _user_obj {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN];
......@@ -254,6 +256,9 @@ typedef struct _user_obj {
char reserved[16];
char updateEnd[1];
struct _user_obj *prev, *next;
struct _acctObj * pAcct;
SCMQqueryList * pQList; // query list
SCMStreamList * pSList; // stream list
} SUserObj;
typedef struct {
......@@ -274,7 +279,7 @@ typedef struct {
char accessState; // Checked by mgmt heartbeat message
} SAcctInfo;
typedef struct {
typedef struct _acctObj {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN];
SAcctCfg cfg;
......@@ -290,28 +295,6 @@ typedef struct {
pthread_mutex_t mutex;
} SAcctObj;
typedef struct _connObj {
SAcctObj * pAcct;
SDbObj * pDb;
SUserObj * pUser;
char user[TSDB_USER_LEN];
uint64_t stime; // login time
char superAuth : 1; // super user flag
char writeAuth : 1; // write flag
char killConnection : 1; // kill the connection flag
uint8_t usePublicIp : 1; // if the connection request is publicIp
uint8_t reserved : 4;
uint32_t queryId; // query ID to be killed
uint32_t streamId; // stream ID to be killed
uint32_t ip; // shell IP
uint16_t port; // shell port
void * thandle;
SQList * pQList; // query list
SSList * pSList; // stream list
uint64_t qhandle;
struct _connObj *prev, *next;
} SConnObj;
typedef struct {
char spi;
char encrypt;
......
......@@ -37,8 +37,8 @@ extern int sdbExtConns;
extern int sdbMaster;
extern uint32_t sdbPublicIp;
extern uint32_t sdbMasterStartTime;
extern SIpList *pSdbIpList;
extern SIpList *pSdbPublicIpList;
extern SRpcIpSet *pSdbIpList;
extern SRpcIpSet *pSdbPublicIpList;
extern void (*sdbWorkAsMasterCallback)(); // this function pointer will be set by taosd
......@@ -71,8 +71,6 @@ enum _sdbaction {
SDB_MAX_ACTION_TYPES
};
#ifdef CLUSTER
#define SDB_MAX_PEERS 4
typedef struct {
uint32_t ip;
......@@ -103,8 +101,6 @@ extern SSdbPeer *sdbPeer[];
#define sdbInited (sdbPeer[0])
#define sdbStatus (sdbPeer[0]->status)
#endif
void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory,
void *(*appTool)(char, void *, char *, int, int *));
......@@ -138,6 +134,7 @@ int sdbCfgNode(char *cont);
int64_t sdbGetVersion();
int32_t sdbGetRunStatus();
#define TSDB_MAX_TABLES 1000
extern void* tsChildTableSdb;
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h"
#include "taoserror.h"
#include "taosdef.h"
#include "trpc.h"
// message type
#define TSDB_MSG_TYPE_REG 1
......@@ -187,25 +188,8 @@ typedef enum {
extern char *taosMsg[];
#define TSDB_MSG_DEF_MAX_MPEERS 5
#define TSDB_MSG_DEF_VERSION_LEN 64
#define TSDB_MSG_DEF_DB_LEN 128
#define TSDB_MSG_DEF_USER_LEN 128
#define TSDB_MSG_DEF_TABLE_LEN 128
#define TSDB_MSG_DEF_ACCT_LEN 128
#pragma pack(push, 1)
typedef struct {
char numOfIps;
uint32_t ip[];
} SIpList;
typedef struct {
char numOfIps;
uint32_t ip[TSDB_MAX_MGMT_IPS];
} SMgmtIpList;
typedef struct {
uint32_t customerId;
uint32_t osId;
......@@ -332,20 +316,17 @@ typedef struct {
} SAlterTableMsg;
typedef struct {
char clientVersion[TSDB_MSG_DEF_VERSION_LEN];
char msgVersion[TSDB_MSG_DEF_VERSION_LEN];
char db[TSDB_MSG_DEF_DB_LEN];
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_ID_LEN];
} SCMConnectMsg;
typedef struct {
char acctId[TSDB_MSG_DEF_ACCT_LEN];
char serverVersion[TSDB_MSG_DEF_VERSION_LEN];
int8_t writeAuth;
int8_t superAuth;
int16_t index;
int16_t numOfIps;
uint16_t port;
uint32_t ip[TSDB_MSG_DEF_MAX_MPEERS];
char acctId[TSDB_ACCT_LEN];
char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth;
int8_t superAuth;
SRpcIpSet ipList;
} SCMConnectRsp;
typedef struct {
......@@ -799,19 +780,12 @@ typedef struct {
char config[60];
} SCfgMsg;
typedef struct {
uint32_t queryId;
uint32_t streamId;
char killConnection;
SIpList ipList;
} SHeartBeatRsp;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
uint32_t queryId;
int64_t useconds;
int64_t stime;
} SQDesc;
} SCMQueryDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
......@@ -822,17 +796,29 @@ typedef struct {
int64_t stime;
int64_t slidingTime;
int64_t interval;
} SSDesc;
} SCMStreamDesc;
typedef struct {
int32_t numOfQueries;
SQDesc qdesc[];
} SQList;
SCMQueryDesc qdesc[];
} SCMQqueryList;
typedef struct {
int32_t numOfStreams;
SSDesc sdesc[];
} SSList;
SCMStreamDesc sdesc[];
} SCMStreamList;
typedef struct {
SCMQqueryList qlist;
SCMStreamList slist;
} SCMHeartBeatMsg;
typedef struct {
uint32_t queryId;
uint32_t streamId;
int8_t killConnection;
SRpcIpSet ipList;
} SCMHeartBeatRsp;
typedef struct {
uint64_t handle;
......
......@@ -39,7 +39,7 @@ typedef struct {
uint32_t clientIp;
uint16_t clientPort;
uint32_t serverIp;
char *user;
char user[TSDB_USER_LEN];
} SRpcConnInfo;
typedef struct {
......
......@@ -800,6 +800,8 @@ void source_file(TAOS *con, char *fptr) {
}
void shellGetGrantInfo(void *con) {
return;
char sql[] = "show grants";
int code = taos_query(con, sql);
......
......@@ -81,7 +81,7 @@ struct arguments args = {
*/
int main(int argc, char* argv[]) {
/*setlocale(LC_ALL, "en_US.UTF-8"); */
if (!checkVersion()) {
exit(EXIT_FAILURE);
}
......
......@@ -34,8 +34,8 @@ extern int32_t (*mgmtCheckDbLimit)(SAcctObj *pAcct);
extern int32_t (*mgmtCheckTableLimit)(SAcctObj *pAcct, SCreateTableMsg *pCreate);
extern void (*mgmtCheckAcct)();
extern void (*mgmtCleanUpAccts)();
extern int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
extern int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
extern int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@ extern "C" {
#include "mnode.h"
int mgmtGetConnsMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int mgmtGetConnsMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn);
#ifdef __cplusplus
}
......
......@@ -24,14 +24,13 @@ extern "C" {
void mgmtMonitorDbDrop(void *unused, void *unusedt);
int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter);
int32_t mgmtUseDb(SConnObj *pConn, char *name);
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup);
int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpDbs();
int32_t mgmtInitDbs();
......
......@@ -30,19 +30,19 @@ int32_t mgmtDropDnodeByIp(uint32_t ip);
int32_t mgmtGetNextVnode(SVnodeGid *pVnodeGid);
void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId);
void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes);
int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtSendCfgDnodeMsg(char *cont);
void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode);
int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
extern int32_t (*mgmtInitDnodes)();
extern void (*mgmtCleanUpDnodes)();
......@@ -51,8 +51,8 @@ extern int32_t (*mgmtGetDnodesNum)();
extern void* (*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode);
extern int32_t (*mgmtUpdateDnode)(SDnodeObj *pDnode);
extern void (*mgmtSetDnodeUnRemove)(SDnodeObj *pDnode);
extern int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
extern int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
extern int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
extern bool (*mgmtCheckConfigShow)(SGlobalConfig *cfg);
extern SDnodeObj tsDnodeObj;
......
......@@ -30,8 +30,8 @@ extern void (*mgmtRestoreTimeSeries)(uint32_t timeseries);
extern int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries);
extern int32_t (*mgmtCheckUserGrant)();
extern int32_t (*mgmtCheckDbGrant)();
extern int32_t (*mgmtGetGrantsMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
extern int32_t (*mgmtGetGrantsMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
extern int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn);
#ifdef __cplusplus
}
......
......@@ -24,8 +24,8 @@ extern "C" {
#include <stdbool.h>
#include "mnode.h"
extern int32_t (*mgmtGetMnodeMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
extern int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
extern int32_t (*mgmtGetMnodeMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
extern int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifdef __cplusplus
}
......
......@@ -22,21 +22,21 @@ extern "C" {
#include "mnode.h"
int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int mgmtSaveQueryStreamList(char *cont, int contLen, SConnObj *pConn);
int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg);
int mgmtKillQuery(char *qidstr, SConnObj *pConn);
int32_t mgmtKillQuery(char *qidstr, void *pConn);
int mgmtKillStream(char *qidstr, SConnObj *pConn);
int32_t mgmtKillStream(char *qidstr, void *pConn);
int mgmtKillConnection(char *qidstr, SConnObj *pConn);
int32_t mgmtKillConnection(char *qidstr, void *pConn);
#ifdef __cplusplus
}
......
......@@ -27,14 +27,14 @@ extern "C" {
int32_t mgmtInitShell();
void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType);
extern int32_t (*mgmtProcessAlterAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
extern int32_t (*mgmtProcessCreateDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
extern int32_t (*mgmtProcessCfgMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
extern int32_t (*mgmtProcessDropMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
extern int32_t (*mgmtProcessDropDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
extern int32_t (*mgmtProcessDropAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
extern int32_t (*mgmtProcessCreateAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn, int32_t msgType);
extern int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
#ifdef __cplusplus
}
......
......@@ -29,18 +29,18 @@ int32_t mgmtInitTables();
STableInfo* mgmtGetTable(char *tableId);
STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtDropTable(SDbObj *pDb, char *meterId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpMeters();
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......
......@@ -29,9 +29,10 @@ SUserObj *mgmtGetUser(char *name);
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
int32_t mgmtDropUser(SAcctObj *pAcct, char *name);
int32_t mgmtUpdateUser(SUserObj *pUser);
int32_t mgmtGetUserMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetUserMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpUsers();
SUserObj *mgmtGetUserFromConn(void *pConn);
#ifdef __cplusplus
}
......
......@@ -29,8 +29,8 @@ SVgObj *mgmtGetVgroup(int32_t vgId);
SVgObj *mgmtCreateVgroup(SDbObj *pDb);
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup);
void mgmtSetVgroupIdPool();
int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpVgroups();
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb);
......
......@@ -153,14 +153,14 @@ void mgmtCleanUpAcctsImp() {
void (*mgmtCleanUpAccts)() = mgmtCleanUpAcctsImp;
int32_t mgmtGetAcctMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetAcctMetaImp(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetAcctMetaImp;
int32_t (*mgmtGetAcctMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetAcctMetaImp;
int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveAcctsImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
return 0;
}
int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) = mgmtRetrieveAcctsImp;
int32_t (*mgmtRetrieveAccts)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveAcctsImp;
......@@ -32,42 +32,42 @@ typedef struct {
SConnInfo connInfo[];
} SConnShow;
int mgmtGetConns(SShowObj *pShow, SConnObj *pConn) {
SAcctObj * pAcct = pConn->pAcct;
SConnShow *pConnShow;
pthread_mutex_lock(&pAcct->mutex);
pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
pConnShow->index = 0;
pConnShow->numOfConns = 0;
if (pAcct->acctInfo.numOfConns > 0) {
pConn = pAcct->pConn;
SConnInfo *pConnInfo = pConnShow->connInfo;
while (pConn && pConn->pUser) {
strcpy(pConnInfo->user, pConn->pUser->user);
pConnInfo->ip = pConn->ip;
pConnInfo->port = pConn->port;
pConnInfo->stime = pConn->stime;
pConnShow->numOfConns++;
pConnInfo++;
pConn = pConn->next;
}
}
pthread_mutex_unlock(&pAcct->mutex);
// sorting based on useconds
pShow->pNode = pConnShow;
int mgmtGetConns(SShowObj *pShow, void *pConn) {
// SAcctObj * pAcct = pConn->pAcct;
// SConnShow *pConnShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
// pConnShow->index = 0;
// pConnShow->numOfConns = 0;
//
// if (pAcct->acctInfo.numOfConns > 0) {
// pConn = pAcct->pConn;
// SConnInfo *pConnInfo = pConnShow->connInfo;
//
// while (pConn && pConn->pUser) {
// strcpy(pConnInfo->user, pConn->pUser->user);
// pConnInfo->ip = pConn->ip;
// pConnInfo->port = pConn->port;
// pConnInfo->stime = pConn->stime;
//
// pConnShow->numOfConns++;
// pConnInfo++;
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pNode = pConnShow;
return 0;
}
int mgmtGetConnsMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int mgmtGetConnsMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int cols = 0;
pShow->bytes[cols] = TSDB_METER_NAME_LEN;
......@@ -104,7 +104,7 @@ int mgmtGetConnsMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) {
int numOfRows = 0;
char *pWrite;
int cols = 0;
......
......@@ -26,6 +26,7 @@
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
extern void *tsVgroupSdb;
......@@ -472,20 +473,6 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
return code;
}
int32_t mgmtUseDb(SConnObj *pConn, char *name) {
SDbObj *pDb;
int32_t code = TSDB_CODE_INVALID_DB;
// here change the default db for connect.
pDb = mgmtGetDb(name);
if (pDb) {
pConn->pDb = pDb;
code = 0;
}
return code;
}
int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) {
pVgroup->next = pDb->pHead;
pVgroup->prev = NULL;
......@@ -540,10 +527,12 @@ void mgmtCleanUpDbs() {
sdbCloseTable(tsDbSdb);
}
int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
pShow->bytes[cols] = TSDB_DB_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
......@@ -564,7 +553,7 @@ int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pConn->pAcct->user, "root") == 0) {
if (strcmp(pUser->user, "root") == 0) {
#endif
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -576,7 +565,7 @@ int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
#endif
#ifndef __CLOUD_VERSION__
if (strcmp(pConn->pAcct->user, "root") == 0) {
if (strcmp(pUser->user, "root") == 0) {
#endif
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
......@@ -600,7 +589,7 @@ int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pConn->pAcct->user, "root") == 0) {
if (strcmp(pUser->user, "root") == 0) {
#endif
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
......@@ -675,8 +664,8 @@ int32_t mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->numOfRows = pConn->pAcct->acctInfo.numOfDbs;
pShow->pNode = pConn->pAcct->pHead;
pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs;
pShow->pNode = pUser->pAcct->pHead;
return 0;
}
......@@ -687,18 +676,20 @@ char *mgmtGetDbStr(char *src) {
return ++pos;
}
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDbObj *pDb = NULL;
char * pWrite;
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
while (numOfRows < rows) {
pDb = (SDbObj *)pShow->pNode;
if (pDb == NULL) break;
pShow->pNode = (void *)pDb->next;
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) {
if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) {
continue;
}
}
......@@ -718,7 +709,7 @@ int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pCo
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pConn->pAcct->user, "root") == 0) {
if (strcmp(pUser->user, "root") == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->numOfVgroups;
......@@ -728,7 +719,7 @@ int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pCo
#endif
#ifndef __CLOUD_VERSION__
if (strcmp(pConn->pAcct->user, "root") == 0) {
if (strcmp(pUser->user, "root") == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->cfg.replications;
......@@ -746,7 +737,7 @@ int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pCo
cols++;
#ifndef __CLOUD_VERSION__
if (strcmp(pConn->pAcct->user, "root") == 0) {
if (strcmp(pUser->user, "root") == 0) {
#endif
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxSessions - 1; // table num can be created should minus 1
......
......@@ -21,6 +21,7 @@
#include "mnode.h"
#include "mgmtDnode.h"
#include "mgmtBalance.h"
#include "mgmtUser.h"
SDnodeObj tsDnodeObj;
......@@ -96,10 +97,13 @@ void mgmtUnSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes) {
}
}
int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -158,7 +162,7 @@ int32_t mgmtGetDnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char *pWrite;
......@@ -208,10 +212,13 @@ int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *
return numOfRows;
}
int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -259,7 +266,7 @@ int32_t mgmtGetModuleMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
......@@ -298,10 +305,13 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, SConnObj
return numOfRows;
}
int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -336,7 +346,7 @@ int32_t mgmtGetConfigMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) {
......@@ -383,10 +393,11 @@ int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, SConnObj
return numOfRows;
}
int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
if (strcmp(pConn->pAcct->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -456,7 +467,7 @@ int32_t mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
......@@ -560,17 +571,17 @@ void *mgmtGetNextDnodeImp(SShowObj *pShow, SDnodeObj **pDnode) {
void *(*mgmtGetNextDnode)(SShowObj *pShow, SDnodeObj **pDnode) = mgmtGetNextDnodeImp;
int32_t mgmtGetScoresMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetScoresMetaImp(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetScoresMetaImp;
int32_t (*mgmtGetScoresMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetScoresMetaImp;
int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveScoresImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
return 0;
}
int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) = mgmtRetrieveScoresImp;
int32_t (*mgmtRetrieveScores)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveScoresImp;
void mgmtSetDnodeUnRemoveImp(SDnodeObj *pDnode) {
}
......
......@@ -37,9 +37,9 @@ int32_t (*mgmtCheckTimeSeries)(uint32_t timeseries) = mgmtCheckTimeSeriesImp;
bool mgmtCheckExpiredImp() { return false; }
bool (*mgmtCheckExpired)() = mgmtCheckExpiredImp;
int32_t mgmtGetGrantsMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; }
int32_t (*mgmtGetGrantsMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetGrantsMetaImp;
int32_t mgmtGetGrantsMetaImp(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; }
int32_t (*mgmtGetGrantsMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetGrantsMetaImp;
int32_t mgmtRetrieveGrantsImp(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { return 0; }
int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, SConnObj *pConn) = mgmtRetrieveGrantsImp;
int32_t mgmtRetrieveGrantsImp(SShowObj *pShow, char *data, int rows, void *pConn) { return 0; }
int32_t (*mgmtRetrieveGrants)(SShowObj *pShow, char *data, int rows, void *pConn) = mgmtRetrieveGrantsImp;
......@@ -16,14 +16,14 @@
#define _DEFAULT_SOURCE
#include "mgmtMnode.h"
int32_t mgmtGetMnodeMetaImp(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtGetMnodeMetaImp(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
return TSDB_CODE_OPS_NOT_SUPPORT;
}
int32_t (*mgmtGetMnodeMeta)(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = mgmtGetMnodeMetaImp;
int32_t (*mgmtGetMnodeMeta)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) = mgmtGetMnodeMetaImp;
int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveMnodesImp(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
return 0;
}
int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) = mgmtRetrieveMnodesImp;
int32_t (*mgmtRetrieveMnodes)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = mgmtRetrieveMnodesImp;
......@@ -28,97 +28,97 @@ typedef struct {
} SCDesc;
typedef struct {
int index;
int numOfQueries;
int32_t index;
int32_t numOfQueries;
SCDesc * connInfo;
SCDesc **cdesc;
SQDesc qdesc[];
SCMQueryDesc qdesc[];
} SQueryShow;
typedef struct {
int index;
int numOfStreams;
int32_t index;
int32_t numOfStreams;
SCDesc * connInfo;
SCDesc **cdesc;
SSDesc sdesc[];
SCMStreamDesc sdesc[];
} SStreamShow;
int mgmtSaveQueryStreamList(char *cont, int contLen, SConnObj *pConn) {
SAcctObj *pAcct = pConn->pAcct;
if (contLen <= 0 || pAcct == NULL) {
return 0;
}
pthread_mutex_lock(&pAcct->mutex);
if (pConn->pQList) {
pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries;
pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams;
}
pConn->pQList = realloc(pConn->pQList, contLen);
memcpy(pConn->pQList, cont, contLen);
pConn->pSList = (SSList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SQDesc) + sizeof(SQList));
pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries;
pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams;
pthread_mutex_unlock(&pAcct->mutex);
return 0;
int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) {
// SAcctObj *pAcct = pConn->pAcct;
//
// if (contLen <= 0 || pAcct == NULL) {
// return 0;
// }
//
// pthread_mutex_lock(&pAcct->mutex);
//
// if (pConn->pQList) {
// pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries;
// pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams;
// }
//
// pConn->pQList = realloc(pConn->pQList, contLen);
// memcpy(pConn->pQList, cont, contLen);
//
// pConn->pSList = (SCMStreamList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SCMQueryDesc) + sizeof(SCMQqueryList));
//
// pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries;
// pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams;
//
// pthread_mutex_unlock(&pAcct->mutex);
return TSDB_CODE_SUCCESS;
}
int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) {
SAcctObj * pAcct = pConn->pAcct;
SQueryShow *pQueryShow;
pthread_mutex_lock(&pAcct->mutex);
pQueryShow = malloc(sizeof(SQDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow));
pQueryShow->numOfQueries = 0;
pQueryShow->index = 0;
pQueryShow->connInfo = NULL;
pQueryShow->cdesc = NULL;
if (pAcct->acctInfo.numOfQueries > 0) {
pQueryShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc));
pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *));
pConn = pAcct->pConn;
SQDesc * pQdesc = pQueryShow->qdesc;
SCDesc * pCDesc = pQueryShow->connInfo;
SCDesc **ppCDesc = pQueryShow->cdesc;
while (pConn) {
if (pConn->pQList && pConn->pQList->numOfQueries > 0) {
pCDesc->ip = pConn->ip;
pCDesc->port = pConn->port;
strcpy(pCDesc->user, pConn->pUser->user);
memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SQDesc) * pConn->pQList->numOfQueries);
pQdesc += pConn->pQList->numOfQueries;
pQueryShow->numOfQueries += pConn->pQList->numOfQueries;
for (int i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc;
pCDesc++;
}
pConn = pConn->next;
}
}
pthread_mutex_unlock(&pAcct->mutex);
// sorting based on useconds
pShow->pNode = pQueryShow;
int32_t mgmtGetQueries(SShowObj *pShow, void *pConn) {
// SAcctObj * pAcct = pConn->pAcct;
// SQueryShow *pQueryShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pQueryShow = malloc(sizeof(SCMQueryDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow));
// pQueryShow->numOfQueries = 0;
// pQueryShow->index = 0;
// pQueryShow->connInfo = NULL;
// pQueryShow->cdesc = NULL;
//
// if (pAcct->acctInfo.numOfQueries > 0) {
// pQueryShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc));
// pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *));
//
// pConn = pAcct->pConn;
// SCMQueryDesc * pQdesc = pQueryShow->qdesc;
// SCDesc * pCDesc = pQueryShow->connInfo;
// SCDesc **ppCDesc = pQueryShow->cdesc;
//
// while (pConn) {
// if (pConn->pQList && pConn->pQList->numOfQueries > 0) {
// pCDesc->ip = pConn->ip;
// pCDesc->port = pConn->port;
// strcpy(pCDesc->user, pConn->pUser->user);
//
// memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SCMQueryDesc) * pConn->pQList->numOfQueries);
// pQdesc += pConn->pQList->numOfQueries;
// pQueryShow->numOfQueries += pConn->pQList->numOfQueries;
// for (int32_t i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc;
//
// pCDesc++;
// }
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pNode = pQueryShow;
return 0;
}
int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
......@@ -156,7 +156,7 @@ int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pNode = NULL;
......@@ -166,70 +166,70 @@ int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int mgmtKillQuery(char *qidstr, SConnObj *pConn) {
char *temp, *chr, idstr[64];
strcpy(idstr, qidstr);
temp = idstr;
chr = strchr(temp, ':');
if (chr == NULL) goto _error;
*chr = 0;
uint32_t ip = inet_addr(temp);
temp = chr + 1;
chr = strchr(temp, ':');
if (chr == NULL) goto _error;
*chr = 0;
uint16_t port = htons(atoi(temp));
temp = chr + 1;
uint32_t queryId = atoi(temp);
SAcctObj *pAcct = pConn->pAcct;
pthread_mutex_lock(&pAcct->mutex);
pConn = pAcct->pConn;
while (pConn) {
if (pConn->ip == ip && pConn->port == port && pConn->pQList) {
int i;
SQDesc *pQDesc = pConn->pQList->qdesc;
for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) {
if (pQDesc->queryId == queryId) break;
}
if (i < pConn->pQList->numOfQueries) break;
}
pConn = pConn->next;
}
if (pConn) pConn->queryId = queryId;
pthread_mutex_unlock(&pAcct->mutex);
if (pConn == NULL || pConn->pQList == NULL || pConn->pQList->numOfQueries == 0) goto _error;
mTrace("query:%s is there, kill it", qidstr);
return 0;
_error:
mTrace("query:%s is not there", qidstr);
int32_t mgmtKillQuery(char *qidstr, void *pConn) {
// char *temp, *chr, idstr[64];
// strcpy(idstr, qidstr);
//
// temp = idstr;
// chr = strchr(temp, ':');
// if (chr == NULL) goto _error;
// *chr = 0;
// uint32_t ip = inet_addr(temp);
//
// temp = chr + 1;
// chr = strchr(temp, ':');
// if (chr == NULL) goto _error;
// *chr = 0;
// uint16_t port = htons(atoi(temp));
//
// temp = chr + 1;
// uint32_t queryId = atoi(temp);
//
// SAcctObj *pAcct = pConn->pAcct;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConn = pAcct->pConn;
// while (pConn) {
// if (pConn->ip == ip && pConn->port == port && pConn->pQList) {
// int32_t i;
// SCMQueryDesc *pQDesc = pConn->pQList->qdesc;
// for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) {
// if (pQDesc->queryId == queryId) break;
// }
//
// if (i < pConn->pQList->numOfQueries) break;
// }
//
// pConn = pConn->next;
// }
//
// if (pConn) pConn->queryId = queryId;
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// if (pConn == NULL || pConn->pQList == NULL || pConn->pQList->numOfQueries == 0) goto _error;
//
// mTrace("query:%s is there, kill it", qidstr);
// return 0;
//
//_error:
// mTrace("query:%s is not there", qidstr);
return TSDB_CODE_INVALID_QUERY_ID;
}
int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
char *pWrite;
int cols = 0;
int32_t cols = 0;
SQueryShow *pQueryShow = (SQueryShow *)pShow->pNode;
if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index;
while (numOfRows < rows) {
SQDesc *pNode = pQueryShow->qdesc + pQueryShow->index;
SCMQueryDesc *pNode = pQueryShow->qdesc + pQueryShow->index;
SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index];
cols = 0;
......@@ -269,55 +269,55 @@ int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
return numOfRows;
}
int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) {
SAcctObj * pAcct = pConn->pAcct;
SStreamShow *pStreamShow;
pthread_mutex_lock(&pAcct->mutex);
pStreamShow = malloc(sizeof(SSDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow));
pStreamShow->numOfStreams = 0;
pStreamShow->index = 0;
pStreamShow->connInfo = NULL;
pStreamShow->cdesc = NULL;
if (pAcct->acctInfo.numOfStreams > 0) {
pStreamShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc));
pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *));
pConn = pAcct->pConn;
SSDesc * pSdesc = pStreamShow->sdesc;
SCDesc * pCDesc = pStreamShow->connInfo;
SCDesc **ppCDesc = pStreamShow->cdesc;
while (pConn) {
if (pConn->pSList && pConn->pSList->numOfStreams > 0) {
pCDesc->ip = pConn->ip;
pCDesc->port = pConn->port;
strcpy(pCDesc->user, pConn->pUser->user);
memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SSDesc) * pConn->pSList->numOfStreams);
pSdesc += pConn->pSList->numOfStreams;
pStreamShow->numOfStreams += pConn->pSList->numOfStreams;
for (int i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc;
pCDesc++;
}
pConn = pConn->next;
}
}
pthread_mutex_unlock(&pAcct->mutex);
// sorting based on useconds
pShow->pNode = pStreamShow;
int32_t mgmtGetStreams(SShowObj *pShow, void *pConn) {
// SAcctObj * pAcct = pConn->pAcct;
// SStreamShow *pStreamShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pStreamShow = malloc(sizeof(SCMStreamDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow));
// pStreamShow->numOfStreams = 0;
// pStreamShow->index = 0;
// pStreamShow->connInfo = NULL;
// pStreamShow->cdesc = NULL;
//
// if (pAcct->acctInfo.numOfStreams > 0) {
// pStreamShow->connInfo = (SCDesc *)malloc(pAcct->acctInfo.numOfConns * sizeof(SCDesc));
// pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *));
//
// pConn = pAcct->pConn;
// SCMStreamDesc * pSdesc = pStreamShow->sdesc;
// SCDesc * pCDesc = pStreamShow->connInfo;
// SCDesc **ppCDesc = pStreamShow->cdesc;
//
// while (pConn) {
// if (pConn->pSList && pConn->pSList->numOfStreams > 0) {
// pCDesc->ip = pConn->ip;
// pCDesc->port = pConn->port;
// strcpy(pCDesc->user, pConn->pUser->user);
//
// memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SCMStreamDesc) * pConn->pSList->numOfStreams);
// pSdesc += pConn->pSList->numOfStreams;
// pStreamShow->numOfStreams += pConn->pSList->numOfStreams;
// for (int32_t i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc;
//
// pCDesc++;
// }
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pNode = pStreamShow;
return 0;
}
int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0;
int32_t mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_USER_LEN;
......@@ -366,7 +366,7 @@ int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pNode = NULL;
......@@ -376,17 +376,17 @@ int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0;
}
int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int numOfRows = 0;
int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
char *pWrite;
int cols = 0;
int32_t cols = 0;
SStreamShow *pStreamShow = (SStreamShow *)pShow->pNode;
if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index;
while (numOfRows < rows) {
SSDesc *pNode = pStreamShow->sdesc + pStreamShow->index;
SCMStreamDesc *pNode = pStreamShow->sdesc + pStreamShow->index;
SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index];
cols = 0;
......@@ -434,101 +434,101 @@ int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
return numOfRows;
}
int mgmtKillStream(char *qidstr, SConnObj *pConn) {
char *temp, *chr, idstr[64];
strcpy(idstr, qidstr);
temp = idstr;
chr = strchr(temp, ':');
if (chr == NULL) goto _error;
*chr = 0;
uint32_t ip = inet_addr(temp);
temp = chr + 1;
chr = strchr(temp, ':');
if (chr == NULL) goto _error;
*chr = 0;
uint16_t port = htons(atoi(temp));
temp = chr + 1;
uint32_t streamId = atoi(temp);
SAcctObj *pAcct = pConn->pAcct;
pthread_mutex_lock(&pAcct->mutex);
pConn = pAcct->pConn;
while (pConn) {
if (pConn->ip == ip && pConn->port == port && pConn->pSList) {
int i;
SSDesc *pSDesc = pConn->pSList->sdesc;
for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) {
if (pSDesc->streamId == streamId) break;
}
if (i < pConn->pSList->numOfStreams) break;
}
pConn = pConn->next;
}
if (pConn) pConn->streamId = streamId;
pthread_mutex_unlock(&pAcct->mutex);
if (pConn == NULL || pConn->pSList == NULL || pConn->pSList->numOfStreams == 0) goto _error;
mTrace("stream:%s is there, kill it", qidstr);
return 0;
_error:
mTrace("stream:%s is not there", qidstr);
int32_t mgmtKillStream(char *qidstr, void *pConn) {
// char *temp, *chr, idstr[64];
// strcpy(idstr, qidstr);
//
// temp = idstr;
// chr = strchr(temp, ':');
// if (chr == NULL) goto _error;
// *chr = 0;
// uint32_t ip = inet_addr(temp);
//
// temp = chr + 1;
// chr = strchr(temp, ':');
// if (chr == NULL) goto _error;
// *chr = 0;
// uint16_t port = htons(atoi(temp));
//
// temp = chr + 1;
// uint32_t streamId = atoi(temp);
//
// SAcctObj *pAcct = pConn->pAcct;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConn = pAcct->pConn;
// while (pConn) {
// if (pConn->ip == ip && pConn->port == port && pConn->pSList) {
// int32_t i;
// SCMStreamDesc *pSDesc = pConn->pSList->sdesc;
// for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) {
// if (pSDesc->streamId == streamId) break;
// }
//
// if (i < pConn->pSList->numOfStreams) break;
// }
//
// pConn = pConn->next;
// }
//
// if (pConn) pConn->streamId = streamId;
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// if (pConn == NULL || pConn->pSList == NULL || pConn->pSList->numOfStreams == 0) goto _error;
//
// mTrace("stream:%s is there, kill it", qidstr);
// return 0;
//
//_error:
// mTrace("stream:%s is not there", qidstr);
return TSDB_CODE_INVALID_STREAM_ID;
}
int mgmtKillConnection(char *qidstr, SConnObj *pConn) {
SConnObj *pConn1 = NULL;
char * temp, *chr, idstr[64];
strcpy(idstr, qidstr);
temp = idstr;
chr = strchr(temp, ':');
if (chr == NULL) goto _error;
*chr = 0;
uint32_t ip = inet_addr(temp);
temp = chr + 1;
uint16_t port = htons(atoi(temp));
SAcctObj *pAcct = pConn->pAcct;
pthread_mutex_lock(&pAcct->mutex);
pConn = pAcct->pConn;
while (pConn) {
if (pConn->ip == ip && pConn->port == port) {
// there maybe two connections from a shell
if (pConn1 == NULL)
pConn1 = pConn;
else
break;
}
pConn = pConn->next;
}
if (pConn1) pConn1->killConnection = 1;
if (pConn) pConn->killConnection = 1;
pthread_mutex_unlock(&pAcct->mutex);
if (pConn1 == NULL) goto _error;
mTrace("connection:%s is there, kill it", qidstr);
return 0;
_error:
mTrace("connection:%s is not there", qidstr);
int32_t mgmtKillConnection(char *qidstr, void *pConn) {
// void *pConn1 = NULL;
// char * temp, *chr, idstr[64];
// strcpy(idstr, qidstr);
//
// temp = idstr;
// chr = strchr(temp, ':');
// if (chr == NULL) goto _error;
// *chr = 0;
// uint32_t ip = inet_addr(temp);
//
// temp = chr + 1;
// uint16_t port = htons(atoi(temp));
// SAcctObj *pAcct = pConn->pAcct;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConn = pAcct->pConn;
// while (pConn) {
// if (pConn->ip == ip && pConn->port == port) {
// // there maybe two connections from a shell
// if (pConn1 == NULL)
// pConn1 = pConn;
// else
// break;
// }
//
// pConn = pConn->next;
// }
//
// if (pConn1) pConn1->killConnection = 1;
// if (pConn) pConn->killConnection = 1;
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// if (pConn1 == NULL) goto _error;
//
// mTrace("connection:%s is there, kill it", qidstr);
// return 0;
//
//_error:
// mTrace("connection:%s is not there", qidstr);
return TSDB_CODE_INVALID_CONNECTION;
}
此差异已折叠。
......@@ -457,130 +457,130 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
return TSDB_CODE_SUCCESS;
}
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_METER_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tables");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = pDb->numOfMetrics;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
//
// if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
//
// SSchema *pSchema = tsGetSchema(pMeta);
//
// pShow->bytes[cols] = TSDB_METER_NAME_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "name");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "created_time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 2;
// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
// strcpy(pSchema[cols].name, "columns");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 2;
// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
// strcpy(pSchema[cols].name, "tags");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 4;
// pSchema[cols].type = TSDB_DATA_TYPE_INT;
// strcpy(pSchema[cols].name, "tables");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pMeta->numOfColumns = htons(cols);
// pShow->numOfColumns = cols;
//
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
//
// pShow->numOfRows = pDb->numOfMetrics;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
char * pWrite;
int32_t cols = 0;
SSuperTableObj *pTable = NULL;
char prefix[20] = {0};
int32_t prefixLen;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) {
pDb = mgmtGetDb(pConn->pDb->name);
}
if (pDb == NULL) {
return 0;
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) {
return 0;
}
}
strcpy(prefix, pDb->name);
strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix);
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char metricName[TSDB_METER_NAME_LEN] = {0};
while (numOfRows < rows) {
pTable = (SSuperTableObj *)pShow->pNode;
if (pTable == NULL) break;
//pShow->pNode = (void *)pTable->next;
if (strncmp(pTable->tableId, prefix, prefixLen)) {
continue;
}
memset(metricName, 0, tListLen(metricName));
extractTableName(pTable->tableId, metricName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
continue;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
extractTableName(pTable->tableId, pWrite);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTable->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfTags;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pTable->numOfTables;
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
// char * pWrite;
// int32_t cols = 0;
// SSuperTableObj *pTable = NULL;
// char prefix[20] = {0};
// int32_t prefixLen;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) {
// pDb = mgmtGetDb(pConn->pDb->name);
// }
//
// if (pDb == NULL) {
// return 0;
// }
//
// if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
// if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) {
// return 0;
// }
// }
//
// strcpy(prefix, pDb->name);
// strcat(prefix, TS_PATH_DELIMITER);
// prefixLen = strlen(prefix);
//
// SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
// char metricName[TSDB_METER_NAME_LEN] = {0};
//
// while (numOfRows < rows) {
// pTable = (SSuperTableObj *)pShow->pNode;
// if (pTable == NULL) break;
// //pShow->pNode = (void *)pTable->next;
//
// if (strncmp(pTable->tableId, prefix, prefixLen)) {
// continue;
// }
//
// memset(metricName, 0, tListLen(metricName));
// extractTableName(pTable->tableId, metricName);
//
// if (pShow->payloadLen > 0 &&
// patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
// continue;
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// extractTableName(pTable->tableId, pWrite);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *)pWrite = pTable->createdTime;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int16_t *)pWrite = pTable->numOfColumns;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int16_t *)pWrite = pTable->numOfTags;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int32_t *)pWrite = pTable->numOfTables;
// cols++;
//
// numOfRows++;
// }
//
// pShow->numOfReads += numOfRows;
return numOfRows;
}
......
......@@ -229,54 +229,54 @@ void mgmtCleanUpMeters() {
mgmtCleanUpSuperTables();
}
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) {
pDb = mgmtGetDb(pConn->pDb->name);
}
if (pDb == NULL) {
return TSDB_CODE_DB_NOT_SELECTED;
}
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_METER_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "table_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_METER_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "stable");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = pDb->numOfTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) {
// pDb = mgmtGetDb(pConn->pDb->name);
// }
//
// if (pDb == NULL) {
// return TSDB_CODE_DB_NOT_SELECTED;
// }
//
// SSchema *pSchema = tsGetSchema(pMeta);
//
// pShow->bytes[cols] = TSDB_METER_NAME_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "table_name");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "created_time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 2;
// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
// strcpy(pSchema[cols].name, "columns");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = TSDB_METER_NAME_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "stable");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pMeta->numOfColumns = htons(cols);
// pShow->numOfColumns = cols;
//
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) {
// pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
// }
//
// pShow->numOfRows = pDb->numOfTables;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
......@@ -292,117 +292,117 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_
}
}
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
int32_t numOfRead = 0;
int32_t cols = 0;
void *pTable = NULL;
char *pWrite = NULL;
int16_t numOfColumns;
int64_t createdTime;
char *tableId;
char *superTableId;
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) {
pDb = mgmtGetDb(pConn->pDb->name);
}
if (pDb == NULL) {
return 0;
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 &&
strcmp(pConn->pUser->user, "monitor") != 0) {
return 0;
}
}
char prefix[20] = {0};
strcpy(prefix, pDb->name);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix);
while (numOfRows < rows) {
void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable;
pShow->pNode = pNormalTableNode;
tableId = pNormalTable->tableId;
superTableId = NULL;
createdTime = pNormalTable->createdTime;
numOfColumns = pNormalTable->numOfColumns;
} else {
void *pStreamTableNode = sdbFetchRow(tsStreamTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
SStreamTableObj *pChildTable = (SStreamTableObj *) pTable;
pShow->pNode = pStreamTableNode;
tableId = pChildTable->tableId;
superTableId = NULL;
createdTime = pChildTable->createdTime;
numOfColumns = pChildTable->numOfColumns;
} else {
void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
SChildTableObj *pChildTable = (SChildTableObj *) pTable;
pShow->pNode = pChildTableNode;
tableId = pChildTable->tableId;
superTableId = NULL;
createdTime = pChildTable->createdTime;
numOfColumns = pChildTable->superTable->numOfColumns;
} else {
break;
}
}
}
// not belong to current db
if (strncmp(tableId, prefix, prefixLen)) {
continue;
}
char meterName[TSDB_METER_NAME_LEN] = {0};
memset(meterName, 0, tListLen(meterName));
numOfRead++;
// pattern compare for meter name
extractTableName(tableId, meterName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
continue;
}
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strncpy(pWrite, meterName, TSDB_METER_NAME_LEN);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *) pWrite = createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *) pWrite = numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (superTableId != NULL) {
extractTableName(superTableId, pWrite);
}
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRead;
const int32_t NUM_OF_COLUMNS = 4;
mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
// int32_t numOfRead = 0;
// int32_t cols = 0;
// void *pTable = NULL;
// char *pWrite = NULL;
//
// int16_t numOfColumns;
// int64_t createdTime;
// char *tableId;
// char *superTableId;
// SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) {
// pDb = mgmtGetDb(pConn->pDb->name);
// }
//
// if (pDb == NULL) {
// return 0;
// }
//
// if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
// if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 &&
// strcmp(pConn->pUser->user, "monitor") != 0) {
// return 0;
// }
// }
//
// char prefix[20] = {0};
// strcpy(prefix, pDb->name);
// strcat(prefix, TS_PATH_DELIMITER);
// int32_t prefixLen = strlen(prefix);
//
// while (numOfRows < rows) {
// void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable);
// if (pTable != NULL) {
// SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable;
// pShow->pNode = pNormalTableNode;
// tableId = pNormalTable->tableId;
// superTableId = NULL;
// createdTime = pNormalTable->createdTime;
// numOfColumns = pNormalTable->numOfColumns;
// } else {
// void *pStreamTableNode = sdbFetchRow(tsStreamTableSdb, pShow->pNode, (void **) &pTable);
// if (pTable != NULL) {
// SStreamTableObj *pChildTable = (SStreamTableObj *) pTable;
// pShow->pNode = pStreamTableNode;
// tableId = pChildTable->tableId;
// superTableId = NULL;
// createdTime = pChildTable->createdTime;
// numOfColumns = pChildTable->numOfColumns;
// } else {
// void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
// if (pTable != NULL) {
// SChildTableObj *pChildTable = (SChildTableObj *) pTable;
// pShow->pNode = pChildTableNode;
// tableId = pChildTable->tableId;
// superTableId = NULL;
// createdTime = pChildTable->createdTime;
// numOfColumns = pChildTable->superTable->numOfColumns;
// } else {
// break;
// }
// }
// }
//
// // not belong to current db
// if (strncmp(tableId, prefix, prefixLen)) {
// continue;
// }
//
// char meterName[TSDB_METER_NAME_LEN] = {0};
// memset(meterName, 0, tListLen(meterName));
// numOfRead++;
//
// // pattern compare for meter name
// extractTableName(tableId, meterName);
//
// if (pShow->payloadLen > 0 &&
// patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
// continue;
// }
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strncpy(pWrite, meterName, TSDB_METER_NAME_LEN);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *) pWrite = createdTime;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int16_t *) pWrite = numOfColumns;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// if (superTableId != NULL) {
// extractTableName(superTableId, pWrite);
// }
// cols++;
//
// numOfRows++;
// }
//
// pShow->numOfReads += numOfRead;
// const int32_t NUM_OF_COLUMNS = 4;
//
// mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
return numOfRows;
}
......@@ -153,75 +153,75 @@ void mgmtCleanUpUsers() {
sdbCloseTable(tsUserSdb);
}
int32_t mgmtGetUserMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_USER_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 6;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "privilege");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = pConn->pAcct->acctInfo.numOfUsers;
pShow->pNode = pConn->pAcct->pUser;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
int32_t mgmtGetUserMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
// SSchema *pSchema = tsGetSchema(pMeta);
//
// pShow->bytes[cols] = TSDB_USER_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "name");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 6;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "privilege");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "created time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pMeta->numOfColumns = htons(cols);
// pShow->numOfColumns = cols;
//
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
//
// pShow->numOfRows = pConn->pAcct->acctInfo.numOfUsers;
// pShow->pNode = pConn->pAcct->pUser;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SUserObj *pUser = NULL;
char * pWrite;
int32_t cols = 0;
while (numOfRows < rows) {
pUser = (SUserObj *)pShow->pNode;
if (pUser == NULL) break;
pShow->pNode = (void *)pUser->next;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pUser->user);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superAuth) {
strcpy(pWrite, "super");
} else if (pUser->writeAuth) {
strcpy(pWrite, "write");
} else {
strcpy(pWrite, "read");
}
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pUser->createdTime;
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
// SUserObj *pUser = NULL;
// char * pWrite;
// int32_t cols = 0;
//
// while (numOfRows < rows) {
// pUser = (SUserObj *)pShow->pNode;
// if (pUser == NULL) break;
// pShow->pNode = (void *)pUser->next;
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strcpy(pWrite, pUser->user);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// if (pUser->superAuth) {
// strcpy(pWrite, "super");
// } else if (pUser->writeAuth) {
// strcpy(pWrite, "write");
// } else {
// strcpy(pWrite, "read");
// }
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *)pWrite = pUser->createdTime;
// cols++;
//
// numOfRows++;
// }
// pShow->numOfReads += numOfRows;
return numOfRows;
}
......@@ -279,3 +279,9 @@ void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize)
tfree(row);
return NULL;
}
SUserObj *mgmtGetUserFromConn(void *pConn) {
SRpcConnInfo connInfo;
rpcGetConnInfo(pConn, &connInfo);
return mgmtGetUser(connInfo.user);
}
......@@ -248,168 +248,168 @@ void mgmtSetVgroupIdPool() {
void mgmtCleanUpVgroups() { sdbCloseTable(tsVgroupSdb); }
int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t cols = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "vgId");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "meters");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vgroup status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
int32_t maxReplica = 0;
SVgObj *pVgroup = NULL;
STableInfo *pTable = NULL;
if (pShow->payloadLen > 0 ) {
pTable = mgmtGetTable(pShow->payload);
if (NULL == pTable) {
return TSDB_CODE_INVALID_TABLE_ID;
}
pVgroup = mgmtGetVgroup(pTable->vgId);
if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else {
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
pVgroup = pVgroup->next;
}
}
for (int32_t i = 0; i < maxReplica; ++i) {
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vnode");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vnode status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
if (NULL == pTable) {
pShow->numOfRows = pDb->numOfVgroups;
pShow->pNode = pDb->pHead;
} else {
pShow->numOfRows = 1;
pShow->pNode = pVgroup;
}
int32_t mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
//
// if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
//
// SSchema *pSchema = tsGetSchema(pMeta);
//
// pShow->bytes[cols] = 4;
// pSchema[cols].type = TSDB_DATA_TYPE_INT;
// strcpy(pSchema[cols].name, "vgId");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 4;
// pSchema[cols].type = TSDB_DATA_TYPE_INT;
// strcpy(pSchema[cols].name, "meters");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 9;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "vgroup status");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// int32_t maxReplica = 0;
// SVgObj *pVgroup = NULL;
// STableInfo *pTable = NULL;
// if (pShow->payloadLen > 0 ) {
// pTable = mgmtGetTable(pShow->payload);
// if (NULL == pTable) {
// return TSDB_CODE_INVALID_TABLE_ID;
// }
//
// pVgroup = mgmtGetVgroup(pTable->vgId);
// if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
//
// maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
// } else {
// SVgObj *pVgroup = pDb->pHead;
// while (pVgroup != NULL) {
// maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
// pVgroup = pVgroup->next;
// }
// }
//
// for (int32_t i = 0; i < maxReplica; ++i) {
// pShow->bytes[cols] = 16;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "ip");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 2;
// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
// strcpy(pSchema[cols].name, "vnode");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 9;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "vnode status");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 16;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "public ip");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
// }
//
// pMeta->numOfColumns = htons(cols);
// pShow->numOfColumns = cols;
//
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
//
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
//
// if (NULL == pTable) {
// pShow->numOfRows = pDb->numOfVgroups;
// pShow->pNode = pDb->pHead;
// } else {
// pShow->numOfRows = 1;
// pShow->pNode = pVgroup;
// }
return 0;
}
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
char * pWrite;
int32_t cols = 0;
char ipstr[20];
int32_t maxReplica = 0;
SDbObj *pDb = NULL;
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
assert(pDb != NULL);
pVgroup = pDb->pHead;
while (pVgroup != NULL) {
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
pVgroup = pVgroup->next;
}
while (numOfRows < rows) {
// pShow->pNode = sdbFetchRow(tsVgroupSdb, pShow->pNode, (void **)&pVgroup);
pVgroup = (SVgObj *)pShow->pNode;
if (pVgroup == NULL) break;
pShow->pNode = (void *)pVgroup->next;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->vgId;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->numOfTables;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus));
cols++;
for (int32_t i = 0; i < maxReplica; ++i) {
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pVgroup->vnodeGid[i].vnode;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pVgroup->vnodeGid[i].ip != 0) {
char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
strcpy(pWrite, vnodeStatus);
} else {
strcpy(pWrite, "null");
}
cols++;
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
}
numOfRows++;
}
pShow->numOfReads += numOfRows;
// SVgObj *pVgroup = NULL;
// char * pWrite;
// int32_t cols = 0;
// char ipstr[20];
//
// int32_t maxReplica = 0;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
// assert(pDb != NULL);
//
// pVgroup = pDb->pHead;
// while (pVgroup != NULL) {
// maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
// pVgroup = pVgroup->next;
// }
//
// while (numOfRows < rows) {
// // pShow->pNode = sdbFetchRow(tsVgroupSdb, pShow->pNode, (void **)&pVgroup);
// pVgroup = (SVgObj *)pShow->pNode;
// if (pVgroup == NULL) break;
// pShow->pNode = (void *)pVgroup->next;
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int32_t *)pWrite = pVgroup->vgId;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int32_t *)pWrite = pVgroup->numOfTables;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strcpy(pWrite, taosGetVgroupLbStatusStr(pVgroup->lbStatus));
// cols++;
//
// for (int32_t i = 0; i < maxReplica; ++i) {
// tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strcpy(pWrite, ipstr);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int16_t *)pWrite = pVgroup->vnodeGid[i].vnode;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// if (pVgroup->vnodeGid[i].ip != 0) {
// char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
// strcpy(pWrite, vnodeStatus);
// } else {
// strcpy(pWrite, "null");
// }
// cols++;
//
// tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strcpy(pWrite, ipstr);
// cols++;
// }
//
// numOfRows++;
// }
//
// pShow->numOfReads += numOfRows;
return numOfRows;
}
......
......@@ -216,7 +216,8 @@ void monitorInitDatabaseCb(void *param, TAOS_RES *result, int code) {
if (-code == TSDB_CODE_TABLE_ALREADY_EXIST || -code == TSDB_CODE_DB_ALREADY_EXIST || code >= 0) {
monitorTrace("monitor:%p, sql success, code:%d, %s", monitor->conn, code, monitor->sql);
if (monitor->cmdIndex == MONITOR_CMD_CREATE_TB_LOG) {
taosLogFp = monitorSaveLog;
//TODO
//taosLogFp = monitorSaveLog;
taosLogSqlFp = monitorExecuteSQL;
taosLogAcctFp = monitorSaveAcctLog;
monitorLPrint("dnode:%s is started", tsPrivateIp);
......
......@@ -796,6 +796,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
}
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc;
pHead = rpcDecompressRpcMsg(pHead);
......
......@@ -24,8 +24,9 @@
extern char version[];
const int16_t sdbFileVersion = 0;
int sdbExtConns = 0;
SIpList *pSdbIpList = NULL;
SIpList *pSdbPublicIpList = NULL;
SRpcIpSet *pSdbIpList = NULL;
SRpcIpSet *pSdbPublicIpList = NULL;
SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self
#ifdef CLUSTER
int sdbMaster = 0;
......@@ -57,6 +58,17 @@ int64_t sdbGetVersion() {
return sdbVersion;
};
int32_t sdbGetRunStatus() {
if (!tsIsCluster) {
return SDB_STATUS_SERVING;
}
if (sdbInited == NULL) {
return SDB_STATUS_OFFLINE;
}
return sdbStatus;
}
void sdbFinishCommit(void *handle) {
SSdbTable *pTable = (SSdbTable *)handle;
uint32_t sdbEcommit = SDB_ENDCOMMIT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册