提交 7fb3fd98 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0

...@@ -545,7 +545,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { ...@@ -545,7 +545,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
* There is not response callback function for submit response. * There is not response callback function for submit response.
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP) { if (pMsg->msgType == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) {
pRes->numOfRows += *(int32_t *)pRes->pRsp; pRes->numOfRows += *(int32_t *)pRes->pRsp;
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
...@@ -1464,7 +1464,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1464,7 +1464,7 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type);
pSql->cmd.payloadLen = pMsg - pStart; pSql->cmd.payloadLen = pMsg - pStart;
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1503,7 +1503,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1503,7 +1503,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT;
tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip),
htons(pShellMsg->vnode)); htons(pShellMsg->vnode));
...@@ -1900,7 +1900,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1900,7 +1900,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_QUERY;
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
...@@ -2041,7 +2041,7 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2041,7 +2041,7 @@ int32_t tscBuildCfgDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SCfgMsg); pMsg += sizeof(SCfgMsg);
pCmd->payloadLen = pMsg - pStart; pCmd->payloadLen = pMsg - pStart;
pCmd->msgType = TSDB_MSG_TYPE_CFG_PNODE; pCmd->msgType = TSDB_MSG_TYPE_DNODE_CFG;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2480,7 +2480,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2480,7 +2480,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE; pCmd->msgType = TSDB_MSG_TYPE_DNODE_RETRIEVE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2660,7 +2660,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2660,7 +2660,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_METERINFO; pCmd->msgType = TSDB_MSG_TYPE_TABLE_META;
tfree(tmpData); tfree(tmpData);
...@@ -2698,7 +2698,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2698,7 +2698,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tfree(tmpData); tfree(tmpData);
pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg); pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg);
pCmd->msgType = TSDB_MSG_TYPE_MULTI_METERINFO; pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META;
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
...@@ -2866,7 +2866,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -2866,7 +2866,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_METRIC_META; pCmd->msgType = TSDB_MSG_TYPE_STABLE_META;
assert(msgLen + minMsgSize() <= size); assert(msgLen + minMsgSize() <= size);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -118,7 +118,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) { ...@@ -118,7 +118,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) && return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) &&
(pCmd->msgType == TSDB_MSG_TYPE_QUERY); (pCmd->msgType == TSDB_MSG_TYPE_DNODE_QUERY);
} }
bool tscQueryMetricTags(SQueryInfo* pQueryInfo) { bool tscQueryMetricTags(SQueryInfo* pQueryInfo) {
......
...@@ -25,10 +25,10 @@ extern "C" { ...@@ -25,10 +25,10 @@ extern "C" {
#include "tsched.h" #include "tsched.h"
#include "dnode.h" #include "dnode.h"
int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj); void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
extern void *dmQhandle; extern void *dmQhandle;
......
...@@ -24,19 +24,37 @@ extern "C" { ...@@ -24,19 +24,37 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "dnodeShell.h"
void dnodeFreeQInfoInQueue(SShellObj *pShellObj); /*
* Clear query information associated with this connection
*/
void dnodeFreeQInfo(void *pConn);
/*
* Clear all query informations
*/
void dnodeFreeQInfos();
/*
* handle query message, and the result is returned by callback function
*/
void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn));
/* /*
* Dnode handle read messages * Dispose retrieve msg, and the result will passed through callback function
* The processing result is returned by callback function with pShellObj parameter
*/ */
int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj)); typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn);
void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp);
typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, SRetrieveMeterRsp *pRetrieveRspMsg, void *pShellObj); /*
* Fill retrieve result according to query info
*/
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp);
void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback); /*
* Get the size of retrieve result according to query info
*/
int32_t dnodeGetRetrieveDataSize(void *pQInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,17 +27,12 @@ extern "C" { ...@@ -27,17 +27,12 @@ extern "C" {
/* /*
* Write data based on dnode, the detail result can be fetched from rsponse * Write data based on dnode, the detail result can be fetched from rsponse
* pSubmitMsg: Data to be written * pSubmit: Data to be written
* pShellObj: Used to pass a communication handle * pConn: Communication handle
* callback: Pass the write result through a callback function, possibly in a different thread space * callback: Pass the write result through a callback function, possibly in a different thread space
* rsp: will not be freed by callback function * rsp: will not be freed by callback function
*/ */
void dnodeWriteData(SShellSubmitMsg *pMsg, void *pShellObj, void (*callback)(SShellSubmitRspMsg *rsp, void *pShellObj)); void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn));
/*
* Check if table already exists
*/
int32_t dnodeCheckTableExist(char *tableId);
/* /*
* Create noraml table with specified configuration and open it * Create noraml table with specified configuration and open it
......
...@@ -22,28 +22,28 @@ ...@@ -22,28 +22,28 @@
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h" #include "tsched.h"
#include "tsystem.h" #include "tsystem.h"
#include "vnode.h"
#include "vnodeSystem.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
SMgmtObj mgmtObj; SMgmtObj mgmtObj;
extern uint64_t tsCreatedTime; extern uint64_t tsCreatedTime;
int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj); int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen); int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen);
int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj); int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessMeterCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj); int dnodeProcessTableCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj);
int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj); int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj);
int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj); int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj);
void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables); void vnodeUpdateHeadFile(int vnode, int oldTables, int newTables);
void vnodeOpenVnode(int vnode); void vnodeOpenVnode(int vnode);
void vnodeCleanUpOneVnode(int vnode); void vnodeCleanUpOneVnode(int vnode);
static int (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeInitProcessShellMsg();
char *taosBuildRspMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) { char *taosBuildRspMsgToMnodeWithSizeImp(SMgmtObj *pObj, char type, int size) {
char *pStart = (char *)malloc(size); char *pStart = (char *)malloc(size);
if (pStart == NULL) { if (pStart == NULL) {
...@@ -106,48 +106,40 @@ int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) { ...@@ -106,48 +106,40 @@ int taosSendSimpleRspToMnodeImp(SMgmtObj *pObj, char rsptype, char code) {
} }
int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = taosSendSimpleRspToMnodeImp; int (*taosSendSimpleRspToMnode)(SMgmtObj *pObj, char rsptype, char code) = taosSendSimpleRspToMnodeImp;
int32_t dnodeInitMgmtImp() { return 0; } int32_t dnodeInitMgmtImp() {
dnodeInitProcessShellMsg();
return 0;
}
int32_t (*dnodeInitMgmt)() = dnodeInitMgmtImp; int32_t (*dnodeInitMgmt)() = dnodeInitMgmtImp;
void dnodeInitMgmtIpImp() {} void dnodeInitMgmtIpImp() {}
void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp; void (*dnodeInitMgmtIp)() = dnodeInitMgmtIpImp;
void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) { void dnodeProcessMsgFromMgmtImp(SSchedMsg *sched) {
char msgType = *sched->msg; int32_t msgType = *(int32_t*)(sched->msg);
char *content = sched->msg + 1; int8_t *content = sched->msg + sizeof(int32_t);
dTrace("msg:%s is received from mgmt", taosMsg[(uint8_t)msgType]); dTrace("msg:%s is received from mgmt", taosMsg[msgType]);
dnodeDistributeMsgFromMgmt(content, 0, msgType, NULL);
dnodeDistributeMsgFromMgmt(content, 0, msgType, 0);
free(sched->msg); free(sched->msg);
} }
void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj *pObj) { void dnodeDistributeMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn) {
if (msgType == TSDB_MSG_TYPE_CREATE) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
vnodeProcessCreateMeterRequest(content, msgLen, pObj); dError("invalid msg type:%d", msgType);
} else if (msgType == TSDB_MSG_TYPE_VPEERS) {
vnodeProcessVPeersMsg(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_VPEER_CFG_RSP) {
vnodeProcessVPeerCfgRsp(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_METER_CFG_RSP) {
vnodeProcessMeterCfgRsp(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_REMOVE) {
vnodeProcessRemoveMeterRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_FREE_VNODE) {
vnodeProcessFreeVnodeRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_CFG_PNODE) {
vnodeProcessCfgDnodeRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM) {
vnodeProcessAlterStreamRequest(content, msgLen, pObj);
} else if (msgType == TSDB_MSG_TYPE_GRANT_RSP) {
// do nothing
} else { } else {
dError("%s is not processed", taosMsg[msgType]); if (dnodeProcessShellMsgFp[msgType]) {
(*dnodeProcessShellMsgFp[msgType])(pConn, contLen, pConn);
} else {
dError("%s is not processed", taosMsg[msgType]);
}
} }
} }
int vnodeProcessMeterCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { int dnodeProcessTableCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
int code = *pMsg; int code = *pMsg;
if (code == 0) { if (code == 0) {
...@@ -166,7 +158,7 @@ int vnodeProcessMeterCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) { ...@@ -166,7 +158,7 @@ int vnodeProcessMeterCfgRsp(char *pMsg, int msgLen, SMgmtObj *pObj) {
return 0; return 0;
} }
int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { int dnodeProcessCreateTableRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
SCreateMsg *pCreate; SCreateMsg *pCreate;
int code = 0; int code = 0;
int vid; int vid;
...@@ -196,12 +188,12 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { ...@@ -196,12 +188,12 @@ int vnodeProcessCreateMeterRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
// } // }
_over: _over:
taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_CREATE_RSP, code); taosSendSimpleRspToMnode(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP, code);
return code; return code;
} }
int vnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) { int dnodeProcessAlterStreamRequest(char *pMsg, int msgLen, SMgmtObj *pObj) {
SAlterStreamMsg *pAlter; SAlterStreamMsg *pAlter;
int code = 0; int code = 0;
int vid, sid; int vid, sid;
...@@ -356,7 +348,7 @@ _create_over: ...@@ -356,7 +348,7 @@ _create_over:
return code; return code;
} }
int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { int dnodeProcessRemoveTableRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
SMeterObj * pObj; SMeterObj * pObj;
SRemoveMeterMsg *pRemove; SRemoveMeterMsg *pRemove;
int code = 0; int code = 0;
...@@ -476,7 +468,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -476,7 +468,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return 0; return 0;
} }
int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { int dnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
STaosRsp *pRsp; STaosRsp *pRsp;
pRsp = (STaosRsp *)msg; pRsp = (STaosRsp *)msg;
...@@ -497,7 +489,7 @@ int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -497,7 +489,7 @@ int vnodeProcessVPeerCfgRsp(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return 0; return 0;
} }
int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { int dnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
int code = 0; int code = 0;
code = vnodeProcessVPeerCfg(msg, msgLen, pMgmtObj); code = vnodeProcessVPeerCfg(msg, msgLen, pMgmtObj);
...@@ -506,7 +498,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -506,7 +498,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
STaosRsp * pRsp; STaosRsp * pRsp;
SVPeersMsg *pVPeersMsg = (SVPeersMsg *)msg; SVPeersMsg *pVPeersMsg = (SVPeersMsg *)msg;
pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_VPEERS_RSP); pStart = taosBuildRspMsgToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP);
if (pStart == NULL) return -1; if (pStart == NULL) return -1;
pRsp = (STaosRsp *)pStart; pRsp = (STaosRsp *)pStart;
...@@ -519,7 +511,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -519,7 +511,7 @@ int vnodeProcessVPeersMsg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
return code; return code;
} }
int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { int dnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
SFreeVnodeMsg *pFree; SFreeVnodeMsg *pFree;
pFree = (SFreeVnodeMsg *)pMsg; pFree = (SFreeVnodeMsg *)pMsg;
...@@ -534,16 +526,16 @@ int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) { ...@@ -534,16 +526,16 @@ int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
int32_t code = vnodeRemoveVnode(pFree->vnode); int32_t code = vnodeRemoveVnode(pFree->vnode);
assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS); assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS);
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_FREE_VNODE_RSP, code); taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP, code);
return 0; return 0;
} }
int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) { int dnodeProcessDnodeCfgRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) {
SCfgMsg *pCfg = (SCfgMsg *)cont; SCfgMsg *pCfg = (SCfgMsg *)cont;
int code = tsCfgDynamicOptions(pCfg->config); int code = tsCfgDynamicOptions(pCfg->config);
taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_CFG_PNODE_RSP, code); taosSendSimpleRspToMnode(pMgmtObj, TSDB_MSG_TYPE_DNODE_CFG_RSP, code);
return 0; return 0;
} }
...@@ -554,7 +546,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) { ...@@ -554,7 +546,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) {
SVpeerCfgMsg *pCfg; SVpeerCfgMsg *pCfg;
SMgmtObj * pObj = &mgmtObj; SMgmtObj * pObj = &mgmtObj;
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VPEER_CFG); pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_VNODE_CFG);
if (pStart == NULL) return; if (pStart == NULL) return;
pMsg = pStart; pMsg = pStart;
...@@ -572,7 +564,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { ...@@ -572,7 +564,7 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
SMeterCfgMsg *pCfg; SMeterCfgMsg *pCfg;
SMgmtObj * pObj = &mgmtObj; SMgmtObj * pObj = &mgmtObj;
pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_METER_CFG); pStart = taosBuildReqMsgToMnode(pObj, TSDB_MSG_TYPE_TABLE_CFG);
if (pStart == NULL) return -1; if (pStart == NULL) return -1;
pMsg = pStart; pMsg = pStart;
...@@ -585,3 +577,18 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { ...@@ -585,3 +577,18 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) {
return taosSendMsgToMnode(pObj, pStart, msgLen); return taosSendMsgToMnode(pObj, pStart, msgLen);
} }
void dnodeInitProcessShellMsg() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE] = dnodeProcessCreateTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE] = dnodeProcessRemoveTableRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_VPEERS] = dnodeProcessVPeersMsg;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_TABLE_CFG_RSP] = dnodeProcessTableCfgRsp;
}
\ No newline at end of file
...@@ -17,45 +17,51 @@ ...@@ -17,45 +17,51 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "dnodeWrite.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
void dnodeFreeQInfoInQueue(SShellObj *pShellObj) { void dnodeFreeQInfo(void *pConn) {}
}
void dnodeFreeQInfos() {}
void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) {
void *pQInfo = NULL;
int code = TSDB_CODE_SUCCESS;
callback(code, pConn, pQInfo);
}
void dnodeExecuteRetrieveData(SSchedMsg *pSched) { static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg; SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
SShellObj *pObj = (SShellObj *)pSched->ahandle; void *pConn = pSched->ahandle;
SRetrieveMeterRsp result = {0};
//examples
/* int32_t code = TSDB_CODE_INVALID_QHANDLE;
* in case of server restart, apps may hold qhandle created by server before restart, void *pQInfo = NULL; //get from pConn
* which is actually invalid, therefore, signature check is required. (*callback)(code, NULL, pConn);
*/
if (pRetrieve->qhandle != (uint64_t)pObj->qhandle) {
// if free flag is set, client wants to clean the resources
dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle);
int32_t code = TSDB_CODE_INVALID_QHANDLE;
(*callback)(code, &result, pObj);
}
//TODO build response here //TODO build response here
free(pSched->msg); free(pSched->msg);
} }
void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback) { void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) {
int8_t *msg = malloc(msgLen); int8_t *msg = malloc(sizeof(pRetrieve));
memcpy(msg, pMsg, msgLen); memcpy(msg, pRetrieve, sizeof(pRetrieve));
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.msg = msg; schedMsg.msg = msg;
schedMsg.ahandle = pShellObj; schedMsg.ahandle = pConn;
schedMsg.thandle = callback; schedMsg.thandle = callbackFp;
schedMsg.fp = dnodeExecuteRetrieveData; schedMsg.fp = dnodeExecuteRetrieveData;
taosScheduleTask(tsQueryQhandle, &schedMsg); taosScheduleTask(tsQueryQhandle, &schedMsg);
} }
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp) {
return 0;
}
int32_t dnodeGetRetrieveDataSize(void *pQInfo) {}
...@@ -32,334 +32,155 @@ ...@@ -32,334 +32,155 @@
#include "dnodeUtil.h" #include "dnodeUtil.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeProcessQueryRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn);
static void *tsDnodeShellServer = NULL;
static SShellObj *tsDnodeShellList = NULL;
static int32_t tsDnodeSelectReqNum = 0;
static int32_t tsDnodeInsertReqNum = 0;
static int32_t tsDnodeShellConns = 0;
#define NUM_OF_SESSIONS_PER_VNODE (300)
#define NUM_OF_SESSIONS_PER_DNODE (NUM_OF_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
void *dnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
int sid, vnode;
SShellObj *pObj = (SShellObj *)ahandle;
SIntMsg * pMsg = (SIntMsg *)msg;
uint32_t peerId, peerIp;
uint16_t peerPort;
char ipstr[20];
if (msg == NULL) {
if (pObj) {
pObj->thandle = NULL;
dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__);
dnodeFreeQInfoInQueue(pObj);
pObj->qhandle = NULL;
tsDnodeShellConns--;
dTrace("shell connection:%d is gone, shellConns:%d", pObj->sid, tsDnodeShellConns);
}
return NULL;
}
taosGetRpcConnInfo(thandle, &peerId, &peerIp, &peerPort, &vnode, &sid); static void *tsDnodeShellServer = NULL;
static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
if (pObj == NULL) { void dnodeProcessMsgFromShell(int32_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) {
pObj = tsDnodeShellList + sid; assert(handle != NULL);
pObj->thandle = thandle;
pObj->sid = sid;
pObj->ip = peerIp;
tinet_ntoa(ipstr, peerIp);
tsDnodeShellConns--;
dTrace("shell connection:%d from ip:%s is created, shellConns:%d", sid, ipstr, tsDnodeShellConns);
} else {
if (pObj != tsDnodeShellList + sid) {
dError("shell connection:%d, pObj:%p is not matched with:%p", sid, pObj, tsDnodeShellList + sid);
return NULL;
}
}
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle); if (pCont == NULL || contLen == 0) {
dnodeFreeQInfo(handle);
dTrace("conn:%p, free query info", handle);
return;
}
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); rpcSendSimpleRsp(handle, TSDB_CODE_NOT_READY);
dTrace("sid:%d, shell query msg is ignored since dnode not running", sid); dTrace("conn:%p, query msg is ignored since dnode not running", handle);
return pObj; return;
} }
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]);
dnodeProcessQueryRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj);
} else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { if (msgType == TSDB_MSG_TYPE_DNODE_QUERY) {
dnodeProcessRetrieveRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); dnodeProcessQueryRequest(pCont, contLen, handle);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { } else if (msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE) {
dnodeProcessShellSubmitRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); dnodeProcessRetrieveRequest(pCont, contLen, handle);
} else if (msgType == TSDB_MSG_TYPE_DNODE_SUBMIT) {
dnodeProcessShellSubmitRequest(pCont, contLen, handle);
} else { } else {
dError("%s is not processed", taosMsg[pMsg->msgType]); dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]);
} }
return pObj;
} }
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
SRpcInit rpcInit; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
numOfThreads = (1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0;
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
} }
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp;
rpcInit.localIp = tsAnyIp ? "0.0.0.0" : tsPrivateIp; rpcInit.localPort = tsVnodeShellPort;
rpcInit.label = "DND-shell";
rpcInit.localPort = tsVnodeShellPort;
rpcInit.label = "DND-shell";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.fp = dnodeProcessMsgFromShell; rpcInit.fp = dnodeProcessMsgFromShell;
rpcInit.bits = TSDB_SHELL_VNODE_BITS; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.numOfChanns = TSDB_MAX_VNODES; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.sessionsPerChann = 16; rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S();
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.qhandle = tsRpcQhandle[0];
//rpcInit.efp = vnodeSendVpeerCfgMsg;
tsDnodeShellServer = taosOpenRpc(&rpcInit);
if (tsDnodeShellServer == NULL) {
dError("failed to init connection to shell");
return -1;
}
tsDnodeShellServer = rpcOpen(&rpcInit);
const int32_t size = NUM_OF_SESSIONS_PER_DNODE * sizeof(SShellObj); if (tsDnodeShellServer == NULL) {
tsDnodeShellList = (SShellObj *)malloc(size); dError("failed to init connection from shell");
if (tsDnodeShellList == NULL) {
dError("failed to allocate shellObj, sessions:%d", NUM_OF_SESSIONS_PER_DNODE);
return -1;
}
memset(tsDnodeShellList, 0, size);
// TODO re initialize tsRpcQhandle
if(taosOpenRpcChannWithQ(tsDnodeShellServer, 0, NUM_OF_SESSIONS_PER_DNODE, tsRpcQhandle) != TSDB_CODE_SUCCESS) {
dError("sessions:%d, failed to open shell", NUM_OF_SESSIONS_PER_DNODE);
return -1; return -1;
} }
dError("sessions:%d, shell is opened", NUM_OF_SESSIONS_PER_DNODE); dPrint("shell is opened");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeCleanupShell() { void dnodeCleanupShell() {
if (tsDnodeShellServer) { if (tsDnodeShellServer) {
taosCloseRpc(tsDnodeShellServer); rpcClose(tsDnodeShellServer);
} }
for (int i = 0; i < NUM_OF_SESSIONS_PER_DNODE; ++i) { dnodeFreeQInfos();
dnodeFreeQInfoInQueue(tsDnodeShellList+i);
}
//tfree(tsDnodeShellList);
} }
int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) { void dnodeProcessQueryRequestCb(int code, void *pQInfo, void *pConn) {
char *pMsg, *pStart; int32_t contLen = sizeof(SQueryMeterRsp);
int msgLen; SQueryMeterRsp *queryRsp = (SQueryMeterRsp *) rpcMallocCont(contLen);
if (queryRsp == NULL) {
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_QUERY_RSP, 128); return;
if (pStart == NULL) return -1; }
pMsg = pStart;
*pMsg = code;
pMsg++;
*((uint64_t *)pMsg) = (uint64_t)qhandle; dTrace("conn:%p, query data, code:%d pQInfo:%p", pConn, code, pQInfo);
pMsg += 8;
msgLen = pMsg - pStart; queryRsp->code = htonl(code);
taosSendMsgToPeer(pObj->thandle, pStart, msgLen); queryRsp->qhandle = (uint64_t) (pQInfo);
return msgLen; rpcSendResponse(pConn, queryRsp, contLen);
} }
int32_t dnodeSendShellSubmitRspMsg(SShellObj *pObj, int32_t code, int32_t numOfPoints) { static void dnodeProcessQueryRequest(int8_t *pCont, int32_t contLen, void *pConn) {
char *pMsg, *pStart; atomic_fetch_add_32(&tsDnodeQueryReqNum, 1);
int msgLen; dTrace("conn:%p, start to query data", pConn);
dTrace("code:%d numOfTotalPoints:%d", code, numOfPoints);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128);
if (pStart == NULL) return -1;
pMsg = pStart;
*pMsg = code;
pMsg++;
*(int32_t *)pMsg = numOfPoints; SQueryMeterMsg *pQuery = (SQueryMeterMsg *) pCont;
pMsg += sizeof(numOfPoints); dnodeQueryData(pQuery, pConn, dnodeProcessQueryRequestCb);
msgLen = pMsg - pStart;
taosSendMsgToPeer(pObj->thandle, pStart, msgLen);
return msgLen;
} }
int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { void dnodeProcessRetrieveRequestCb(int32_t code, void *pQInfo, void *pConn) {
int ret, code = 0; dTrace("conn:%p, retrieve data, code:%d", pConn, code);
SQueryMeterMsg * pQueryMsg;
SMeterSidExtInfo **pSids = NULL;
int32_t incNumber = 0;
SSqlFunctionExpr * pExprs = NULL;
SSqlGroupbyExpr * pGroupbyExpr = NULL;
SMeterObj ** pMeterObjList = NULL;
pQueryMsg = (SQueryMeterMsg *)pMsg;
if ((code = vnodeConvertQueryMeterMsg(pQueryMsg)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
if (pQueryMsg->numOfSids <= 0) { assert(pConn != NULL);
dError("Invalid number of meters to query, numOfSids:%d", pQueryMsg->numOfSids); if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_INVALID_QUERY_MSG; rpcSendSimpleRsp(pConn, code);
goto _query_over; return;
}
if (pQueryMsg->vnode >= TSDB_MAX_VNODES || pQueryMsg->vnode < 0) {
dError("qmsg:%p,vid:%d is out of range", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
SVnodeObj *pVnode = &vnodeList[pQueryMsg->vnode];
if (pVnode->cfg.maxSessions == 0) {
dError("qmsg:%p,vid:%d is not activated yet", pQueryMsg, pQueryMsg->vnode);
vnodeSendVpeerCfgMsg(pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
if (!(pVnode->accessState & TSDB_VN_READ_ACCCESS)) {
dError("qmsg:%p,vid:%d access not allowed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NO_READ_ACCESS;
goto _query_over;
}
if (pVnode->meterList == NULL) {
dError("qmsg:%p,vid:%d has been closed", pQueryMsg, pQueryMsg->vnode);
code = TSDB_CODE_NOT_ACTIVE_VNODE;
goto _query_over;
}
if (pQueryMsg->pSidExtInfo == 0) {
dError("qmsg:%p,SQueryMeterMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
pSids = (SMeterSidExtInfo **)pQueryMsg->pSidExtInfo;
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
if (pSids[i]->sid >= pVnode->cfg.maxSessions || pSids[i]->sid < 0) {
dError("qmsg:%p sid:%d out of range, valid range:[%d,%d]", pQueryMsg, pSids[i]->sid, 0, pVnode->cfg.maxSessions);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _query_over;
}
}
// todo optimize for single table query process
pMeterObjList = (SMeterObj **)calloc(pQueryMsg->numOfSids, sizeof(SMeterObj *));
if (pMeterObjList == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _query_over;
}
//add query ref for all meters. if any meter failed to add ref, rollback whole operation and go to error
pthread_mutex_lock(&pVnode->vmutex);
code = vnodeIncQueryRefCount(pQueryMsg, pSids, pMeterObjList, &incNumber);
assert(incNumber <= pQueryMsg->numOfSids);
pthread_mutex_unlock(&pVnode->vmutex);
if (code != TSDB_CODE_SUCCESS || pQueryMsg->numOfSids == 0) { // all the meters may have been dropped.
goto _query_over;
}
pExprs = vnodeCreateSqlFunctionExpr(pQueryMsg, &code);
if (pExprs == NULL) {
assert(code != TSDB_CODE_SUCCESS);
goto _query_over;
}
pGroupbyExpr = vnodeCreateGroupbyExpr(pQueryMsg, &code);
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _query_over;
}
if (pObj->qhandle) {
dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__);
void* qHandle = pObj->qhandle;
pObj->qhandle = NULL;
vnodeDecRefCount(qHandle);
} }
if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) { assert(pQInfo != NULL);
pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); int32_t contLen = dnodeGetRetrieveDataSize(pQInfo);
} else { SRetrieveMeterRsp *retrieveRsp = (SRetrieveMeterRsp *) rpcMallocCont(contLen);
pObj->qhandle = vnodeQueryOnSingleTable(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); if (retrieveRsp == NULL) {
rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY);
return;
} }
_query_over: code = dnodeGetRetrieveData(pQInfo, retrieveRsp);
// if failed to add ref for all meters in this query, abort current query
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber); rpcSendSimpleRsp(pConn, TSDB_CODE_INVALID_QHANDLE);
} }
tfree(pQueryMsg->pSqlFuncExprs); retrieveRsp->numOfRows = htonl(retrieveRsp->numOfRows);
tfree(pMeterObjList); retrieveRsp->precision = htons(retrieveRsp->precision);
ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle); retrieveRsp->offset = htobe64(retrieveRsp->offset);
retrieveRsp->useconds = htobe64(retrieveRsp->useconds);
tfree(pQueryMsg->pSidExtInfo);
for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) {
vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
}
atomic_fetch_add_32(&tsDnodeSelectReqNum, 1); rpcSendResponse(pConn, retrieveRsp, contLen);
return ret;
} }
void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { static void dnodeProcessRetrieveRequest(int8_t *pCont, int32_t contLen, void *pConn) {
dTrace("conn:%p, start to retrieve data", pConn);
}
void dnodeProcessRetrieveRequestCb(int code, SRetrieveMeterRsp *result, SShellObj *pObj) { SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pCont;
if (pObj == NULL || result == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) { dnodeRetrieveData(pRetrieve, pConn, dnodeProcessRetrieveRequestCb);
return;
}
} }
static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) { void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) {
SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pMsg; assert(result != NULL);
dnodeRetrieveData(pRetrieve, msgLen, pObj, dnodeProcessRetrieveRequestCb);
}
void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) { if (result->code != 0) {
if (pObj == NULL || result == NULL || result->code == TSDB_CODE_ACTION_IN_PROGRESS) { rpcSendSimpleRsp(pConn, result->code);
return; return;
} }
SShellObj *pShellObj = (SShellObj *) pObj; int32_t contLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock);
int32_t msgLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) rpcMallocCont(contLen);
SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) taosBuildRspMsgWithSize(pShellObj->thandle,
TSDB_MSG_TYPE_SUBMIT_RSP, msgLen);
if (submitRsp == NULL) { if (submitRsp == NULL) {
rpcSendSimpleRsp(pConn, TSDB_CODE_SERV_OUT_OF_MEMORY);
return; return;
} }
dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows); dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows);
memcpy(submitRsp, result, msgLen); memcpy(submitRsp, result, contLen);
for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) { for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) {
SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i]; SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i];
...@@ -368,6 +189,7 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) { ...@@ -368,6 +189,7 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) {
} else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) { } else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) {
dnodeSendMeterCfgMsg(block->vnode, block->sid); dnodeSendMeterCfgMsg(block->vnode, block->sid);
} }
block->index = htonl(block->index);
block->vnode = htonl(block->vnode); block->vnode = htonl(block->vnode);
block->sid = htonl(block->sid); block->sid = htonl(block->sid);
block->code = htonl(block->code); block->code = htonl(block->code);
...@@ -378,21 +200,21 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) { ...@@ -378,21 +200,21 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) {
submitRsp->failedRows = htonl(submitRsp->failedRows); submitRsp->failedRows = htonl(submitRsp->failedRows);
submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks); submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks);
taosSendMsgToPeer(pShellObj->thandle, (int8_t*)submitRsp, msgLen); rpcSendResponse(pConn, submitRsp, contLen);
} }
static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) { static void dnodeProcessShellSubmitRequest(int8_t *pCont, int32_t contLen, void *pConn) {
SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pMsg; SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pCont;
dnodeWriteData(pSubmit, pObj, dnodeProcessShellSubmitRequestCb); dnodeWriteData(pSubmit, pConn, dnodeProcessShellSubmitRequestCb);
atomic_fetch_add_32(&tsDnodeInsertReqNum, 1); atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1);
} }
SDnodeStatisInfo dnodeGetStatisInfo() { SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0}; SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
info.httpReqNum = httpGetReqCount(); info.httpReqNum = httpGetReqCount();
info.selectReqNum = atomic_exchange_32(&tsDnodeSelectReqNum, 0); info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
info.insertReqNum = atomic_exchange_32(&tsDnodeInsertReqNum, 0); info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
} }
return info; return info;
......
...@@ -19,18 +19,14 @@ ...@@ -19,18 +19,14 @@
#include "tlog.h" #include "tlog.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
int32_t dnodeCheckTableExist(char *tableId) { void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) {
return 0;
}
void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pShellObj, void (*callback)(SShellSubmitRspMsg *, void *)) {
SShellSubmitRspMsg result = {0}; SShellSubmitRspMsg result = {0};
int32_t numOfSid = htonl(pSubmit->numOfSid); int32_t numOfSid = htonl(pSubmit->numOfSid);
if (numOfSid <= 0) { if (numOfSid <= 0) {
dError("invalid num of tables:%d", numOfSid); dError("invalid num of tables:%d", numOfSid);
result.code = TSDB_CODE_INVALID_QUERY_MSG; result.code = TSDB_CODE_INVALID_QUERY_MSG;
callback(&result, pShellObj); callback(&result, pConn);
} }
//TODO: submit implementation //TODO: submit implementation
......
...@@ -25,8 +25,8 @@ extern "C" { ...@@ -25,8 +25,8 @@ extern "C" {
#include "tsched.h" #include "tsched.h"
typedef struct { typedef struct {
int32_t selectReqNum; int32_t queryReqNum;
int32_t insertReqNum; int32_t submitReqNum;
int32_t httpReqNum; int32_t httpReqNum;
} SDnodeStatisInfo; } SDnodeStatisInfo;
......
...@@ -238,6 +238,9 @@ extern "C" { ...@@ -238,6 +238,9 @@ extern "C" {
#define TSQL_SO_ASC 1 #define TSQL_SO_ASC 1
#define TSQL_SO_DESC 0 #define TSQL_SO_DESC 0
#define TSDB_SESSIONS_PER_VNODE (300)
#define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,114 +28,118 @@ extern "C" { ...@@ -28,114 +28,118 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
// message type // message type
#define TSDB_MSG_TYPE_REG 1 #define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2 #define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_SUBMIT 3 #define TSDB_MSG_TYPE_DNODE_SUBMIT 3
#define TSDB_MSG_TYPE_SUBMIT_RSP 4 #define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_NWCHANGE 5 #define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_NWCHANGE_RSP 6 #define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_DELIVER 7 #define TSDB_MSG_TYPE_DNODE_RETRIEVE 7
#define TSDB_MSG_TYPE_DELIVER_RSP 8 #define TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE 9
#define TSDB_MSG_TYPE_CREATE 9 #define TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP 10
#define TSDB_MSG_TYPE_CREATE_RSP 10 #define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE 11
#define TSDB_MSG_TYPE_REMOVE 11 #define TSDB_MSG_TYPE_DNODE_CREATE_NORMAL_TABLE_RSP 12
#define TSDB_MSG_TYPE_REMOVE_RSP 12 #define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE 13
#define TSDB_MSG_TYPE_VPEERS 13 #define TSDB_MSG_TYPE_DNODE_CREATE_STREAM_TABLE_RSP 14
#define TSDB_MSG_TYPE_VPEERS_RSP 14 #define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE 15
#define TSDB_MSG_TYPE_FREE_VNODE 15 #define TSDB_MSG_TYPE_DNODE_CREATE_SUPER_TABLE_RSP 16
#define TSDB_MSG_TYPE_FREE_VNODE_RSP 16 #define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE 17
#define TSDB_MSG_TYPE_VPEER_CFG 17 #define TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE_RSP 18
#define TSDB_MSG_TYPE_VPEER_CFG_RSP 18 #define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE 19
#define TSDB_MSG_TYPE_METER_CFG 19 #define TSDB_MSG_TYPE_DNODE_REMOVE_NORMAL_TABLE_RSP 20
#define TSDB_MSG_TYPE_METER_CFG_RSP 20 #define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE 21
#define TSDB_MSG_TYPE_DNODE_REMOVE_STREAM_TABLE_RSP 22
#define TSDB_MSG_TYPE_VPEER_FWD 21 #define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE 23
#define TSDB_MSG_TYPE_VPEER_FWD_RSP 22 #define TSDB_MSG_TYPE_DNODE_REMOVE_SUPER_TABLE_RSP 24
#define TSDB_MSG_TYPE_SYNC 23 #define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE 25
#define TSDB_MSG_TYPE_SYNC_RSP 24 #define TSDB_MSG_TYPE_DNODE_ALTER_CHILD_TABLE_RSP 26
#define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE 27
#define TSDB_MSG_TYPE_INSERT 25 #define TSDB_MSG_TYPE_DNODE_ALTER_NORMAL_TABLE_RSP 28
#define TSDB_MSG_TYPE_INSERT_RSP 26 #define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE 29
#define TSDB_MSG_TYPE_QUERY 27 #define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_TABLE_RSP 30
#define TSDB_MSG_TYPE_QUERY_RSP 28 #define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE 31
#define TSDB_MSG_TYPE_RETRIEVE 29 #define TSDB_MSG_TYPE_DNODE_ALTER_SUPER_TABLE_RSP 32
#define TSDB_MSG_TYPE_RETRIEVE_RSP 30 #define TSDB_MSG_TYPE_DNODE_VPEERS 33
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 34
#define TSDB_MSG_TYPE_CONNECT 31 #define TSDB_MSG_TYPE_DNODE_FREE_VNODE 35
#define TSDB_MSG_TYPE_CONNECT_RSP 32 #define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 36
#define TSDB_MSG_TYPE_CREATE_ACCT 33 #define TSDB_MSG_TYPE_DNODE_CFG 37
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34 #define TSDB_MSG_TYPE_DNODE_CFG_RSP 38
#define TSDB_MSG_TYPE_CREATE_USER 35 #define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 39
#define TSDB_MSG_TYPE_CREATE_USER_RSP 36 #define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 40
#define TSDB_MSG_TYPE_DROP_ACCT 37
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38 #define TSDB_MSG_TYPE_SDB_SYNC 41
#define TSDB_MSG_TYPE_DROP_USER 39 #define TSDB_MSG_TYPE_SDB_SYNC_RSP 42
#define TSDB_MSG_TYPE_DROP_USER_RSP 40 #define TSDB_MSG_TYPE_SDB_FORWARD 43
#define TSDB_MSG_TYPE_ALTER_USER 41 #define TSDB_MSG_TYPE_SDB_FORWARD_RSP 44
#define TSDB_MSG_TYPE_ALTER_USER_RSP 42 #define TSDB_MSG_TYPE_CONNECT 51
#define TSDB_MSG_TYPE_CREATE_MNODE 43 #define TSDB_MSG_TYPE_CONNECT_RSP 52
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 44 #define TSDB_MSG_TYPE_CREATE_ACCT 53
#define TSDB_MSG_TYPE_DROP_MNODE 45 #define TSDB_MSG_TYPE_CREATE_ACCT_RSP 54
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 46 #define TSDB_MSG_TYPE_ALTER_ACCT 55
#define TSDB_MSG_TYPE_CREATE_DNODE 47 #define TSDB_MSG_TYPE_ALTER_ACCT_RSP 56
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 48 #define TSDB_MSG_TYPE_DROP_ACCT 57
#define TSDB_MSG_TYPE_DROP_DNODE 49 #define TSDB_MSG_TYPE_DROP_ACCT_RSP 58
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 50 #define TSDB_MSG_TYPE_CREATE_USER 59
#define TSDB_MSG_TYPE_CREATE_DB 51 #define TSDB_MSG_TYPE_CREATE_USER_RSP 60
#define TSDB_MSG_TYPE_CREATE_DB_RSP 52 #define TSDB_MSG_TYPE_ALTER_USER 61
#define TSDB_MSG_TYPE_DROP_DB 53 #define TSDB_MSG_TYPE_ALTER_USER_RSP 62
#define TSDB_MSG_TYPE_DROP_DB_RSP 54 #define TSDB_MSG_TYPE_DROP_USER 63
#define TSDB_MSG_TYPE_USE_DB 55 #define TSDB_MSG_TYPE_DROP_USER_RSP 64
#define TSDB_MSG_TYPE_USE_DB_RSP 56 #define TSDB_MSG_TYPE_CREATE_MNODE 65
#define TSDB_MSG_TYPE_CREATE_TABLE 57 #define TSDB_MSG_TYPE_CREATE_MNODE_RSP 66
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 58 #define TSDB_MSG_TYPE_DROP_MNODE 67
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 68
#define TSDB_MSG_TYPE_DROP_TABLE 59 #define TSDB_MSG_TYPE_CREATE_DNODE 69
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 60 #define TSDB_MSG_TYPE_CREATE_DNODE_RSP 70
#define TSDB_MSG_TYPE_METERINFO 61 #define TSDB_MSG_TYPE_DROP_DNODE 71
#define TSDB_MSG_TYPE_METERINFO_RSP 62 #define TSDB_MSG_TYPE_DROP_DNODE_RSP 72
#define TSDB_MSG_TYPE_METRIC_META 63 #define TSDB_MSG_TYPE_ALTER_DNODE 73
#define TSDB_MSG_TYPE_METRIC_META_RSP 64 #define TSDB_MSG_TYPE_ALTER_DNODE_RSP 74
#define TSDB_MSG_TYPE_SHOW 65 #define TSDB_MSG_TYPE_CREATE_DB 75
#define TSDB_MSG_TYPE_SHOW_RSP 66 #define TSDB_MSG_TYPE_CREATE_DB_RSP 76
#define TSDB_MSG_TYPE_DROP_DB 77
#define TSDB_MSG_TYPE_FORWARD 67 #define TSDB_MSG_TYPE_DROP_DB_RSP 78
#define TSDB_MSG_TYPE_FORWARD_RSP 68 #define TSDB_MSG_TYPE_USE_DB 79
#define TSDB_MSG_TYPE_USE_DB_RSP 80
#define TSDB_MSG_TYPE_CFG_PNODE 69 #define TSDB_MSG_TYPE_ALTER_DB 81
#define TSDB_MSG_TYPE_CFG_PNODE_RSP 70 #define TSDB_MSG_TYPE_ALTER_DB_RSP 82
#define TSDB_MSG_TYPE_CFG_MNODE 71 #define TSDB_MSG_TYPE_CREATE_TABLE 83
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 72 #define TSDB_MSG_TYPE_CREATE_TABLE_RSP 84
#define TSDB_MSG_TYPE_DROP_TABLE 85
#define TSDB_MSG_TYPE_KILL_QUERY 73 #define TSDB_MSG_TYPE_DROP_TABLE_RSP 86
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 74 #define TSDB_MSG_TYPE_ALTER_TABLE 87
#define TSDB_MSG_TYPE_KILL_STREAM 75 #define TSDB_MSG_TYPE_ALTER_TABLE_RSP 88
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 76 #define TSDB_MSG_TYPE_VNODE_CFG 89
#define TSDB_MSG_TYPE_KILL_CONNECTION 77 #define TSDB_MSG_TYPE_VNODE_CFG_RSP 90
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 78 #define TSDB_MSG_TYPE_TABLE_CFG 91
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 92
#define TSDB_MSG_TYPE_ALTER_STREAM 79 #define TSDB_MSG_TYPE_TABLE_META 93
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80 #define TSDB_MSG_TYPE_TABLE_META_RSP 94
#define TSDB_MSG_TYPE_ALTER_TABLE 81 #define TSDB_MSG_TYPE_STABLE_META 95
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 82 #define TSDB_MSG_TYPE_STABLE_META_RSP 96
#define TSDB_MSG_TYPE_ALTER_DB 83 #define TSDB_MSG_TYPE_MULTI_TABLE_META 97
#define TSDB_MSG_TYPE_ALTER_DB_RSP 84 #define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 98
#define TSDB_MSG_TYPE_ALTER_STREAM 99
#define TSDB_MSG_TYPE_MULTI_METERINFO 85 #define TSDB_MSG_TYPE_ALTER_STREAM_RSP 100
#define TSDB_MSG_TYPE_MULTI_METERINFO_RSP 86 #define TSDB_MSG_TYPE_SHOW 101
#define TSDB_MSG_TYPE_SHOW_RSP 102
#define TSDB_MSG_TYPE_HEARTBEAT 91 #define TSDB_MSG_TYPE_CFG_MNODE 103
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92 #define TSDB_MSG_TYPE_CFG_MNODE_RSP 104
#define TSDB_MSG_TYPE_STATUS 93 #define TSDB_MSG_TYPE_KILL_QUERY 105
#define TSDB_MSG_TYPE_STATUS_RSP 94 #define TSDB_MSG_TYPE_KILL_QUERY_RSP 106
#define TSDB_MSG_TYPE_GRANT 95 #define TSDB_MSG_TYPE_KILL_STREAM 107
#define TSDB_MSG_TYPE_GRANT_RSP 96 #define TSDB_MSG_TYPE_KILL_STREAM_RSP 108
#define TSDB_MSG_TYPE_KILL_CONNECTION 109
#define TSDB_MSG_TYPE_ALTER_ACCT 97 #define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 110
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 98 #define TSDB_MSG_TYPE_HEARTBEAT 111
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 112
#define TSDB_MSG_TYPE_MAX 101 #define TSDB_MSG_TYPE_STATUS 113
#define TSDB_MSG_TYPE_STATUS_RSP 114
#define TSDB_MSG_TYPE_GRANT 115
#define TSDB_MSG_TYPE_GRANT_RSP 116
#define TSDB_MSG_TYPE_MAX 117
// IE type // IE type
#define TSDB_IE_TYPE_SEC 1 #define TSDB_IE_TYPE_SEC 1
...@@ -287,13 +291,14 @@ typedef struct { ...@@ -287,13 +291,14 @@ typedef struct {
} SShellSubmitMsg; } SShellSubmitMsg;
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks
int32_t vnode; // vnode index of failed block int32_t vnode; // vnode index of failed block
int32_t sid; // table index of failed block int32_t sid; // table index of failed block
int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
} SShellSubmitRspBlock; } SShellSubmitRspBlock;
typedef struct { typedef struct {
int32_t code; // 0-success, 1-inprogress, > 1 error code int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records) int32_t failedRows; // number of failed records (exclude duplicate records)
...@@ -301,7 +306,6 @@ typedef struct { ...@@ -301,7 +306,6 @@ typedef struct {
SShellSubmitRspBlock *failedBlocks; SShellSubmitRspBlock *failedBlocks;
} SShellSubmitRspMsg; } SShellSubmitRspMsg;
typedef struct SSchema { typedef struct SSchema {
uint8_t type; uint8_t type;
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN];
......
...@@ -50,16 +50,16 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { ...@@ -50,16 +50,16 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup; SVgObj * pVgroup;
if (!sdbMaster) { if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_REDIRECT); taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0; return 0;
} }
int vnode = htonl(pCfg->vnode); int vnode = htonl(pCfg->vnode);
int sid = htonl(pCfg->sid); int sid = htonl(pCfg->sid);
pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, 64000); pStart = taosBuildRspMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, 64000);
if (pStart == NULL) { if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_METER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_TABLE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -100,15 +100,15 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) { ...@@ -100,15 +100,15 @@ int mgmtProcessVpeerCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
SVgObj * pVgroup = NULL; SVgObj * pVgroup = NULL;
if (!sdbMaster) { if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_REDIRECT); taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_REDIRECT);
return 0; return 0;
} }
int vnode = htonl(pCfg->vnode); int vnode = htonl(pCfg->vnode);
pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP); pStart = taosBuildRspMsgToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP);
if (pStart == NULL) { if (pStart == NULL) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEER_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VNODE_CFG_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
pMsg = pStart; pMsg = pStart;
...@@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { ...@@ -142,7 +142,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
STaosRsp *pRsp = (STaosRsp *)msg; STaosRsp *pRsp = (STaosRsp *)msg;
if (!sdbMaster) { if (!sdbMaster) {
taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_VPEERS_RSP, TSDB_CODE_REDIRECT); taosSendSimpleRspToDnode(pObj, TSDB_MSG_TYPE_DNODE_VPEERS_RSP, TSDB_CODE_REDIRECT);
return 0; return 0;
} }
...@@ -172,19 +172,19 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) { ...@@ -172,19 +172,19 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
} }
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj) { void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj) {
if (msgType == TSDB_MSG_TYPE_METER_CFG) { if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
mgmtProcessMeterCfgMsg(content, msgLen - sizeof(SIntMsg), pObj); mgmtProcessMeterCfgMsg(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_VPEER_CFG) { } else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) {
mgmtProcessVpeerCfgMsg(content, msgLen - sizeof(SIntMsg), pObj); mgmtProcessVpeerCfgMsg(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_CREATE_RSP) { } else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE_RSP) {
mgmtProcessCreateRsp(content, msgLen - sizeof(SIntMsg), pObj); mgmtProcessCreateRsp(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_REMOVE_RSP) { } else if (msgType == TSDB_MSG_TYPE_REMOVE_RSP) {
// do nothing // do nothing
} else if (msgType == TSDB_MSG_TYPE_VPEERS_RSP) { } else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) {
mgmtProcessVPeersRsp(content, msgLen - sizeof(SIntMsg), pObj); mgmtProcessVPeersRsp(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) { } else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) {
mgmtProcessFreeVnodeRsp(content, msgLen - sizeof(SIntMsg), pObj); mgmtProcessFreeVnodeRsp(content, msgLen - sizeof(SIntMsg), pObj);
} else if (msgType == TSDB_MSG_TYPE_CFG_PNODE_RSP) { } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
// do nothing; // do nothing;
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
// do nothing; // do nothing;
...@@ -243,7 +243,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int ...@@ -243,7 +243,7 @@ int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int
continue; continue;
} }
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000);
if (pStart == NULL) { if (pStart == NULL) {
continue; continue;
} }
...@@ -267,7 +267,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { ...@@ -267,7 +267,7 @@ int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
continue; continue;
} }
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000);
if (pStart == NULL) { if (pStart == NULL) {
continue; continue;
} }
...@@ -291,7 +291,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { ...@@ -291,7 +291,7 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
continue; continue;
} }
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_DNODE_CREATE_CHILD_TABLE, 64000);
if (pStart == NULL) { if (pStart == NULL) {
continue; continue;
} }
...@@ -322,7 +322,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { ...@@ -322,7 +322,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip); pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip);
if (pObj == NULL) continue; if (pObj == NULL) continue;
pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_REMOVE); pStart = taosBuildReqMsgToDnode(pObj, TSDB_MSG_TYPE_DNODE_REMOVE_CHILD_TABLE);
if (pStart == NULL) continue; if (pStart == NULL) continue;
pMsg = pStart; pMsg = pStart;
...@@ -428,7 +428,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) { ...@@ -428,7 +428,7 @@ int mgmtSendVPeersMsg(SVgObj *pVgroup) {
mgmtUpdateDnode(pDnode); mgmtUpdateDnode(pDnode);
if (pDnode->thandle && pVgroup->numOfVnodes >= 1) { if (pDnode->thandle && pVgroup->numOfVnodes >= 1) {
pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_VPEERS); pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_VPEERS);
if (pStart == NULL) continue; if (pStart == NULL) continue;
pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode); pMsg = mgmtBuildVpeersIe(pStart, pVgroup, pVgroup->vnodeGid[i].vnode);
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
...@@ -457,7 +457,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) { ...@@ -457,7 +457,7 @@ int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid) {
return -1; return -1;
} }
pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_FREE_VNODE); pStart = taosBuildReqMsgToDnode(pDnode, TSDB_MSG_TYPE_DNODE_FREE_VNODE);
if (pStart == NULL) return -1; if (pStart == NULL) return -1;
pMsg = pStart; pMsg = pStart;
...@@ -539,7 +539,7 @@ int mgmtSendCfgDnodeMsg(char *cont) { ...@@ -539,7 +539,7 @@ int mgmtSendCfgDnodeMsg(char *cont) {
} }
#ifdef CLUSTER #ifdef CLUSTER
pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_CFG_PNODE); pStart = taosBuildReqMsg(pDnode->thandle, TSDB_MSG_TYPE_DNODE_CFG);
if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE; if (pStart == NULL) return TSDB_CODE_NODE_OFFLINE;
pMsg = pStart; pMsg = pStart;
......
...@@ -138,7 +138,7 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe ...@@ -138,7 +138,7 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe
} }
static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) { static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) {
char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, size); char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, size);
if (pStart == NULL) return 0; if (pStart == NULL) return 0;
*pMsg = pStart; *pMsg = pStart;
*pRsp = (STaosRsp *)(*pMsg); *pRsp = (STaosRsp *)(*pMsg);
...@@ -147,7 +147,7 @@ static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp * ...@@ -147,7 +147,7 @@ static char *mgmtAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp *
} }
static char *mgmtForMultiAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) { static char *mgmtForMultiAllocMsg(SConnObj *pConn, int32_t size, char **pMsg, STaosRsp **pRsp) {
char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, size); char *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, size);
if (pStart == NULL) return 0; if (pStart == NULL) return 0;
*pMsg = pStart; *pMsg = pStart;
*pRsp = (STaosRsp *)(*pMsg); *pRsp = (STaosRsp *)(*pMsg);
...@@ -197,7 +197,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -197,7 +197,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (pDb == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) { if (pDb == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -212,13 +212,13 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -212,13 +212,13 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// on demand create table from super table if meter does not exists // on demand create table from super table if meter does not exists
if (pMeterObj == NULL && pInfo->createFlag == 1) { if (pMeterObj == NULL && pInfo->createFlag == 1) {
// write operation needs to redirect to master mnode // write operation needs to redirect to master mnode
if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_METERINFO_RSP) != 0) { if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_TABLE_META_RSP) != 0) {
return 0; return 0;
} }
SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData)); SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData));
if (pCreateMsg == NULL) { if (pCreateMsg == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -241,7 +241,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -241,7 +241,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -255,7 +255,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -255,7 +255,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
} }
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -364,7 +364,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -364,7 +364,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
char *pNewMsg; char *pNewMsg;
if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) { if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -393,7 +393,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -393,7 +393,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (NULL == pMsgHdr) { if (NULL == pMsgHdr) {
char* pTmp = pStart - sizeof(STaosHeader); char* pTmp = pStart - sizeof(STaosHeader);
tfree(pTmp); tfree(pTmp);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
break; break;
} }
...@@ -523,9 +523,9 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -523,9 +523,9 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
if (pMetric == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) { if (pMetric == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP); pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP);
if (pStart == NULL) { if (pStart == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -541,7 +541,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -541,7 +541,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
} else { } else {
msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pSuperTableMetaMsg); msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pSuperTableMetaMsg);
if (msgLen <= 0) { if (msgLen <= 0) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
} }
...@@ -999,14 +999,14 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -999,14 +999,14 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
*/ */
if (pRetrieve->qhandle != pConn->qhandle) { if (pRetrieve->qhandle != pConn->qhandle) {
mError("retrieve:%p, qhandle:%p is not matched with saved:%p", pRetrieve, pRetrieve->qhandle, pConn->qhandle); mError("retrieve:%p, qhandle:%p is not matched with saved:%p", pRetrieve, pRetrieve->qhandle, pConn->qhandle);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
return -1; return -1;
} }
pShow = (SShowObj *)pRetrieve->qhandle; pShow = (SShowObj *)pRetrieve->qhandle;
if (pShow->signature != (void *)pShow) { if (pShow->signature != (void *)pShow) {
mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature); mError("pShow:%p, signature:%p, query memory is corrupted", pShow, pShow->signature);
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_MEMORY_CORRUPTED);
return -1; return -1;
} else { } else {
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
...@@ -1024,9 +1024,9 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -1024,9 +1024,9 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
size = pShow->rowSize * rowsToRead; size = pShow->rowSize * rowsToRead;
} }
pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, size + 100); pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, size + 100);
if (pStart == NULL) { if (pStart == NULL) {
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
return 0; return 0;
} }
...@@ -1206,7 +1206,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { ...@@ -1206,7 +1206,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) {
code = mgmtSendCfgDnodeMsg(pMsg); code = mgmtSendCfgDnodeMsg(pMsg);
} }
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CFG_PNODE_RSP, code); taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DNODE_CFG_RSP, code);
if (code == 0) mTrace("dnode:%s is configured by %s", pCfg->ip, pConn->pUser->user); if (code == 0) mTrace("dnode:%s is configured by %s", pCfg->ip, pConn->pUser->user);
...@@ -1464,9 +1464,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -1464,9 +1464,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead);
// read-only request can be executed concurrently // read-only request can be executed concurrently
if ((pMsg->msgType == TSDB_MSG_TYPE_METERINFO && (!mgmtCheckMeterMetaMsgType(cont))) || if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) ||
pMsg->msgType == TSDB_MSG_TYPE_METRIC_META || pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE || pMsg->msgType == TSDB_MSG_TYPE_STABLE_META || pMsg->msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_METERINFO) { pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_TABLE_META) {
(*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn); (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn);
} else { } else {
if (mgmtProcessShellMsg[pMsg->msgType]) { if (mgmtProcessShellMsg[pMsg->msgType]) {
...@@ -1498,9 +1498,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { ...@@ -1498,9 +1498,9 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
} }
void mgmtInitProcessShellMsg() { void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_METERINFO] = mgmtProcessMeterMetaMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_METRIC_META] = mgmtProcessMetricMetaMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessMetricMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_METERINFO] = mgmtProcessMultiMeterMetaMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DB] = mgmtProcessCreateDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_DB] = mgmtProcessAlterDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_USER] = mgmtProcessCreateUserMsg;
...@@ -1516,7 +1516,7 @@ void mgmtInitProcessShellMsg() { ...@@ -1516,7 +1516,7 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_ALTER_TABLE] = mgmtProcessAlterTableMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUseDbMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_USE_DB] = mgmtProcessUseDbMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg;
...@@ -1525,7 +1525,7 @@ void mgmtInitProcessShellMsg() { ...@@ -1525,7 +1525,7 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessCreateMnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessCreateMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_MNODE] = mgmtProcessDropMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_MNODE] = mgmtProcessCfgMnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CFG_PNODE] = mgmtProcessCfgDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_CFG] = mgmtProcessCfgDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_QUERY] = mgmtProcessKillQueryMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg;
......
...@@ -334,10 +334,10 @@ int monitorBuildBandSql(char *sql) { ...@@ -334,10 +334,10 @@ int monitorBuildBandSql(char *sql) {
int monitorBuildReqSql(char *sql) { int monitorBuildReqSql(char *sql) {
SDnodeStatisInfo info; SDnodeStatisInfo info;
info.httpReqNum = info.insertReqNum = info.selectReqNum = 0; info.httpReqNum = info.submitReqNum = info.queryReqNum = 0;
(*mnodeCountRequestFp)(&info); (*mnodeCountRequestFp)(&info);
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.selectReqNum, info.insertReqNum); return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum);
} }
int monitorBuildIoSql(char *sql) { int monitorBuildIoSql(char *sql) {
......
...@@ -13,112 +13,134 @@ ...@@ -13,112 +13,134 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
char *taosMsg[] = {"null", char *taosMsg[] = {
"registration", "null",
"registration-rsp", "registration",
"submit", "registration-rsp",
"submit-rsp", "submit",
"nw-change", "submit-rsp",
"nw-change-rsp", "query",
"deliver", "query-rsp",
"deliver-rsp", "retrieve",
"retrieve-rsp",
"create-table",
"create-table-rsp", //10
"create", "create-normal-table",
"create-rsp", "create-normal-table-rsp",
"remove", "create-stream-table",
"remove-rsp", "create-stream-table-rsp",
"vpeers", "create-super-table",
"vpeers-rsp", "create-super-table-rsp",
"free-vnode", "remove-table",
"free-vnode-rsp", "remove-table-rsp",
"vpeer-cfg", "remove-normal-table",
"vpeer-cfg-rsp", "remove-normal-table-rsp", //20
"meter-cfg",
"meter-cfg-rsp",
"vpeer-fwd", "remove-stream-table",
"vpeer-fwd-rsp", "remove-stream-table-rsp",
"sync", "remove-super-table",
"sync-rsp", "remove-super-table-rsp",
"alter-table",
"alter-table-rsp",
"alter-normal-table",
"alter-normal-table-rsp",
"alter-stream-table",
"alter-stream-table-rsp", //30
"insert", "alter-super-table",
"insert-rsp", "alter-super-table-rsp",
"query", "vpeers",
"query-rsp", "vpeers-rsp",
"retrieve", "free-vnode",
"retrieve-rsp", "free-vnode-rsp",
"cfg-dnode",
"cfg-dnode-rsp",
"alter-stream",
"alter-stream-rsp", //40
"connect", "sync",
"connect-rsp", "sync-rsp",
"create-acct", "forward",
"create-acct-rsp", "forward-rsp",
"create-user", "",
"create-user-rsp", "",
"drop-acct", "",
"drop-acct-rsp", "",
"drop-user", "",
"drop-user-rsp", "", //50
"alter-user",
"alter-user-rsp",
"create-mnode",
"create-mnode-rsp",
"drop-mnode",
"drop-mnode-rsp",
"create-dnode",
"create-dnode-rsp",
"drop-dnode",
"drop-dnode-rsp",
"create-db",
"create-db-rsp",
"drop-db",
"drop-db-rsp",
"use-db",
"use-db-rsp",
"create-table",
"create-table-rsp",
"drop-table",
"drop-table-rsp",
"meter-info",
"meter-info-rsp",
"metric-meta",
"metric-meta-rsp",
"show",
"show-rsp",
"forward", "connect",
"forward-rsp", "connect-rsp",
"create-acct",
"create-acct-rsp",
"alter-acct",
"alter-acct-rsp",
"drop-acct",
"drop-acct-rsp",
"create-user",
"create-user-rsp", //60
"cfg-dnode", "alter-user",
"cfg-dnode-rsp", "alter-user-rsp",
"cfg-mnode", "drop-user",
"cfg-mnode-rsp", "drop-user-rsp",
"create-mnode",
"create-mnode-rsp",
"drop-mnode",
"drop-mnode-rsp",
"create-dnode",
"create-dnode-rsp", //70
"kill-query", "drop-dnode",
"kill-query-rsp", "drop-dnode-rsp",
"kill-stream", "alter-dnode",
"kill-stream-rsp", "alter-dnode-rsp",
"kill-connection", "create-db",
"kill-connectoin-rsp", // 78 "create-db-rsp",
"alter-stream", "drop-db",
"alter-stream-rsp", "drop-db-rsp",
"alter-table", "use-db",
"alter-table-rsp", "use-db-rsp", //80
"", "alter-db",
"", "alter-db-rsp",
"", "create-table",
"", "create-table-rsp",
"", "drop-table",
"", "drop-table-rsp",
"", "alter-table",
"", "alter-table-rsp",
"cfg-vnode",
"cfg-vnode-rsp", //90
"heart-beat", // 91 "cfg-table",
"heart-beat-rsp", "cfg-table-rsp",
"status", "table-meta",
"status-rsp", "table-meta-rsp",
"grant", "super-table-meta",
"grant-rsp", "super-stable-meta-rsp",
"alter-acct", "multi-table-meta",
"alter-acct-rsp", "multi-table-meta-rsp",
"invalid"}; "alter-stream",
"alter-stream-rsp", //100
"show",
"show-rsp",
"cfg-mnode",
"cfg-mnode-rsp",
"kill-query",
"kill-query-rsp",
"kill-stream",
"kill-stream-rsp",
"kill-connection",
"kill-connectoin-rsp", //110
"heart-beat",
"heart-beat-rsp",
"status",
"status-rsp",
"grant",
"grant-rsp",
"max"
};
\ No newline at end of file
...@@ -25,7 +25,7 @@ typedef struct _sched_msg { ...@@ -25,7 +25,7 @@ typedef struct _sched_msg {
void (*tfp)(void *, void *); void (*tfp)(void *, void *);
char *msg; int8_t *msg;
void *ahandle; void *ahandle;
void *thandle; void *thandle;
} SSchedMsg; } SSchedMsg;
......
...@@ -4,4 +4,8 @@ project(tsdb) ...@@ -4,4 +4,8 @@ project(tsdb)
add_subdirectory(common) add_subdirectory(common)
add_subdirectory(tsdb) add_subdirectory(tsdb)
\ No newline at end of file
enable_testing()
add_subdirectory(tests)
\ No newline at end of file
...@@ -46,6 +46,7 @@ typedef char * SDataCols; ...@@ -46,6 +46,7 @@ typedef char * SDataCols;
// ----------------- Data column structure // ----------------- Data column structure
// ---- operation on SDataRow; // ---- operation on SDataRow;
#define TD_DATA_ROW_HEADER_SIZE sizeof(int32_t)
#define TD_DATAROW_LEN(pDataRow) (*(int32_t *)(pDataRow)) #define TD_DATAROW_LEN(pDataRow) (*(int32_t *)(pDataRow))
#define TD_DATAROW_DATA(pDataRow) ((pDataRow) + sizeof(int32_t)) #define TD_DATAROW_DATA(pDataRow) ((pDataRow) + sizeof(int32_t))
...@@ -63,5 +64,9 @@ typedef char * SDataCols; ...@@ -63,5 +64,9 @@ typedef char * SDataCols;
#define TD_DATACOLS_NPOINTS(pDataCols) (*(int32_t *)(pDataCols + sizeof(int32_t))) #define TD_DATACOLS_NPOINTS(pDataCols) (*(int32_t *)(pDataCols + sizeof(int32_t)))
// ---- // ----
/**
* Get the maximum
*/
int32_t tdGetMaxDataRowSize(SSchema *pSchema);
#endif // _TD_DATA_FORMAT_H_ #endif // _TD_DATA_FORMAT_H_
#include <stdlib.h> #include <stdlib.h>
#include "dataformat.h" #include "dataformat.h"
int32_t tdGetMaxDataRowSize(SSchema *pSchema) {
int32_t nbytes = 0;
for (int32_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++)
{
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i);
td_datatype_t type = TD_COLUMN_TYPE(pCol);
nbytes += rowDataLen[type];
switch (type)
{
case TD_DATATYPE_VARCHAR:
nbytes += TD_COLUMN_BYTES(pCol);
break;
case TD_DATATYPE_NCHAR:
nbytes += 4 * TD_COLUMN_BYTES(pCol);
break;
case TD_DATATYPE_BINARY:
nbytes += TD_COLUMN_BYTES(pCol);
break;
}
}
nbytes += TD_DATA_ROW_HEADER_SIZE;
return nbytes;
}
\ No newline at end of file
add_subdirectory(common)
add_subdirectory(tsdb)
\ No newline at end of file
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
message(STATUS "COMMON: ${SOURCE_LIST}")
add_executable(commonTests ${SOURCE_LIST})
target_link_libraries(commonTests gtest gtest_main pthread common)
add_test(
NAME
unit
COMMAND
${CMAKE_CURRENT_BINARY_DIR}/commonTests
)
\ No newline at end of file
#include <gtest/gtest.h>
#include "dataformat.h"
TEST(commonDataTests, createDataRow) {
EXPECT_EQ(1, 2/2);
}
\ No newline at end of file
#include <gtest/gtest.h>
#include <stdlib.h>
#include <stdio.h>
#include "schema.h"
TEST(commonSchemaTests, createSchema) {
EXPECT_EQ(1, 2/2);
}
\ No newline at end of file
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
message(STATUS "TSDB: ${SOURCE_LIST}")
add_executable(tsdbTests ${SOURCE_LIST})
target_link_libraries(tsdbTests gtest gtest_main pthread tsdb)
add_test(
NAME
unit
COMMAND
${CMAKE_CURRENT_BINARY_DIR}/tsdbTests
)
\ No newline at end of file
#include <gtest/gtest.h>
#include <stdlib.h>
#include "tsdb.h"
TEST(TsdbTest, createTsdbRepo) {
STSDBCfg *pCfg = (STSDBCfg *)malloc(sizeof(STSDBCfg));
pCfg->rootDir = "/var/lib/taos/";
int32_t err_num = 0;
tsdb_repo_t *pRepo = tsdbCreateRepo(pCfg, &err_num);
ASSERT_EQ(pRepo, NULL);
}
\ No newline at end of file
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
typedef struct { typedef struct {
int64_t skey; // start key int64_t skey; // start key
int64_t ekey; // end key int64_t ekey; // end key
int32_t numOfRows // numOfRows int32_t numOfRows; // numOfRows
} STableCacheInfo; } STableCacheInfo;
typedef struct _tsdb_cache_block { typedef struct _tsdb_cache_block {
......
#include <pthread.h> #include <pthread.h>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
// #include "taosdef.h" // #include "taosdef.h"
// #include "disk.h" // #include "disk.h"
#include "tsdbFile.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbCache.h" #include "tsdbCache.h"
#include "tsdbMeta.h" #include "tsdbMeta.h"
...@@ -33,6 +38,11 @@ typedef struct STSDBRepo { ...@@ -33,6 +38,11 @@ typedef struct STSDBRepo {
// Check the correctness of the TSDB configuration // Check the correctness of the TSDB configuration
static int32_t tsdbCheckCfg(STSDBCfg *pCfg) { static int32_t tsdbCheckCfg(STSDBCfg *pCfg) {
if (pCfg->rootDir == NULL) return -1;
if (access(pCfg->rootDir, F_OK|R_OK|W_OK) == -1) {
return -1;
}
// TODO // TODO
return 0; return 0;
} }
...@@ -42,6 +52,7 @@ tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg, int32_t *error) { ...@@ -42,6 +52,7 @@ tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg, int32_t *error) {
err = tsdbCheckCfg(pCfg); err = tsdbCheckCfg(pCfg);
if (err != 0) { if (err != 0) {
// TODO: deal with the error here // TODO: deal with the error here
return NULL;
} }
STSDBRepo *pRepo = (STSDBRepo *)malloc(sizeof(STSDBRepo)); STSDBRepo *pRepo = (STSDBRepo *)malloc(sizeof(STSDBRepo));
...@@ -65,6 +76,12 @@ tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg, int32_t *error) { ...@@ -65,6 +76,12 @@ tsdb_repo_t *tsdbCreateRepo(STSDBCfg *pCfg, int32_t *error) {
return NULL; return NULL;
} }
// Create the Meta data file and data directory
char *pTsdbMetaFName = tsdbGetFileName(pCfg->rootDir, "tsdb", TSDB_FILE_TYPE_META);
// int fd = open(pTsdbMetaFName, )
// if (open)
return (tsdb_repo_t *)pRepo; return (tsdb_repo_t *)pRepo;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册