提交 29058645 编写于 作者: S slguan

fix compile errors

上级 e9470f0a
...@@ -4,11 +4,11 @@ PROJECT(TDengine) ...@@ -4,11 +4,11 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(os)
ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(util)
ADD_SUBDIRECTORY(rpc) ADD_SUBDIRECTORY(rpc)
#ADD_SUBDIRECTORY(client) ADD_SUBDIRECTORY(client)
#ADD_SUBDIRECTORY(kit) ADD_SUBDIRECTORY(kit)
#ADD_SUBDIRECTORY(plugins) ADD_SUBDIRECTORY(plugins)
#ADD_SUBDIRECTORY(sdb) ADD_SUBDIRECTORY(sdb)
#ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(mnode)
#ADD_SUBDIRECTORY(dnode) ADD_SUBDIRECTORY(dnode)
#ADD_SUBDIRECTORY(vnode) #ADD_SUBDIRECTORY(vnode)
#ADD_SUBDIRECTORY(connector/jdbc) #ADD_SUBDIRECTORY(connector/jdbc)
...@@ -57,7 +57,7 @@ void tscPrintMgmtIp() { ...@@ -57,7 +57,7 @@ void tscPrintMgmtIp() {
tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps); tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
} else { } else {
for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) { for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
tscTrace("mgmt index:%d ip:%s", i, tscMgmtIpList.ipStr[i]); tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
} }
} }
} }
...@@ -66,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { ...@@ -66,7 +66,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
tscMgmtIpList.numOfIps = pIpList->numOfIps; tscMgmtIpList.numOfIps = pIpList->numOfIps;
if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) {
for (int i = 0; i < pIpList->numOfIps; ++i) { for (int i = 0; i < pIpList->numOfIps; ++i) {
tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]); //tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]);
tscMgmtIpList.ip[i] = pIpList->ip[i]; tscMgmtIpList.ip[i] = pIpList->ip[i];
} }
tscTrace("cluster mgmt IP list:"); tscTrace("cluster mgmt IP list:");
...@@ -77,9 +77,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) { ...@@ -77,9 +77,7 @@ void tscSetMgmtIpListFromCluster(SIpList *pIpList) {
void tscSetMgmtIpListFromEdge() { void tscSetMgmtIpListFromEdge() {
if (tscMgmtIpList.numOfIps != 2) { if (tscMgmtIpList.numOfIps != 2) {
tscMgmtIpList.numOfIps = 2; tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipStr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipStr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscTrace("edge mgmt IP list:"); tscTrace("edge mgmt IP list:");
tscPrintMgmtIp(); tscPrintMgmtIp();
...@@ -351,17 +349,14 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -351,17 +349,14 @@ int tscSendMsgToServer(SSqlObj *pSql) {
uint64_t signature = (uint64_t)pSql->signature; uint64_t signature = (uint64_t)pSql->signature;
//if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart); //if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, pStart);
int ret; if (pSql->cmd.command < TSDB_SQL_MGMT) {
if (pSql->cmd.command < TSDB_SQL_MGMT) rpcSendRequest(pTscMgmtConn, tscMgmtIpList, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
ret = rpcSendRequest(pTscMgmtConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); } else {
else SRpcIpSet rpcSet = tscMgmtIpList;
ret = rpcSendRequest(pVnodeConn, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql); rpcSendRequest(pVnodeConn, rpcSet, pSql->cmd.msgType, pStart, pSql->cmd.payloadLen, pSql);
if (ret >= 0) {
code = 0;
} }
tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, signature); tscTrace("%p send msg code:%d sig:%p", pSql, code, signature);
} }
} }
...@@ -1327,7 +1322,7 @@ void tscKillMetricQuery(SSqlObj *pSql) { ...@@ -1327,7 +1322,7 @@ void tscKillMetricQuery(SSqlObj *pSql) {
* sub-queries not correctly released and master sql object of metric query reaches an abnormal state. * sub-queries not correctly released and master sql object of metric query reaches an abnormal state.
*/ */
pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED; pSql->pSubs[i]->res.code = TSDB_CODE_QUERY_CANCELLED;
taosStopRpcConn(pSql->pSubs[i]->thandle); //taosStopRpcConn(pSql->pSubs[i]->thandle);
} }
pSql->numOfSubs = 0; pSql->numOfSubs = 0;
...@@ -1491,9 +1486,9 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { ...@@ -1491,9 +1486,9 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
pMsg = buf + tsRpcHeadSize; pMsg = buf + tsRpcHeadSize;
pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); //pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip), //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip),
htons(pShellMsg->vnode)); // htons(pShellMsg->vnode));
} }
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
...@@ -1511,13 +1506,13 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1511,13 +1506,13 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1); pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1);
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); //pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode);
pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT;
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
htons(pShellMsg->vnode)); // htons(pShellMsg->vnode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -65,11 +65,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -65,11 +65,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
if (ip && ip[0]) { if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 3; tscMgmtIpList.numOfIps = 3;
strcpy(tscMgmtIpList.ipStr[0], ip);
tscMgmtIpList.ip[0] = inet_addr(ip); tscMgmtIpList.ip[0] = inet_addr(ip);
strcpy(tscMgmtIpList.ipStr[1], tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp); tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
strcpy(tscMgmtIpList.ipStr[2], tsSecondIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp); tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
tscMgmtIpList.index = 0; tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort; tscMgmtIpList.port = tsMgmtShellPort;
...@@ -907,7 +904,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -907,7 +904,7 @@ void taos_stop_query(TAOS_RES *res) {
return; return;
} }
taosStopRpcConn(pSql->thandle); //taosStopRpcConn(pSql->thandle);
tscTrace("%p query is cancelled", res); tscTrace("%p query is cancelled", res);
} }
......
...@@ -99,12 +99,10 @@ void taos_init_imp() { ...@@ -99,12 +99,10 @@ void taos_init_imp() {
tscMgmtIpList.index = 0; tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort; tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.numOfIps = 1; tscMgmtIpList.numOfIps = 1;
strcpy(tscMgmtIpList.ipStr[0], tsMasterIp);
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) { if (tsSecondIp[0]) {
tscMgmtIpList.numOfIps = 2; tscMgmtIpList.numOfIps = 2;
strcpy(tscMgmtIpList.ipStr[1], tsSecondIp);
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp); tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
} }
...@@ -131,7 +129,7 @@ void taos_init_imp() { ...@@ -131,7 +129,7 @@ void taos_init_imp() {
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode"; rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads; rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.fp = tscProcessMsgFromServer; rpcInit.afp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections; rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
pVnodeConn = rpcOpen(&rpcInit); pVnodeConn = rpcOpen(&rpcInit);
...@@ -145,7 +143,7 @@ void taos_init_imp() { ...@@ -145,7 +143,7 @@ void taos_init_imp() {
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt"; rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.fp = tscProcessMsgFromServer; rpcInit.afp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxMgmtConnections; rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C(); rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
pTscMgmtConn = rpcOpen(&rpcInit); pTscMgmtConn = rpcOpen(&rpcInit);
......
...@@ -9,9 +9,8 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -9,9 +9,8 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(dnode ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
#ADD_EXECUTABLE(taosd ${SRC}) TARGET_LINK_LIBRARIES(taosd mnode sdb taos_static monitor http)
#TARGET_LINK_LIBRARIES(taosd mnode sdb vnode taos_static monitor http)
#IF (TD_CLUSTER) #IF (TD_CLUSTER)
# TARGET_LINK_LIBRARIES(taosd dcluster) # TARGET_LINK_LIBRARIES(taosd dcluster)
......
...@@ -50,7 +50,7 @@ void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, voi ...@@ -50,7 +50,7 @@ void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, voi
} }
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rpcSendSimpleRsp(handle, TSDB_CODE_NOT_READY); rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0);
dTrace("conn:%p, query msg is ignored since dnode not running", handle); dTrace("conn:%p, query msg is ignored since dnode not running", handle);
return NULL; return NULL;
} }
...@@ -83,7 +83,7 @@ int32_t dnodeInitShell() { ...@@ -83,7 +83,7 @@ int32_t dnodeInitShell() {
rpcInit.localPort = tsVnodeShellPort; rpcInit.localPort = tsVnodeShellPort;
rpcInit.label = "DND-shell"; rpcInit.label = "DND-shell";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.fp = dnodeProcessMsgFromShell; rpcInit.cfp = dnodeProcessMsgFromShell;
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.idleTime = tsShellActivityTimer * 2000; rpcInit.idleTime = tsShellActivityTimer * 2000;
...@@ -118,7 +118,7 @@ void dnodeProcessQueryRequestCb(int code, void *pQInfo, void *pConn) { ...@@ -118,7 +118,7 @@ void dnodeProcessQueryRequestCb(int code, void *pQInfo, void *pConn) {
queryRsp->code = htonl(code); queryRsp->code = htonl(code);
queryRsp->qhandle = (uint64_t) (pQInfo); queryRsp->qhandle = (uint64_t) (pQInfo);
rpcSendResponse(pConn, queryRsp, contLen); rpcSendResponse(pConn, TSDB_CODE_SUCCESS, queryRsp, contLen);
} }
static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn) { static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn) {
...@@ -134,7 +134,7 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) { ...@@ -134,7 +134,7 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) {
assert(pConn != NULL); assert(pConn != NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
rpcSendSimpleRsp(pConn, code); rpcSendResponse(pConn, code, 0, 0);
return; return;
} }
...@@ -142,13 +142,13 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) { ...@@ -142,13 +142,13 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) {
int32_t contLen = dnodeGetRetrieveDataSize(pQInfo); int32_t contLen = dnodeGetRetrieveDataSize(pQInfo);
SRetrieveMeterRsp *retrieveRsp = (SRetrieveMeterRsp *) rpcMallocCont(contLen); SRetrieveMeterRsp *retrieveRsp = (SRetrieveMeterRsp *) rpcMallocCont(contLen);
if (retrieveRsp == NULL) { if (retrieveRsp == NULL) {
rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY); rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0);
return; return;
} }
code = dnodeGetRetrieveData(pQInfo, retrieveRsp); code = dnodeGetRetrieveData(pQInfo, retrieveRsp);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
rpcSendSimpleRsp(pConn, TSDB_CODE_INVALID_QHANDLE); rpcSendResponse(pConn, TSDB_CODE_INVALID_QHANDLE, 0, 0);
} }
retrieveRsp->numOfRows = htonl(retrieveRsp->numOfRows); retrieveRsp->numOfRows = htonl(retrieveRsp->numOfRows);
...@@ -156,7 +156,7 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) { ...@@ -156,7 +156,7 @@ void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) {
retrieveRsp->offset = htobe64(retrieveRsp->offset); retrieveRsp->offset = htobe64(retrieveRsp->offset);
retrieveRsp->useconds = htobe64(retrieveRsp->useconds); retrieveRsp->useconds = htobe64(retrieveRsp->useconds);
rpcSendResponse(pConn, retrieveRsp, contLen); rpcSendResponse(pConn, TSDB_CODE_SUCCESS, retrieveRsp, contLen);
} }
static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn) { static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn) {
...@@ -170,14 +170,14 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) { ...@@ -170,14 +170,14 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) {
assert(result != NULL); assert(result != NULL);
if (result->code != 0) { if (result->code != 0) {
rpcSendSimpleRsp(pConn, result->code); rpcSendResponse(pConn, result->code, 0, 0);
return; return;
} }
int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen); SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen);
if (submitRsp == NULL) { if (submitRsp == NULL) {
rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY); rpcSendResponse(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 0);
return; return;
} }
...@@ -202,7 +202,7 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) { ...@@ -202,7 +202,7 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) {
submitRsp->failedRows = htonl(submitRsp->failedRows); submitRsp->failedRows = htonl(submitRsp->failedRows);
submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks); submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks);
rpcSendResponse(pConn, submitRsp, contLen); rpcSendResponse(pConn, TSDB_CODE_SUCCESS, submitRsp, contLen);
} }
static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn) { static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn) {
......
...@@ -19,14 +19,34 @@ ...@@ -19,14 +19,34 @@
#include "taoserror.h" #include "taoserror.h"
#include "dnodeVnodeMgmt.h" #include "dnodeVnodeMgmt.h"
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { int32_t dnodeOpenVnodes() {
return TSDB_VN_STATUS_MASTER; return 0;
}
int32_t dnodeCleanupVnodes() {
return 0;
} }
bool dnodeCheckVnodeExist(int32_t vnode) { bool dnodeCheckVnodeExist(int32_t vnode) {
return true; return true;
} }
int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg) {
return 0;
}
int32_t dnodeDropVnode(int32_t vnode) {
return 0;
}
void* dnodeGetVnode(int vid) {
return NULL;
}
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
return TSDB_VN_STATUS_MASTER;
}
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
return true; return true;
} }
......
...@@ -152,9 +152,7 @@ typedef struct SSuperTableObj { ...@@ -152,9 +152,7 @@ typedef struct SSuperTableObj {
int8_t reserved[7]; int8_t reserved[7];
int8_t updateEnd[1]; int8_t updateEnd[1];
pthread_rwlock_t rwLock; pthread_rwlock_t rwLock;
struct SSuperTableObj *prev, *next;
int16_t nextColId; int16_t nextColId;
int8_t *schema; int8_t *schema;
...@@ -217,11 +215,11 @@ typedef struct _vg_obj { ...@@ -217,11 +215,11 @@ typedef struct _vg_obj {
int32_t lbIp; int32_t lbIp;
int32_t lbTime; int32_t lbTime;
int8_t lbStatus; int8_t lbStatus;
int8_t reserved[16]; int8_t reserved[16];
int8_t updateEnd[1]; int8_t updateEnd[1];
struct _vg_obj *prev, *next; struct _vg_obj *prev, *next;
void * idPool; void * idPool;
STabObj ** meterList; void ** meterList;
} SVgObj; } SVgObj;
typedef struct _db_obj { typedef struct _db_obj {
......
...@@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) ...@@ -2,6 +2,7 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
......
...@@ -3,6 +3,7 @@ PROJECT(TDengine) ...@@ -3,6 +3,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
......
...@@ -3,6 +3,7 @@ PROJECT(TDengine) ...@@ -3,6 +3,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
......
...@@ -11,7 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -11,7 +11,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(mnode ${SRC}) ADD_LIBRARY(mnode ${SRC})
#TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread) TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread)
#IF (TD_CLUSTER) #IF (TD_CLUSTER)
# TARGET_LINK_LIBRARIES(mnode mcluster) # TARGET_LINK_LIBRARIES(mnode mcluster)
......
...@@ -28,8 +28,6 @@ int mgmtUseDb(SConnObj *pConn, char *name); ...@@ -28,8 +28,6 @@ int mgmtUseDb(SConnObj *pConn, char *name);
int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup); int mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup);
int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup); int mgmtAddVgroupIntoDbTail(SDbObj *pDb, SVgObj *pVgroup);
int mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup); int mgmtRemoveVgroupFromDb(SDbObj *pDb, SVgObj *pVgroup);
int mgmtAddMetricIntoDb(SDbObj *pDb, STabObj *pMetric);
int mgmtRemoveMetricFromDb(SDbObj *pDb, STabObj *pMetric);
int mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup); int mgmtMoveVgroupToTail(SDbObj *pDb, SVgObj *pVgroup);
int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup); int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup);
int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int mgmtGetDbMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
......
...@@ -38,8 +38,8 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid); ...@@ -38,8 +38,8 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid);
char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type); char *taosBuildRspMsgToDnode(SDnodeObj *pObj, char type);
char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type); char *taosBuildReqMsgToDnode(SDnodeObj *pObj, char type);
extern int32_t (*mgmtSendSimpleRspToDnode)(int32_t msgType, int32_t code); extern int32_t (*mgmtSendSimpleRspToDnode)(void *pConn, int32_t msgType, int32_t code);
extern int32_t (*mgmtSendMsgToDnode)(SDnodeObj *pObj, char *msg, int msgLen); extern int32_t (*mgmtSendMsgToDnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*mgmtInitDnodeInt)(); extern int32_t (*mgmtInitDnodeInt)();
extern void (*mgmtCleanUpDnodeInt)(); extern void (*mgmtCleanUpDnodeInt)();
extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId); extern void (*mgmtProcessDnodeStatus)(void *handle, void *tmrId);
......
...@@ -34,7 +34,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char * ...@@ -34,7 +34,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *
SNormalTableObj* mgmtGetNormalTable(char *tableId); SNormalTableObj* mgmtGetNormalTable(char *tableId);
SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable); SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable);
int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int32_t vnode); int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,6 +33,7 @@ typedef struct { ...@@ -33,6 +33,7 @@ typedef struct {
int mgmtInitMeters(); int mgmtInitMeters();
STableObj mgmtGetTable(char *tableId); STableObj mgmtGetTable(char *tableId);
STableObj mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
STabObj *mgmtGetTableInfo(char *src, char *tags[]); STabObj *mgmtGetTableInfo(char *src, char *tags[]);
int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo); int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
......
...@@ -28,8 +28,7 @@ extern "C" { ...@@ -28,8 +28,7 @@ extern "C" {
bool mgmtTableCreateFromSuperTable(STabObj *pTableObj); bool mgmtTableCreateFromSuperTable(STabObj *pTableObj);
bool mgmtIsSuperTable(STabObj *pTableObj); bool mgmtIsSuperTable(STabObj *pTableObj);
bool mgmtIsNormalTable(STabObj *pTableObj); bool mgmtIsNormalTable(STabObj *pTableObj);
char* mgmtTableGetTag(STabObj* pTable, int32_t col, SSchema* pTagColSchema); int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col);
int32_t mgmtGetTagsLength(STabObj* pSuperTable, int32_t col);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb); bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate); int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mgmtTable.h" #include "mgmtTable.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "taoserror.h"
extern void *userSdb; extern void *userSdb;
extern void *dbSdb; extern void *dbSdb;
...@@ -156,7 +157,7 @@ int32_t mgmtCheckDbLimitImp(SAcctObj *pAcct) { ...@@ -156,7 +157,7 @@ int32_t mgmtCheckDbLimitImp(SAcctObj *pAcct) {
int numOfDbs = sdbGetNumOfRows(dbSdb); int numOfDbs = sdbGetNumOfRows(dbSdb);
if (numOfDbs >= tsMaxDbs) { if (numOfDbs >= tsMaxDbs) {
mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs); mWarn("numOfDbs:%d, exceed tsMaxDbs:%d", numOfDbs, tsMaxDbs);
return TSDB_CODE_TOO_MANY_DATABSES; return TSDB_CODE_TOO_MANY_DATABASES;
} }
return 0; return 0;
} }
......
...@@ -208,86 +208,86 @@ void mgmtCleanUpChildTables() { ...@@ -208,86 +208,86 @@ void mgmtCleanUpChildTables() {
int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen, int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen,
int8_t *pTagData) { int8_t *pTagData) {
SCreateChildTableMsg *pCreateTable = (SCreateChildTableMsg *) pMsg; SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); // memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); // memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htonl(vnode); // pCreateTable->vnode = htonl(vnode);
pCreateTable->sid = htonl(pTable->sid); // pCreateTable->sid = htonl(pTable->sid);
pCreateTable->uid = pTable->uid; // pCreateTable->uid = pTable->uid;
pCreateTable->createdTime = htobe64(pTable->createdTime); // pCreateTable->createdTime = htobe64(pTable->createdTime);
pCreateTable->sversion = htonl(pTable->superTable->sversion); // pCreateTable->sversion = htonl(pTable->superTable->sversion);
pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns); // pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreateTable->numOfTags = htons(pTable->superTable->numOfTags); // pCreateTable->numOfTags = htons(pTable->superTable->numOfTags);
//
SSchema *pSchema = pTable->superTable->schema; // SSchema *pSchema = pTable->superTable->schema;
int32_t totalCols = pCreateTable->numOfColumns + pCreateTable->numOfTags; // int32_t totalCols = pCreateTable->numOfColumns + pCreateTable->numOfTags;
//
for (int32_t col = 0; col < totalCols; ++col) { // for (int32_t col = 0; col < totalCols; ++col) {
SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; // SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
colData->type = pSchema[col].type; // colData->type = pSchema[col].type;
colData->bytes = htons(pSchema[col].bytes); // colData->bytes = htons(pSchema[col].bytes);
colData->colId = htons(pSchema[col].colId); // colData->colId = htons(pSchema[col].colId);
} // }
//
int32_t totalColsSize = sizeof(SMColumn *) * totalCols; // int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
pMsg = pCreateTable->data + totalColsSize + tagDataLen; // pMsg = pCreateTable->data + totalColsSize + tagDataLen;
//
memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen); // memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen);
pCreateTable->tagDataLen = htonl(tagDataLen); // pCreateTable->tagDataLen = htonl(tagDataLen);
return pMsg; return pMsg;
} }
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsChildTableSdb); // int numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) { // if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); // mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables);
return TSDB_CODE_TOO_MANY_TABLES; // return TSDB_CODE_TOO_MANY_TABLES;
} // }
//
char *pTagData = (char *) pCreate->schema; // it is a tag key // char *pTagData = (char *) pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); // SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) { // if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->meterId); // mError("table:%s, corresponding super table does not exist", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE; // return TSDB_CODE_INVALID_TABLE;
} // }
//
SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); // SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) { // if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; // return TSDB_CODE_SERV_OUT_OF_MEMORY;
} // }
strcpy(pTable->tableId, pCreate->meterId); // strcpy(pTable->tableId, pCreate->meterId);
strcpy(pTable->superTableId, pSuperTable->tableId); // strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->createdTime = taosGetTimestampMs(); // pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable; // pTable->superTable = pSuperTable;
pTable->vgId = pVgroup->vgId; // pTable->vgId = pVgroup->vgId;
pTable->sid = sid; // pTable->sid = sid;
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + // pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); // ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
//
SVariableMsg tags = {0}; // SVariableMsg tags = {0};
tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN; // tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN;
tags.data = (char *) calloc(1, tags.size); // tags.data = (char *) calloc(1, tags.size);
if (tags.data == NULL) { // if (tags.data == NULL) {
free(pTable); // free(pTable);
mError("table:%s, corresponding super table schema is null", pCreate->meterId); // mError("table:%s, corresponding super table schema is null", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE; // return TSDB_CODE_INVALID_TABLE;
} // }
memcpy(tags.data, pTagData, tags.size); // memcpy(tags.data, pTagData, tags.size);
//
if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) { // if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId); // mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR; // return TSDB_CODE_SDB_ERROR;
} // }
//
mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1); // mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1);
//
mgmtSendCreateChildTableMsg(pTable, pVgroup, tags.size, tags.data); // mgmtSendCreateChildTableMsg(pTable, pVgroup, tags.size, tags.data);
//
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" // mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%"
PRIu64 // PRIu64
" db:%s", // " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); // pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0; return 0;
} }
...@@ -323,14 +323,14 @@ SChildTableObj* mgmtGetChildTable(char *tableId) { ...@@ -323,14 +323,14 @@ SChildTableObj* mgmtGetChildTable(char *tableId) {
} }
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) {
int col = mgmtFindTagCol(pTable->superTable, tagName); // int col = mgmtFindTagCol(pTable->superTable, tagName);
if (col < 0 || col > pTable->superTable->numOfTags) { // if (col < 0 || col > pTable->superTable->numOfTags) {
return TSDB_CODE_APP_ERROR; // return TSDB_CODE_APP_ERROR;
} // }
//
//TODO send msg to dnode // //TODO send msg to dnode
mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId); // mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
// int rowSize = 0; // int rowSize = 0;
// SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema)); // SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema));
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "tschemautil.h" #include "tschemautil.h"
#include "tstatus.h" #include "tstatus.h"
#include "mnode.h" #include "mnode.h"
#include "taoserror.h"
void *dbSdb = NULL; void *dbSdb = NULL;
extern void *vgSdb; extern void *vgSdb;
...@@ -324,7 +325,7 @@ int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { ...@@ -324,7 +325,7 @@ int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) {
} }
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
return TSDB_CODE_MONITOR_DB_FORBEIDEN; return TSDB_CODE_MONITOR_DB_FORBIDDEN;
} }
return mgmtDropDb(pDb); return mgmtDropDb(pDb);
...@@ -479,33 +480,6 @@ int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) { ...@@ -479,33 +480,6 @@ int mgmtMoveVgroupToHead(SDbObj *pDb, SVgObj *pVgroup) {
return 0; return 0;
} }
int mgmtAddMetricIntoDb(SDbObj *pDb, STabObj *pMetric) {
pMetric->next = pDb->pMetric;
pMetric->prev = NULL;
if (pDb->pMetric) pDb->pMetric->prev = pMetric;
pDb->pMetric = pMetric;
pDb->numOfMetrics++;
return 0;
}
int mgmtRemoveMetricFromDb(SDbObj *pDb, STabObj *pMetric) {
if (pMetric->prev) pMetric->prev->next = pMetric->next;
if (pMetric->next) pMetric->next->prev = pMetric->prev;
if (pMetric->prev == NULL) pDb->pMetric = pMetric->next;
pDb->numOfMetrics--;
if (pMetric->pSkipList != NULL) {
pMetric->pSkipList = tSkipListDestroy(pMetric->pSkipList);
}
return 0;
}
int mgmtShowTables(SAcctObj *pAcct, char *db) { int mgmtShowTables(SAcctObj *pAcct, char *db) {
int code; int code;
......
此差异已折叠。
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
#include "mgmtNormalTable.h" #include "mgmtNormalTable.h"
void *tsSuperTableSdb; void *tsNormalTableSdb;
void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize); void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize);
...@@ -218,8 +218,9 @@ void mgmtCleanUpNormalTables() { ...@@ -218,8 +218,9 @@ void mgmtCleanUpNormalTables() {
sdbCloseTable(tsNormalTableSdb); sdbCloseTable(tsNormalTableSdb);
} }
int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int32_t vnode) { int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode) {
SCreateNormalTableMsg *pCreateTable = (SCreateNormalTableMsg *) pMsg; int8_t *pMsg = NULL;
SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htobe32(vnode); pCreateTable->vnode = htobe32(vnode);
pCreateTable->sid = htobe32(pTable->sid); pCreateTable->sid = htobe32(pTable->sid);
...@@ -231,15 +232,15 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int ...@@ -231,15 +232,15 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int
SSchema *pSchema = pTable->schema; SSchema *pSchema = pTable->schema;
int32_t totalCols = pCreateTable->numOfColumns; int32_t totalCols = pCreateTable->numOfColumns;
for (int32_t col = 0; col < totalCols; ++col) { // for (int32_t col = 0; col < totalCols; ++col) {
SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; // SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
colData->type = pSchema[col].type; // colData->type = pSchema[col].type;
colData->bytes = htons(pSchema[col].bytes); // colData->bytes = htons(pSchema[col].bytes);
colData->colId = htons(pSchema[col].colId); // colData->colId = htons(pSchema[col].colId);
} // }
int32_t totalColsSize = sizeof(SMColumn *) * totalCols; // int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
pMsg = pCreateTable->data + totalColsSize; // pMsg = pCreateTable->data + totalColsSize;
return pMsg; return pMsg;
} }
......
此差异已折叠。
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
#include "mgmtStreamTable.h" #include "mgmtStreamTable.h"
void *tsSuperTableSdb; void *tsStreamTableSdb;
void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize); void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize);
...@@ -78,7 +78,7 @@ void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize) { ...@@ -78,7 +78,7 @@ void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize) {
} }
void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize) { void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (STabObj *)row; SStreamTableObj *pTable = (SStreamTableObj *)row;
mgmtDestroyStreamTable(pTable); mgmtDestroyStreamTable(pTable);
return NULL; return NULL;
} }
...@@ -230,7 +230,7 @@ void mgmtCleanUpStreamTables() { ...@@ -230,7 +230,7 @@ void mgmtCleanUpStreamTables() {
} }
int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode) { int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode) {
SCreateStreamTableMsg *pCreateTable = (SCreateStreamTableMsg *) pMsg; SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htonl(vnode); pCreateTable->vnode = htonl(vnode);
pCreateTable->sid = htonl(pTable->sid); pCreateTable->sid = htonl(pTable->sid);
...@@ -238,23 +238,23 @@ int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int ...@@ -238,23 +238,23 @@ int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int
pCreateTable->createdTime = htobe64(pTable->createdTime); pCreateTable->createdTime = htobe64(pTable->createdTime);
pCreateTable->sversion = htonl(pTable->sversion); pCreateTable->sversion = htonl(pTable->sversion);
pCreateTable->numOfColumns = htons(pTable->numOfColumns); pCreateTable->numOfColumns = htons(pTable->numOfColumns);
pCreateTable->sqlLen = htons(pTable->sqlLen); //pCreateTable->sqlLen = htons(pTable->sqlLen);
SSchema *pSchema = pTable->schema; SSchema *pSchema = pTable->schema;
int32_t totalCols = pCreateTable->numOfColumns; int32_t totalCols = pCreateTable->numOfColumns;
for (int32_t col = 0; col < totalCols; ++col) { // for (int32_t col = 0; col < totalCols; ++col) {
SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; // SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
colData->type = pSchema[col].type; // colData->type = pSchema[col].type;
colData->bytes = htons(pSchema[col].bytes); // colData->bytes = htons(pSchema[col].bytes);
colData->colId = htons(pSchema[col].colId); // colData->colId = htons(pSchema[col].colId);
} // }
int32_t totalColsSize = sizeof(SMColumn *) * totalCols; // int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
pMsg = pCreateTable->data + totalColsSize + pTable->sqlLen; // pMsg = pCreateTable->data + totalColsSize + pTable->sqlLen;
char *sql = pTable->schema + pTable->schemaSize; // char *sql = pTable->schema + pTable->schemaSize;
memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql); // memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql);
return pMsg; return pMsg;
} }
...@@ -345,6 +345,6 @@ int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) { ...@@ -345,6 +345,6 @@ int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) {
return 0; return 0;
} }
SStreamTableObj* mgmtGetStreamTable(char *tableId); { SStreamTableObj* mgmtGetStreamTable(char *tableId) {
return (SStreamTableObj *)sdbGetRow(tsStreamTableSdb, tableId); return (SStreamTableObj *)sdbGetRow(tsStreamTableSdb, tableId);
} }
\ No newline at end of file
...@@ -93,7 +93,7 @@ void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize) { ...@@ -93,7 +93,7 @@ void *mgmtSuperTableActionReset(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row; SSuperTableObj *pTable = (SSuperTableObj *) row;
int tsize = pTable->updateEnd - (int8_t *) pTable; int tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize); memcpy(pTable, str, tsize);
pTable->schema = (char *) realloc(pTable->schema, pTable->schemaSize); pTable->schema = realloc(pTable->schema, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, pTable->schemaSize); memcpy(pTable->schema, str + tsize, pTable->schemaSize);
return NULL; return NULL;
} }
...@@ -105,20 +105,10 @@ void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize) { ...@@ -105,20 +105,10 @@ void *mgmtSuperTableActionDestroy(void *row, char *str, int size, int *ssize) {
} }
void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionInsert(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
if (pDb) {
mgmtAddMetricIntoDb(pDb, pTable);
}
return NULL; return NULL;
} }
void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize) { void *mgmtSuperTableActionDelete(void *row, char *str, int size, int *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
if (pDb) {
mgmtRemoveMetricFromDb(pDb, pTable);
}
return NULL; return NULL;
} }
...@@ -159,7 +149,7 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize) { ...@@ -159,7 +149,7 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int size, int *ssize) {
} }
memcpy(pTable, str, tsize); memcpy(pTable, str, tsize);
pTable->schema = (char *)malloc(pTable->schemaSize); pTable->schema = malloc(pTable->schemaSize);
if (pTable->schema == NULL) { if (pTable->schema == NULL) {
mgmtDestroySuperTable(pTable); mgmtDestroySuperTable(pTable);
return NULL; return NULL;
...@@ -179,95 +169,38 @@ void *mgmtSuperTableAction(char action, void *row, char *str, int size, int *ssi ...@@ -179,95 +169,38 @@ void *mgmtSuperTableAction(char action, void *row, char *str, int size, int *ssi
int32_t mgmtInitSuperTables() { int32_t mgmtInitSuperTables() {
void * pNode = NULL; void * pNode = NULL;
void * pLastNode = NULL; void * pLastNode = NULL;
SVgObj * pVgroup = NULL; SSuperTableObj * pTable = NULL;
STabObj * pTable = NULL;
STabObj * pMetric = NULL;
SDbObj * pDb = NULL;
SAcctObj *pAcct = NULL;
// TODO: Make sure this function only run once // TODO: Make sure this function only run once
mgmtSuperTableActionInit(); mgmtSuperTableActionInit();
tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN, tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"meters", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtSuperTableAction); "meters", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtSuperTableAction);
if (meterSdb == NULL) { if (tsSuperTableSdb == NULL) {
mError("failed to init meter data"); mError("failed to init meter data");
return -1; return -1;
} }
pNode = NULL; pNode = NULL;
while (1) { while (1) {
pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable); pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) break; if (pTable == NULL) {
if (mgmtIsSuperTable(pTable)) pTable->numOfMeters = 0; break;
} }
pNode = NULL;
while (1) {
pLastNode = pNode;
pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
pDb = mgmtGetDbByMeterId(pTable->meterId); SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId);
if (pDb == NULL) { if (pDb == NULL) {
mError("meter:%s, failed to get db, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid); mError("super table:%s, failed to get db, discard it", pTable->tableId);
pTable->gid.vgId = 0; sdbDeleteRow(tsSuperTableSdb, pTable);
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode; pNode = pLastNode;
continue; continue;
} }
pTable->numOfMeters = 0;
if (mgmtIsNormalTable(pTable)) {
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (pVgroup == NULL) {
mError("meter:%s, failed to get vgroup:%d sid:%d, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("meter:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->meterId, pDb->name, pTable->gid.vgId, pVgroup->dbName, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if ( pVgroup->meterList == NULL) {
mError("meter:%s, vgroup:%d meterlist is null", pTable->meterId, pTable->gid.vgId);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
pVgroup->meterList[pTable->gid.sid] = pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->gid.sid, 1);
if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) {
pTable->pSql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns;
}
if (mgmtTableCreateFromSuperTable(pTable)) {
pTable->pTagData = (char *)pTable->schema; // + sizeof(SSchema)*pTable->numOfColumns;
pMetric = mgmtGetTable(pTable->pTagData);
if (pMetric) mgmtAddMeterIntoMetric(pMetric, pTable);
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct) mgmtAddMeterStatisticToAcct(pTable, pAcct);
} else {
if (pDb) mgmtAddMetricIntoDb(pDb, pTable);
}
} }
mgmtSetVgroupIdPool(); mgmtSetVgroupIdPool();
mTrace("meter is initialized"); mTrace("super table is initialized");
return 0; return 0;
} }
...@@ -321,11 +254,8 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { ...@@ -321,11 +254,8 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
} }
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) { int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) {
SChildTableObj *pMetric; //TODO drop all child tables
while ((pMetric = pSuperTable->pHead) != NULL) { return sdbDeleteRow(tsSuperTableSdb, pSuperTable);
mgmtDropChildTable(pDb, pMetric);
}
sdbDeleteRow(tsSuperTableSdb, pMetric);
} }
SSuperTableObj* mgmtGetSuperTable(char *tableId) { SSuperTableObj* mgmtGetSuperTable(char *tableId) {
...@@ -477,7 +407,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagN ...@@ -477,7 +407,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagN
} }
static int32_t mgmtFindSuperTableColumnIndex(SNormalTableObj *pMetric, char *colName) { static int32_t mgmtFindSuperTableColumnIndex(SSuperTableObj *pMetric, char *colName) {
SSchema *schema = (SSchema *) pMetric->schema; SSchema *schema = (SSchema *) pMetric->schema;
for (int32_t i = 0; i < pMetric->numOfColumns; i++) { for (int32_t i = 0; i < pMetric->numOfColumns; i++) {
if (strcasecmp(schema[i].name, colName) == 0) { if (strcasecmp(schema[i].name, colName) == 0) {
...@@ -650,7 +580,7 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo ...@@ -650,7 +580,7 @@ int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pCo
while (numOfRows < rows) { while (numOfRows < rows) {
pTable = (SSuperTableObj *)pShow->pNode; pTable = (SSuperTableObj *)pShow->pNode;
if (pTable == NULL) break; if (pTable == NULL) break;
pShow->pNode = (void *)pTable->next; //pShow->pNode = (void *)pTable->next;
if (strncmp(pTable->tableId, prefix, prefixLen)) { if (strncmp(pTable->tableId, prefix, prefixLen)) {
continue; continue;
...@@ -706,8 +636,6 @@ int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) { ...@@ -706,8 +636,6 @@ int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) {
pMetric->pHead = pTable; pMetric->pHead = pTable;
pMetric->numOfMeters++; pMetric->numOfMeters++;
addMeterIntoMetricIndex(pMetric, pTable);
pthread_rwlock_unlock(&(pMetric->rwLock)); pthread_rwlock_unlock(&(pMetric->rwLock));
return 0; return 0;
...@@ -724,8 +652,6 @@ int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) { ...@@ -724,8 +652,6 @@ int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) {
pMetric->numOfMeters--; pMetric->numOfMeters--;
removeMeterFromMetricIndex(pMetric, pTable);
pthread_rwlock_unlock(&(pMetric->rwLock)); pthread_rwlock_unlock(&(pMetric->rwLock));
return 0; return 0;
......
...@@ -44,21 +44,6 @@ ...@@ -44,21 +44,6 @@
extern int64_t sdbVersion; extern int64_t sdbVersion;
void *meterSdb = NULL;
void *(*mgmtMeterActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags);
static void removeMeterFromMetricIndex(STabObj *pMetric, STabObj *pTable);
static void addMeterIntoMetricIndex(STabObj *pMetric, STabObj *pTable);
int32_t mgmtMeterDropTagByName(STabObj *pMetric, char *name);
int32_t mgmtMeterModifyTagNameByName(STabObj *pMetric, const char *oname, const char *nname);
int32_t mgmtMeterModifyTagValueByName(STabObj *pTable, char *tagName, char *nContent);
int32_t mgmtMeterAddColumn(STabObj *pTable, SSchema schema[], int ncols);
int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name);
static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct);
static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct);
static int32_t mgmtGetReqTagsLength(STabObj *pMetric, int16_t *cols, int32_t numOfCols) { static int32_t mgmtGetReqTagsLength(STabObj *pMetric, int16_t *cols, int32_t numOfCols) {
assert(mgmtIsSuperTable(pMetric) && numOfCols >= 0 && numOfCols <= TSDB_MAX_TAGS + 1); assert(mgmtIsSuperTable(pMetric) && numOfCols >= 0 && numOfCols <= TSDB_MAX_TAGS + 1);
...@@ -87,98 +72,27 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_ ...@@ -87,98 +72,27 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_
} }
int mgmtInitMeters() { int mgmtInitMeters() {
void * pNode = NULL; int32_t code = mgmtInitSuperTables();
void * pLastNode = NULL; if (code != TSDB_CODE_SUCCESS) {
SVgObj * pVgroup = NULL; return code;
STabObj * pTable = NULL;
STabObj * pMetric = NULL;
SDbObj * pDb = NULL;
SAcctObj *pAcct = NULL;
// TODO: Make sure this function only run once
mgmtMeterActionInit();
meterSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"meters", SDB_KEYTYPE_STRING, mgmtDirectory, mgmtMeterAction);
if (meterSdb == NULL) {
mError("failed to init meter data");
return -1;
} }
pNode = NULL; code = mgmtInitNormalTables();
while (1) { if (code != TSDB_CODE_SUCCESS) {
pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable); return code;
if (pTable == NULL) break;
if (mgmtIsSuperTable(pTable)) pTable->numOfMeters = 0;
} }
pNode = NULL; code = mgmtInitStreamTables();
while (1) { if (code != TSDB_CODE_SUCCESS) {
pLastNode = pNode; return code;
pNode = sdbFetchRow(meterSdb, pNode, (void **)&pTable);
if (pTable == NULL) break;
pDb = mgmtGetDbByMeterId(pTable->meterId);
if (pDb == NULL) {
mError("meter:%s, failed to get db, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if (mgmtIsNormalTable(pTable)) {
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (pVgroup == NULL) {
mError("meter:%s, failed to get vgroup:%d sid:%d, discard it", pTable->meterId, pTable->gid.vgId, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
mError("meter:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
pTable->meterId, pDb->name, pTable->gid.vgId, pVgroup->dbName, pTable->gid.sid);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
if ( pVgroup->meterList == NULL) {
mError("meter:%s, vgroup:%d meterlist is null", pTable->meterId, pTable->gid.vgId);
pTable->gid.vgId = 0;
sdbDeleteRow(meterSdb, pTable);
pNode = pLastNode;
continue;
}
pVgroup->meterList[pTable->gid.sid] = pTable;
taosIdPoolMarkStatus(pVgroup->idPool, pTable->gid.sid, 1);
if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) {
pTable->pSql = (char *)pTable->schema + sizeof(SSchema) * pTable->numOfColumns;
}
if (mgmtTableCreateFromSuperTable(pTable)) {
pTable->pTagData = (char *)pTable->schema; // + sizeof(SSchema)*pTable->numOfColumns;
pMetric = mgmtGetTable(pTable->pTagData);
if (pMetric) mgmtAddMeterIntoMetric(pMetric, pTable);
}
pAcct = mgmtGetAcct(pDb->cfg.acct);
if (pAcct) mgmtAddMeterStatisticToAcct(pTable, pAcct);
} else {
if (pDb) mgmtAddMetricIntoDb(pDb, pTable);
}
} }
mgmtSetVgroupIdPool(); code = mgmtInitChildTables();
if (code != TSDB_CODE_SUCCESS) {
return code;
}
mTrace("meter is initialized"); return TSDB_CODE_SUCCESS;
return 0;
} }
STableObj mgmtGetTable(char *tableId) { STableObj mgmtGetTable(char *tableId) {
...@@ -211,6 +125,11 @@ STableObj mgmtGetTable(char *tableId) { ...@@ -211,6 +125,11 @@ STableObj mgmtGetTable(char *tableId) {
return table; return table;
} }
STableObj mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) {
STableObj table = {0};
return table;
}
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
STableObj table = mgmtGetTable(pCreate->meterId); STableObj table = mgmtGetTable(pCreate->meterId);
if (table.obj != NULL) { if (table.obj != NULL) {
...@@ -293,7 +212,7 @@ int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) { ...@@ -293,7 +212,7 @@ int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) {
} }
int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) {
STableObj table = mgmtGetTable(tableId); STableObj table = mgmtGetTable(pAlter->meterId);
if (table.obj == NULL) { if (table.obj == NULL) {
return TSDB_CODE_INVALID_TABLE; return TSDB_CODE_INVALID_TABLE;
} }
...@@ -503,9 +422,9 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon ...@@ -503,9 +422,9 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pTable->pTagData) { // if (pTable->pTagData) {
extractTableName(superTableId, pWrite); // extractTableName(superTableId, pWrite);
} // }
cols++; cols++;
numOfRows++; numOfRows++;
......
...@@ -31,34 +31,32 @@ bool mgmtIsSuperTable(STabObj* pTableObj) { ...@@ -31,34 +31,32 @@ bool mgmtIsSuperTable(STabObj* pTableObj) {
bool mgmtIsNormalTable(STabObj* pTableObj) { bool mgmtIsNormalTable(STabObj* pTableObj) {
return !mgmtIsSuperTable(pTableObj); return !mgmtIsSuperTable(pTableObj);
} }
//
///**
// * TODO: the tag offset value should be kept in memory to avoid dynamically calculating the value
// *
// * @param pTable
// * @param col
// * @param pTagColSchema
// * @return
// */
//char* mgmtTableGetTag(STabObj* pTable, int32_t col, SSchema* pTagColSchema) {
// if (!mgmtTableCreateFromSuperTable(pTable)) {
// return NULL;
// }
//
// STabObj* pSuperTable = mgmtGetTable(pTable->pTagData);
// int32_t offset = mgmtGetTagsLength(pSuperTable, col) + TSDB_TABLE_ID_LEN;
// assert(offset > 0);
//
// if (pTagColSchema != NULL) {
// *pTagColSchema = ((SSchema*)pSuperTable->schema)[pSuperTable->numOfColumns + col];
// }
//
// return (pTable->pTagData + offset);
//}
/** int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col) { // length before column col
* TODO: the tag offset value should be kept in memory to avoid dynamically calculating the value
*
* @param pTable
* @param col
* @param pTagColSchema
* @return
*/
char* mgmtTableGetTag(STabObj* pTable, int32_t col, SSchema* pTagColSchema) {
if (!mgmtTableCreateFromSuperTable(pTable)) {
return NULL;
}
STabObj* pSuperTable = mgmtGetTable(pTable->pTagData);
int32_t offset = mgmtGetTagsLength(pSuperTable, col) + TSDB_TABLE_ID_LEN;
assert(offset > 0);
if (pTagColSchema != NULL) {
*pTagColSchema = ((SSchema*)pSuperTable->schema)[pSuperTable->numOfColumns + col];
}
return (pTable->pTagData + offset);
}
int32_t mgmtGetTagsLength(STabObj* pSuperTable, int32_t col) { // length before column col
assert(mgmtIsSuperTable(pSuperTable) && col >= 0);
int32_t len = 0; int32_t len = 0;
int32_t tagColumnIndexOffset = pSuperTable->numOfColumns; int32_t tagColumnIndexOffset = pSuperTable->numOfColumns;
......
...@@ -74,7 +74,7 @@ int mgmtInitVgroups() { ...@@ -74,7 +74,7 @@ int mgmtInitVgroups() {
mgmtVgroupActionInit(); mgmtVgroupActionInit();
SVgObj tObj; SVgObj tObj;
tsVgUpdateSize = tObj.updateEnd - (char *)&tObj; tsVgUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
vgSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, mgmtDirectory, mgmtVgroupAction); vgSdb = sdbOpenTable(tsMaxVGroups, sizeof(SVgObj), "vgroups", SDB_KEYTYPE_AUTO, mgmtDirectory, mgmtVgroupAction);
if (vgSdb == NULL) { if (vgSdb == NULL) {
...@@ -294,14 +294,14 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { ...@@ -294,14 +294,14 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
STabObj *pTable = NULL; STabObj *pTable = NULL;
if (pShow->payloadLen > 0 ) { if (pShow->payloadLen > 0 ) {
pTable = mgmtGetTable(pShow->payload); // pTable = mgmtGetTable(pShow->payload);
if (NULL == pTable) { // if (NULL == pTable) {
return TSDB_CODE_INVALID_TABLE_ID; // return TSDB_CODE_INVALID_TABLE_ID;
} // }
//
pVgroup = mgmtGetVgroup(pTable->gid.vgId); // pVgroup = mgmtGetVgroup(pTable->gid.vgId);
if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; // if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
//
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
} else { } else {
SVgObj *pVgroup = pDb->pHead; SVgObj *pVgroup = pDb->pHead;
...@@ -476,7 +476,7 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize) { ...@@ -476,7 +476,7 @@ void *mgmtVgroupActionUpdate(void *row, char *str, int size, int *ssize) {
} }
void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize) { void *mgmtVgroupActionEncode(void *row, char *str, int size, int *ssize) {
SVgObj *pVgroup = (SVgObj *)row; SVgObj *pVgroup = (SVgObj *)row;
int tsize = pVgroup->updateEnd - (char *)pVgroup; int tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
if (size < tsize) { if (size < tsize) {
*ssize = -1; *ssize = -1;
} else { } else {
...@@ -491,7 +491,7 @@ void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize) { ...@@ -491,7 +491,7 @@ void *mgmtVgroupActionDecode(void *row, char *str, int size, int *ssize) {
if (pVgroup == NULL) return NULL; if (pVgroup == NULL) return NULL;
memset(pVgroup, 0, sizeof(SVgObj)); memset(pVgroup, 0, sizeof(SVgObj));
int tsize = pVgroup->updateEnd - (char *)pVgroup; int tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
memcpy(pVgroup, str, tsize); memcpy(pVgroup, str, tsize);
return (void *)pVgroup; return (void *)pVgroup;
...@@ -501,7 +501,7 @@ void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize) { ...@@ -501,7 +501,7 @@ void *mgmtVgroupActionBatchUpdate(void *row, char *str, int size, int *ssize) {
void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; } void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssize) { return NULL; }
void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize) { void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize) {
SVgObj *pVgroup = (SVgObj *)row; SVgObj *pVgroup = (SVgObj *)row;
int tsize = pVgroup->updateEnd - (char *)pVgroup; int tsize = pVgroup->updateEnd - (int8_t *)pVgroup;
memcpy(pVgroup, str, tsize); memcpy(pVgroup, str, tsize);
......
...@@ -4,6 +4,7 @@ PROJECT(TDengine) ...@@ -4,6 +4,7 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "cJSON.h" #include "cJSON.h"
#include "gcJson.h" #include "gcJson.h"
#include "taosdef.h" #include "taosdef.h"
#include "tlog.h"
static HttpDecodeMethod gcDecodeMethod = {"grafana", gcProcessRequest}; static HttpDecodeMethod gcDecodeMethod = {"grafana", gcProcessRequest};
static HttpEncodeMethod gcHeartBeatMethod = {NULL, gcSendHeartBeatResp, NULL, NULL, NULL, NULL, NULL, NULL}; static HttpEncodeMethod gcHeartBeatMethod = {NULL, gcSendHeartBeatResp, NULL, NULL, NULL, NULL, NULL, NULL};
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "tsocket.h" #include "tsocket.h"
#include "ttimer.h" #include "ttimer.h"
#include "tlog.h"
void httpToLowerUrl(char* url) { void httpToLowerUrl(char* url) {
/*ignore case */ /*ignore case */
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
#include "httpJson.h" #include "httpJson.h"
#include "httpResp.h" #include "httpResp.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h"
#include "taoserror.h"
#define MAX_NUM_STR_SZ 25 #define MAX_NUM_STR_SZ 25
...@@ -451,7 +453,7 @@ void httpJsonPairStatus(JsonBuf* buf, int code) { ...@@ -451,7 +453,7 @@ void httpJsonPairStatus(JsonBuf* buf, int code) {
} else if (code == TSDB_CODE_INVALID_TABLE) { } else if (code == TSDB_CODE_INVALID_TABLE) {
httpJsonPair(buf, "desc", 4, "failed to create table", 22); httpJsonPair(buf, "desc", 4, "failed to create table", 22);
} else } else
httpJsonPair(buf, "desc", 4, tstrerror(code), (int)strlen(tstrerror(code))); httpJsonPair(buf, "desc", 4, (char*)tstrerror(code), (int)strlen(tstrerror(code)));
} }
} }
} }
\ No newline at end of file
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
#include "httpCode.h" #include "httpCode.h"
#include "httpJson.h" #include "httpJson.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h"
#include "taoserror.h"
const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"}; const char *httpKeepAliveStr[] = {"", "Connection: Keep-Alive\r\n", "Connection: Close\r\n"};
...@@ -182,7 +184,7 @@ void httpSendErrorResp(HttpContext *pContext, int errNo) { httpSendErrorRespWith ...@@ -182,7 +184,7 @@ void httpSendErrorResp(HttpContext *pContext, int errNo) { httpSendErrorRespWith
void httpSendTaosdErrorResp(HttpContext *pContext, int errCode) { void httpSendTaosdErrorResp(HttpContext *pContext, int errCode) {
int httpCode = 400; int httpCode = 400;
httpSendErrorRespImp(pContext, httpCode, "Bad Request", errCode, tstrerror(errCode)); httpSendErrorRespImp(pContext, httpCode, "Bad Request", errCode, (char*)tstrerror(errCode));
} }
void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char* errMsg) { void httpSendTaosdInvalidSqlErrorResp(HttpContext *pContext, char* errMsg) {
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "taos.h" #include "taos.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "tlog.h"
void httpAccessSession(HttpContext *pContext) { void httpAccessSession(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer; HttpServer *server = pContext->pThread->pServer;
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "httpHandle.h" #include "httpHandle.h"
#include "restHandle.h" #include "restHandle.h"
#include "tgHandle.h" #include "tgHandle.h"
#include "tlog.h"
#ifdef CLUSTER #ifdef CLUSTER
void adminInitHandle(HttpServer* pServer); void adminInitHandle(HttpServer* pServer);
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "shash.h" #include "shash.h"
#include "taos.h" #include "taos.h"
#include "tlog.h"
bool httpCheckUsedbSql(char *sql) { bool httpCheckUsedbSql(char *sql) {
if (strstr(sql, "use ") != NULL) { if (strstr(sql, "use ") != NULL) {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "restHandle.h" #include "restHandle.h"
#include "restJson.h" #include "restJson.h"
#include "tlog.h"
static HttpDecodeMethod restDecodeMethod = {"rest", restProcessRequest}; static HttpDecodeMethod restDecodeMethod = {"rest", restProcessRequest};
static HttpDecodeMethod restDecodeMethod2 = {"restful", restProcessRequest}; static HttpDecodeMethod restDecodeMethod2 = {"restful", restProcessRequest};
...@@ -70,8 +71,7 @@ bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) { ...@@ -70,8 +71,7 @@ bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) {
/* /*
* for async test * for async test
* / *
/*
if (httpCheckUsedbSql(sql)) { if (httpCheckUsedbSql(sql)) {
httpSendErrorResp(pContext, HTTP_NO_EXEC_USEDB); httpSendErrorResp(pContext, HTTP_NO_EXEC_USEDB);
return false; return false;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#include "tlog.h"
#include "httpJson.h" #include "httpJson.h"
#include "restHandle.h" #include "restHandle.h"
#include "restJson.h" #include "restJson.h"
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tgJson.h" #include "tgJson.h"
#include "taosdef.h" #include "taosdef.h"
#include "tlog.h"
/* /*
* taos.telegraf.cfg formats like * taos.telegraf.cfg formats like
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tgHandle.h" #include "tgHandle.h"
#include "tgJson.h" #include "tgJson.h"
#include "tlog.h"
void tgInitQueryJson(HttpContext *pContext) { void tgInitQueryJson(HttpContext *pContext) {
JsonBuf *jsonBuf = httpMallocJsonBuf(pContext); JsonBuf *jsonBuf = httpMallocJsonBuf(pContext);
......
...@@ -4,6 +4,7 @@ PROJECT(TDengine) ...@@ -4,6 +4,7 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(./src SRC) AUX_SOURCE_DIRECTORY(./src SRC)
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <stdarg.h> #include <stdarg.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "dnode.h"
#include "monitorSystem.h" #include "monitorSystem.h"
#include "tsclient.h" #include "tsclient.h"
#include "taosdef.h" #include "taosdef.h"
......
...@@ -53,6 +53,10 @@ SSdbTable *tableList[20]; ...@@ -53,6 +53,10 @@ SSdbTable *tableList[20];
int sdbNumOfTables; int sdbNumOfTables;
int64_t sdbVersion; int64_t sdbVersion;
int64_t sdbGetVersion() {
return sdbVersion;
};
void sdbFinishCommit(void *handle) { void sdbFinishCommit(void *handle) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = (SSdbTable *)handle;
uint32_t sdbEcommit = SDB_ENDCOMMIT; uint32_t sdbEcommit = SDB_ENDCOMMIT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册