提交 7041b747 编写于 作者: S slguan

add code to dnode module

上级 311b40b6
......@@ -68,6 +68,4 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
}
......@@ -22,6 +22,7 @@
#include "trpc.h"
#include "tstatus.h"
#include "tsdb.h"
#include "ttimer.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
......@@ -52,8 +53,11 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void * tsDnodeVnodesHash = NULL;
static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL;
int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
......@@ -68,10 +72,22 @@ int32_t dnodeInitMgmt() {
return -1;
}
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
return dnodeOpenVnodes();
}
void dnodeCleanupMgmt() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
dnodeCleanupVnodes();
taosCleanUpIntHash(tsDnodeVnodesHash);
}
......@@ -309,9 +325,78 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
}
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
// SDAlterStreamMsg *pStream = pCont;
// pStream->uid = htobe64(pStream->uid);
// pStream->stime = htobe64(pStream->stime);
// pStream->vnode = htonl(pStream->vnode);
// pStream->sid = htonl(pStream->sid);
// pStream->status = htonl(pStream->status);
//
// int32_t code = dnodeCreateStream(pStream);
}
static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
// SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
//
// int32_t code = tsCfgDynamicOptions(pCfg->config);
// dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
if (tsStatusTimer == NULL) {
dError("failed to start status timer");
return;
}
// int32_t contLen = sizeof(SStatusMsg) + dnodeGetVnodesNum() * sizeof(SVnodeLoad);
// SStatusMsg *pStatus = rpcMallocCont(contLen);
// if (pStatus == NULL) {
// dError("Failed to malloc status message");
// return;
// }
//
// int32_t totalVnodes = dnodeGetVnodesNum();
//
// pStatus->version = htonl(tsVersion);
// pStatus->privateIp = htonl(inet_addr(tsPrivateIp));
// pStatus->publicIp = htonl(inet_addr(tsPublicIp));
// pStatus->lastReboot = htonl(tsRebootTime);
// pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
// pStatus->openVnodes = htons((uint16_t) totalVnodes);
// pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
// pStatus->diskAvailable = tsAvailDataDirGB;
// pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
//
// SVnodeLoad *pLoad = (SVnodeLoad *)pStatus->load;
//TODO loop all vnodes
// for (int32_t vnode = 0, count = 0; vnode <= totalVnodes; ++vnode) {
// if (vnodeList[vnode].cfg.maxSessions <= 0) continue;
//
// SVnodeObj *pVnode = vnodeList + vnode;
// pLoad->vnode = htonl(vnode);
// pLoad->vgId = htonl(pVnode->cfg.vgId);
// pLoad->status = (uint8_t)vnodeList[vnode].vnodeStatus;
// pLoad->syncStatus =(uint8_t)vnodeList[vnode].syncStatus;
// pLoad->accessState = (uint8_t)(pVnode->accessState);
// pLoad->totalStorage = htobe64(pVnode->vnodeStatistic.totalStorage);
// pLoad->compStorage = htobe64(pVnode->vnodeStatistic.compStorage);
// if (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) {
// pLoad->pointsWritten = htobe64(pVnode->vnodeStatistic.pointsWritten);
// } else {
// pLoad->pointsWritten = htobe64(0);
// }
// pLoad++;
//
// if (++count >= tsOpenVnodes) {
// break;
// }
// }
// dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen);
}
......@@ -203,9 +203,43 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
}
static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
void *pQInfo = (void*)100;
dTrace("query msg is disposed, qInfo:%p", pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = 0;
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = 0,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = htobe64(pRetrieve->qhandle);
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo);
assert(pQInfo != NULL);
int32_t contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) rpcMallocCont(contLen);
pRsp->numOfRows = 0;
pRsp->precision = 0;
pRsp->offset = 0;
pRsp->useconds = 0;
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
}
......@@ -27,7 +27,10 @@
static void (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg);
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void *tsDnodeShellRpc = NULL;
static int32_t tsDnodeQueryReqNum = 0;
static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeWrite;
......@@ -50,6 +53,7 @@ int32_t dnodeInitShell() {
rpcInit.sessions = TSDB_SESSIONS_PER_DNODE;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1500;
rpcInit.afp =
tsDnodeShellRpc = rpcOpen(&rpcInit);
if (tsDnodeShellRpc == NULL) {
......@@ -64,12 +68,12 @@ int32_t dnodeInitShell() {
void dnodeCleanupShell() {
if (tsDnodeShellRpc) {
rpcClose(tsDnodeShellRpc);
tsDnodeShellRpc = NULL;
}
}
void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
SRpcMsg rpcMsg;
rpcMsg.handle = pMsg->handle;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
......@@ -82,6 +86,12 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
return;
}
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&tsDnodeQueryReqNum, 1);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1);
} else {}
if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
(*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
} else {
......@@ -92,4 +102,17 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) {
}
}
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return TSDB_CODE_SUCCESS;
}
SDnodeStatisInfo dnodeGetStatisInfo() {
SDnodeStatisInfo info = {0};
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
//info.httpReqNum = httpGetReqCount();
info.queryReqNum = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
}
return info;
}
......@@ -247,21 +247,98 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
}
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
dTrace("submit msg is disposed");
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
pRsp->code = 0;
pRsp->numOfRows = htonl(1);
pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0;
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SShellSubmitRspMsg),
.code = 0,
.msgType = 0
};
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
dTrace("table:%s, start to create child table, stable:%s", pTable->tableId, pTable->superTableId);
} else if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE){
dTrace("table:%s, start to create normal table", pTable->tableId);
} else if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE){
dTrace("table:%s, start to create stream table", pTable->tableId);
} else {
dError("table:%s, invalid table type:%d", pTable->tableType);
}
// pTable->numOfColumns = htons(pTable->numOfColumns);
// pTable->numOfTags = htons(pTable->numOfTags);
// pTable->sid = htonl(pTable->sid);
// pTable->sversion = htonl(pTable->sversion);
// pTable->tagDataLen = htonl(pTable->tagDataLen);
// pTable->sqlDataLen = htonl(pTable->sqlDataLen);
// pTable->contLen = htonl(pTable->contLen);
// pTable->numOfVPeers = htonl(pTable->numOfVPeers);
// pTable->uid = htobe64(pTable->uid);
// pTable->superTableUid = htobe64(pTable->superTableUid);
// pTable->createdTime = htobe64(pTable->createdTime);
//
// for (int i = 0; i < pTable->numOfVPeers; ++i) {
// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
// }
//
// int32_t totalCols = pTable->numOfColumns + pTable->numOfTags;
// SSchema *pSchema = (SSchema *) pTable->data;
// for (int32_t col = 0; col < totalCols; ++col) {
// pSchema->bytes = htons(pSchema->bytes);
// pSchema->colId = htons(pSchema->colId);
// pSchema++;
// }
//
// int32_t code = dnodeCreateTable(pTable);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
dPrint("table:%s, sid:%d is dropped", pTable->tableId, pTable->sid);
// pTable->sid = htonl(pTable->sid);
// pTable->numOfVPeers = htonl(pTable->numOfVPeers);
// pTable->uid = htobe64(pTable->uid);
//
// for (int i = 0; i < pTable->numOfVPeers; ++i) {
// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
// }
//
// int32_t code = dnodeDropTable(pTable);
//
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
dPrint("table:%s, sid:%d is alterd", pTable->tableId, pTable->sid);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
rpcSendResponse(&rpcRsp);
}
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
dPrint("stable:%s, is dropped", pTable->tableId);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
rpcSendResponse(&rpcRsp);
}
......@@ -259,7 +259,7 @@ typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
char data[];
} SDMCreateTableMsg;
} SMDCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
......@@ -346,7 +346,7 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int64_t uid;
} SDRemoveSuperTableMsg;
} SMDDropSTableMsg;
typedef struct {
int32_t vgId;
......
......@@ -31,7 +31,7 @@ void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId);
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
......
......@@ -27,7 +27,7 @@ void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
//extern void *mgmtStatusTimer;
//
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendCreateTableMsg(SMDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendDropTableMsg(SMDDropTableMsg *pRemove, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
//void mgmtSendDropVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
......
......@@ -29,7 +29,7 @@ void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId);
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
......
......@@ -42,7 +42,7 @@ void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
SMDDropSTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle);
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle, bool isGetMeta);
......
......@@ -274,9 +274,9 @@ void mgmtCleanUpChildTables() {
static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) {
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
SDMCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
SMDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
if (pCreateTable == NULL) {
return NULL;
}
......@@ -309,13 +309,13 @@ static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgrou
pSchema++;
}
memcpy(pCreateTable + sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
memcpy(pCreateTable + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTagData, tagDataLen);
return pCreateTable;
}
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables);
......
......@@ -289,9 +289,9 @@ void mgmtCleanUpNormalTables() {
static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
int32_t totalCols = pTable->numOfColumns;
int32_t contLen = sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
SDMCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
SMDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
if (pCreateTable == NULL) {
return NULL;
}
......@@ -323,13 +323,13 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
pSchema++;
}
memcpy(pCreateTable + sizeof(SDMCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
memcpy(pCreateTable + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
return pCreateTable;
}
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SDMCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
SMDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
......
......@@ -175,7 +175,7 @@ void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *th
mgmtSendCreateVgroupMsg(pVgroup, info);
}
//void mgmtSendCreateTableMsg(SDMCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
//void mgmtSendCreateTableMsg(SMDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
// SRpcMsg rpcMsg = {
// .handle = ahandle,
......@@ -201,7 +201,7 @@ void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t c
}
STableInfo *pTable;
SDMCreateTableMsg *pDCreate = NULL;
SMDCreateTableMsg *pDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
......
......@@ -15,5 +15,5 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(tsdb common tutil)
# Someone has no gtest directory, so comment it
ADD_SUBDIRECTORY(tests)
#ADD_SUBDIRECTORY(tests)
ENDIF ()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册