提交 a05776b0 编写于 作者: S slguan

create table msg sync process to dnode

上级 aa84e61e
......@@ -38,7 +38,7 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe
* Create table with specified configuration and open it
* if table already exist, update its schema and tag
*/
int32_t dnodeCreateTable(SDCreateTableMsg *table);
int32_t dnodeCreateTable(SDCreateTableMsg *pTable);
/*
* Remove table from local repository
......
......@@ -33,7 +33,7 @@ void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgTyp
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void dnodeInitProcessShellMsg();
static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
......@@ -41,17 +41,20 @@ static void dnodeSendMsgToMnodeQueueFp(SSchedMsg *sched) {
int32_t code = *(int32_t *) (sched->msg - 8);
int8_t msgType = *(int8_t *) (sched->msg - 9);
void *handle = sched->ahandle;
int8_t *pCont = sched->msg;
int8_t *pCont = sched->msg;
mgmtProcessMsgFromDnode(msgType, pCont, contLen, handle, code);
rpcFreeCont(sched->msg);
}
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) {
dTrace("msg:%s is sent to mnode", taosMsg[msgType]);
dTrace("msg:%d:%s is sent to mnode", msgType, taosMsg[msgType]);
if (dnodeSendMsgToMnodeFp) {
dnodeSendMsgToMnodeFp(msgType, pCont, contLen);
} else {
if (pCont == NULL) {
pCont = rpcMallocCont(1);
contLen = 0;
}
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
......@@ -63,13 +66,19 @@ void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen) {
}
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) {
dTrace("rsp:%s is sent to mnode", taosMsg[msgType]);
dTrace("rsp:%d:%s is sent to mnode, pConn:%p", msgType, taosMsg[msgType], pConn);
if (dnodeSendRspToMnodeFp) {
dnodeSendRspToMnodeFp(pConn, code, pCont, contLen);
} else {
//hack way
if (pCont == NULL) {
pCont = rpcMallocCont(1);
contLen = 0;
}
SSchedMsg schedMsg = {0};
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
schedMsg.fp = dnodeSendMsgToMnodeQueueFp;
schedMsg.msg = pCont;
schedMsg.ahandle = pConn;
*(int32_t *) (pCont - 4) = contLen;
*(int32_t *) (pCont - 8) = code;
*(int8_t *) (pCont - 9) = msgType;
......@@ -95,65 +104,79 @@ void dnodeInitMgmtIp() {
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType);
return;
}
dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[msgType], pConn);
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) {
dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn);
}
if (dnodeProcessMgmtMsgFp[msgType]) {
(*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn);
} else {
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) {
dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn);
}
if (dnodeProcessMgmtMsgFp[msgType]) {
(*dnodeProcessMgmtMsgFp[msgType])(pCont, contLen, msgType, pConn);
} else {
dError("%s is not processed", taosMsg[msgType]);
}
dError("%s is not processed", taosMsg[msgType]);
}
//rpcFreeCont(pCont);
}
int32_t dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessTableCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t));
return dnodeCreateTable(table);
dnodeCreateTable(table);
} else if (code == TSDB_CODE_INVALID_TABLE_ID) {
SDRemoveTableMsg *table = (SDRemoveTableMsg *) (pCont + sizeof(int32_t));
int32_t vnode = htonl(table->vnode);
int32_t sid = htonl(table->sid);
uint64_t uid = htobe64(table->uid);
dError("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
return dnodeDropTable(vnode, sid, uid);
dnodeDropTable(vnode, sid, uid);
} else {
dError("code:%d invalid message", code);
return TSDB_CODE_INVALID_MSG;
}
}
int32_t dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessCreateTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDCreateTableMsg *pTable = (SDCreateTableMsg *) pCont;
pTable->vnode = htonl(pTable->vnode);
pTable->sid = htonl(pTable->sid);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->tableType = htonl(pTable->tableType);
pTable->sversion = htonl(pTable->sversion);
pTable->numOfColumns = htons(pTable->numOfColumns);
pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->contLen = htonl(pTable->contLen);
pTable->numOfVPeers = htonl(pTable->numOfVPeers);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
for (int i = 0; i < pTable->numOfVPeers; ++i) {
pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
}
int32_t totalCols = pTable->numOfColumns + pTable->numOfTags;
SSchema *pSchema = (SSchema *) pTable->data;
for (int32_t col = 0; col < totalCols; ++col) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId);
pSchema++;
}
int32_t code = dnodeCreateTable(pTable);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
int32_t dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessAlterStreamRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont;
int32_t code = dnodeCreateStream(stream);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SDRemoveTableMsg *table = (SDRemoveTableMsg *) pCont;
int32_t vnode = htonl(table->vnode);
int32_t sid = htonl(table->sid);
......@@ -162,28 +185,26 @@ int32_t dnodeProcessRemoveTableRequest(int8_t *pCont, int32_t contLen, int8_t ms
dPrint("vnode:%d, sid:%d table is not configured, remove it", vnode, sid);
int32_t code = dnodeDropTable(vnode, sid, uid);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
int32_t dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessVPeerCfgRsp(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
int32_t code = htonl(*((int32_t *) pCont));
if (code == TSDB_CODE_SUCCESS) {
SVPeersMsg *vpeer = (SVPeersMsg *) (pCont + sizeof(int32_t));
int32_t vnode = htonl(vpeer->vnode);
return dnodeCreateVnode(vnode, vpeer);
dnodeCreateVnode(vnode, vpeer);
} else if (code == TSDB_CODE_INVALID_VNODE_ID) {
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) (pCont + sizeof(int32_t));
int32_t vnode = htonl(vpeer->vnode);
dError("vnode:%d, not exist, remove it", vnode);
return dnodeDropVnode(vnode);
dnodeDropVnode(vnode);
} else {
dError("code:%d invalid message", code);
return TSDB_CODE_INVALID_MSG;
}
}
int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SVPeersMsg *vpeer = (SVPeersMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
......@@ -191,10 +212,9 @@ int32_t dnodeProcessVPeersMsg(int8_t *pCont, int32_t contLen, int8_t msgType, vo
int32_t code = dnodeCreateVnode(vnode, vpeer);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SFreeVnodeMsg *vpeer = (SFreeVnodeMsg *) pCont;
int32_t vnode = htonl(vpeer->vnode);
......@@ -202,15 +222,12 @@ int32_t dnodeProcessFreeVnodeRequest(int8_t *pCont, int32_t contLen, int8_t msgT
int32_t code = dnodeDropVnode(vnode);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
int32_t dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
void dnodeProcessDnodeCfgRequest(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) {
SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont;
int32_t code = tsCfgDynamicOptions(pCfg->config);
dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0);
return code;
}
void dnodeSendVpeerCfgMsg(int32_t vnode) {
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tutil.h"
#include "dnodeWrite.h"
#include "dnodeVnodeMgmt.h"
......@@ -33,7 +34,34 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe
//TODO: submit implementation
}
int32_t dnodeCreateTable(SDCreateTableMsg *table) {
int32_t dnodeCreateTable(SDCreateTableMsg *pTable) {
if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) {
dTrace("table:%s, start to create child table, stable:%s", pTable->tableId, pTable->superTableId);
} else if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE){
dTrace("table:%s, start to create normal table", pTable->tableId);
} else if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE){
dTrace("table:%s, start to create stream table", pTable->tableId);
} else {
dError("table:%s, invalid table type:%d", pTable->tableType);
}
for (int i = 0; i < pTable->numOfVPeers; ++i) {
dTrace("table:%s ip:%s vnode:%d sid:%d", pTable->tableId, taosIpStr(pTable->vpeerDesc[i].ip),
pTable->vpeerDesc[i].vnode, pTable->sid);
}
SSchema *pSchema = (SSchema *) pTable->data;
for (int32_t col = 0; col < pTable->numOfColumns; ++col) {
dTrace("table:%s col index:%d colId:%d bytes:%d type:%d name:%s",
pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name);
pSchema++;
}
for (int32_t col = 0; col < pTable->numOfTags; ++col) {
dTrace("table:%s tag index:%d colId:%d bytes:%d type:%d name:%s",
pTable->tableId, col, pSchema->colId, pSchema->bytes, pSchema->type, pSchema->name);
pSchema++;
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -100,6 +100,7 @@ typedef struct {
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
......@@ -111,6 +112,7 @@ struct _vg_obj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
int8_t dirty;
uint64_t uid;
int32_t sid;
int32_t vgId;
......
......@@ -230,21 +230,27 @@ typedef struct SSchema {
} SSchema;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
uint64_t superTableUid;
int32_t tableType;
int32_t sversion;
int16_t numOfColumns;
int16_t numOfTags;
int32_t tagDataLen;
int32_t sqlDataLen;
int32_t contLen;
uint64_t createdTime;
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
char data[];
int32_t vnode; //the index of vnode
uint32_t ip;
} SVPeerDesc;
typedef struct {
int8_t tableType;
int16_t numOfColumns;
int16_t numOfTags;
int32_t sid;
int32_t sversion;
int32_t tagDataLen;
int32_t sqlDataLen;
int32_t contLen;
int32_t numOfVPeers;
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
char data[];
} SDCreateTableMsg;
typedef struct {
......@@ -607,12 +613,6 @@ typedef struct {
char cipheringKey[TSDB_KEY_LEN];
} SSecIe;
typedef struct {
uint32_t dnode; //the ip of dnode
int32_t vnode; //the index of vnode
uint32_t ip;
} SVPeerDesc;
typedef struct {
int32_t numOfVPeers;
SVPeerDesc vpeerDesc[];
......
......@@ -31,11 +31,11 @@ void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId);
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable);
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *pMeta, bool usePublicIp);
......
......@@ -26,11 +26,11 @@ extern "C" {
extern void *mgmtStatusTimer;
void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendAlterStreamMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info);
void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle);
......
......@@ -29,7 +29,7 @@ void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId);
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
......
......@@ -28,13 +28,13 @@ int32_t mgmtInitShell();
void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
extern int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
#ifdef __cplusplus
}
......
......@@ -31,7 +31,7 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle);
void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle);
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
......@@ -40,8 +40,8 @@ void mgmtCleanUpMeters();
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable);
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable);
SDRemoveSuperTableMsg *mgmtBuildRemoveSuperTableMsg(STableInfo *pTable);
......
......@@ -37,7 +37,7 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtSetVgroupIdPool();
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid);
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, STableInfo *pTable);
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, STableInfo *pTable);
......
......@@ -274,38 +274,50 @@ void mgmtCleanUpChildTables() {
sdbCloseTable(tsChildTableSdb);
}
SCreateTableMsg *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable) {
// SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg;
// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
// memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
// pCreateTable->vnode = htonl(vnode);
// pCreateTable->sid = htonl(pTable->sid);
// pCreateTable->uid = pTable->uid;
// pCreateTable->createdTime = htobe64(pTable->createdTime);
// pCreateTable->sversion = htonl(pTable->superTable->sversion);
// pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns);
// pCreateTable->numOfTags = htons(pTable->superTable->numOfTags);
//
// SSchema *pSchema = pTable->superTable->schema;
// int32_t totalCols = pCreateTable->numOfColumns + pCreateTable->numOfTags;
//
// for (int32_t col = 0; col < totalCols; ++col) {
// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
// colData->type = pSchema[col].type;
// colData->bytes = htons(pSchema[col].bytes);
// colData->colId = htons(pSchema[col].colId);
// }
//
// int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
// pMsg = pCreateTable->data + totalColsSize + tagDataLen;
//
// memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen);
// pCreateTable->tagDataLen = htonl(tagDataLen);
static void *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, void *pTagData, int32_t tagDataLen) {
int32_t totalCols = pTable->superTable->numOfColumns + pTable->superTable->numOfTags;
int32_t contLen = sizeof(SCreateTableMsg) + totalCols * sizeof(SSchema) + tagDataLen;
return NULL;
SDCreateTableMsg *pCreateTable = rpcMallocCont(contLen);
if (pCreateTable == NULL) {
return NULL;
}
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->tableType = pTable->type;
pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreateTable->numOfTags = htons(pTable->superTable->numOfTags);
pCreateTable->sid = htonl(pTable->sid);
pCreateTable->sversion = htonl(pTable->superTable->sversion);
pCreateTable->tagDataLen = htonl(tagDataLen);
pCreateTable->sqlDataLen = 0;
pCreateTable->contLen = htonl(contLen);
pCreateTable->numOfVPeers = htonl(pVgroup->numOfVnodes);
pCreateTable->uid = htobe64(pTable->uid);
pCreateTable->superTableUid = htobe64(pTable->superTable->uid);
pCreateTable->createdTime = htobe64(pTable->createdTime);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
pCreateTable->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
pCreateTable->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
SSchema *pSchema = (SSchema *) pCreateTable->data;
memcpy(pSchema, pTable->superTable->schema, totalCols * sizeof(SSchema));
for (int32_t col = 0; col < totalCols; ++col) {
pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId);
pSchema++;
}
memcpy(pCreateTable->data + totalCols * sizeof(SSchema), pTagData, tagDataLen);
return pCreateTable;
}
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, tsMaxTables);
......@@ -321,6 +333,7 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr
SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) {
mError("table:%s, failed to alloc memory", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pTable->tableId, pCreate->tableId);
......@@ -333,21 +346,23 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
int32_t size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN;
SSchema * schema = (SSchema *) calloc(1, size);
if (schema == NULL) {
free(pTable);
mError("table:%s, corresponding super table schema is null", pCreate->tableId);
return TSDB_CODE_INVALID_TABLE;
}
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->tableId);
return TSDB_CODE_SDB_ERROR;
}
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
pTagData += (TSDB_TABLE_ID_LEN + 1);
int32_t tagDataLen = contLen - sizeof(SCreateTableMsg) - TSDB_TABLE_ID_LEN - 1;
*pDCreateOut = mgmtBuildCreateChildTableMsg(pTable, pVgroup, pTagData, tagDataLen);
if (*pDCreateOut == NULL) {
mError("table:%s, failed to build create table message", pCreate->tableId);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
*pTableOut = pTable;
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
return TSDB_CODE_SUCCESS;
}
......
......@@ -42,14 +42,17 @@ static void mgmtSendMsgToDnodeQueueFp(SSchedMsg *sched) {
int8_t *pCont = sched->msg;
dnodeProcessMsgFromMgmt(msgType, pCont, contLen, ahandle, code);
rpcFreeCont(sched->msg);
}
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t contLen, void *ahandle) {
mTrace("msg:%s is sent to dnode", taosMsg[msgType]);
mTrace("msg:%d:%s is sent to dnode, ahandle:%p", msgType, taosMsg[msgType], ahandle);
if (mgmtSendMsgToDnodeFp) {
mgmtSendMsgToDnodeFp(ipSet, msgType, pCont, contLen, ahandle);
} else {
if (pCont == NULL) {
pCont = rpcMallocCont(1);
contLen = 0;
}
SSchedMsg schedMsg = {0};
schedMsg.fp = mgmtSendMsgToDnodeQueueFp;
schedMsg.msg = pCont;
......@@ -62,10 +65,14 @@ void mgmtSendMsgToDnode(SRpcIpSet *ipSet, int8_t msgType, void *pCont, int32_t c
}
void mgmtSendRspToDnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen) {
mTrace("rsp:%s is sent to dnode", taosMsg[msgType]);
mTrace("rsp:%d:%s is sent to dnode", msgType, taosMsg[msgType]);
if (mgmtSendRspToDnodeFp) {
mgmtSendRspToDnodeFp(pConn, code, pCont, contLen);
} else {
if (pCont == NULL) {
pCont = rpcMallocCont(1);
contLen = 0;
}
SSchedMsg schedMsg = {0};
schedMsg.fp = mgmtSendMsgToDnodeQueueFp;
schedMsg.msg = pCont;
......@@ -126,16 +133,27 @@ static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe
}
static void mgmtProcessCreateTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
mTrace("create table rsp received, handle:%p code:%d", thandle, code);
mTrace("create table rsp received, thandle:%p code:%d", thandle, code);
if (thandle == NULL) return;
SProcessInfo *info = thandle;
if (info->type == TSDB_PROCESS_CREATE_TABLE) {
rpcSendResponse(info->thandle, code, NULL, 0);
assert(info->type == TSDB_PROCESS_CREATE_TABLE);
STableInfo *pTable = info->ahandle;
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId);
mgmtSetTableDirty(pTable, true);
} else {
mError("create table rsp received, handle:%p code:%d, invalid type:%d", info->type);
rpcSendResponse(info->thandle, TSDB_CODE_INVALID_MSG, NULL, 0);
mTrace("table:%s, created in dnode", pTable->tableId);
mgmtSetTableDirty(pTable, false);
}
rpcSendResponse(info->thandle, code, NULL, 0);
}
void mgmtSendCreateTableMsg(SDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle);
}
static void mgmtProcessRemoveTableRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *thandle, int32_t code) {
......@@ -147,89 +165,83 @@ static void mgmtProcessFreeVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contL
}
static void mgmtProcessCreateVnodeRsp(int8_t msgType, int8_t *pCont, int32_t contLen, void *ahandle, int32_t code) {
mTrace("create vnode received, vgroup_info:%p code:%d", ahandle, code);
if (ahandle == NULL) {
SProcessInfo *vgprocess = ahandle;
mTrace("create vnode received, vgprocess:%p code:%d", vgprocess, code);
if (vgprocess == NULL) {
rpcFreeCont(pCont);
return;
}
SProcessInfo *vgroup_info = ahandle;
assert(vgroup_info->type == TSDB_PROCESS_CREATE_VGROUP);
vgroup_info->received++;
SVgObj *pVgroup = vgroup_info->ahandle;
mTrace("vgroup:%d, received:%d numOfVnodes:%d table_info:%p",
pVgroup->vgId, vgroup_info->received, pVgroup->numOfVnodes, vgroup_info->thandle);
if (vgroup_info->received == pVgroup->numOfVnodes) {
SProcessInfo *table_info = vgroup_info->thandle;
assert(table_info->type == TSDB_PROCESS_CREATE_VGROUP_AND_TABLE);
STableInfo *pTable = table_info->ahandle;
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->thandle = table_info->thandle;
info->ahandle = pTable;
assert(vgprocess->type == TSDB_PROCESS_CREATE_VGROUP);
vgprocess->received++;
SVgObj *pVgroup = vgprocess->ahandle;
mTrace("vgroup:%d, received:%d numOfVnodes:%d vgprocess:%p tbprocess::%p",
pVgroup->vgId, vgprocess->received, pVgroup->numOfVnodes, vgprocess->thandle);
if (vgprocess->received == pVgroup->numOfVnodes) {
STableInfo *pTable = vgprocess->ahandle;
SProcessInfo *tbprocess = calloc(1, sizeof(SProcessInfo));
tbprocess->type = TSDB_PROCESS_CREATE_TABLE;
tbprocess->thandle = tbprocess->thandle;
tbprocess->ahandle = pTable;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateTableMsg(pTable, &ipSet, info);
mgmtSendCreateTableMsg(pTable, &ipSet, tbprocess);
free(vgroup_info);
free(table_info);
free(vgprocess);
}
rpcFreeCont(pCont);
}
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *table_info) {
SProcessInfo *vgroup_info = calloc(1, sizeof(SProcessInfo));
vgroup_info->type = TSDB_PROCESS_CREATE_VGROUP;
vgroup_info->thandle = table_info;
vgroup_info->ahandle = pVgroup;
mTrace("vgroup:%d, send create all vnodes msg, table_info:%p vgroup_info:%p ", pVgroup->vgId, table_info, vgroup_info);
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *table_info) {
mTrace("vgroup:%d, send create all vnodes msg, table_info:%p", pVgroup->vgId, table_info);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, vgroup_info);
mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, table_info);
}
}
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *vgroup_info) {
mTrace("vgroup:%d, send create vnode:%d msg, vgroup_info:%p", pVgroup->vgId, vnode, vgroup_info);
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *table_info) {
mTrace("vgroup:%d, send create vnode:%d msg, table_info:%p", pVgroup->vgId, vnode, table_info);
SVPeersMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode);
if (pVpeer != NULL) {
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), vgroup_info);
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_VPEERS, pVpeer, sizeof(SVPeersMsg), table_info);
}
}
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *handle, int32_t code) {
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
mError("invalid msg type:%d", msgType);
return;
}
mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[msgType], pConn);
if (msgType == TSDB_MSG_TYPE_TABLE_CFG) {
mgmtProcessTableCfgMsg(msgType, pCont, contLen, handle);
mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn);
} else if (msgType == TSDB_MSG_TYPE_VNODE_CFG) {
mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, handle);
mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn);
} else if (msgType == TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP) {
mgmtProcessCreateTableRsp(msgType, pCont, contLen, handle, code);
mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) {
mgmtProcessRemoveTableRsp(msgType, pCont, contLen, handle, code);
mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) {
mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, handle, code);
mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) {
mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, handle, code);
mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code);
} else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) {
} else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
} else {
mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
}
}
void mgmtSendCreateTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, sid:%d send create table msg, handle:%p", pTable->tableId, pTable->sid);
SDCreateTableMsg *pCreate = mgmtBuildCreateTableMsg(pTable);
if (pCreate != NULL) {
mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_TABLE, pCreate, htonl(pCreate->contLen), ahandle);
}
//rpcFreeCont(pCont);
}
void mgmtSendRemoveTableMsg(STableInfo *pTable, SRpcIpSet *ipSet, void *ahandle) {
mTrace("table:%s, sid:%d send remove table msg, handle:%p", pTable->tableId, pTable->sid);
......
......@@ -319,7 +319,7 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
return NULL;
}
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid, SDCreateTableMsg **pDCreateOut, STableInfo **pTableOut) {
int32_t numOfTables = sdbGetNumOfRows(tsNormalTableSdb);
if (numOfTables >= TSDB_MAX_NORMAL_TABLES) {
mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->tableId, numOfTables, TSDB_MAX_NORMAL_TABLES);
......@@ -373,8 +373,8 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
return TSDB_CODE_SDB_ERROR;
}
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 ,
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid);
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
......@@ -111,67 +111,135 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
return TSDB_CODE_SUCCESS;
}
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate, void *thandle) {
void mgmtProcessCreateVgroup(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
mError("table:%s, failed to create vgroup, db not found", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_INVALID_DB, NULL, 0);
return;
}
SVgObj *pVgroup = mgmtCreateVgroup(pDb);
if (pVgroup == NULL) {
mError("table:%s, failed to alloc vnode to vgroup", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_NO_ENOUGH_DNODES, NULL, 0);
return;
}
void *cont = rpcMallocCont(contLen);
if (cont == NULL) {
mError("table:%s, failed to create table, can not alloc memory", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return;
}
memcpy(cont, pCreate, contLen);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_VGROUP;
info->thandle = thandle;
info->ahandle = pVgroup;
info->cont = cont;
info->contLen = contLen;
mgmtSendCreateVgroupMsg(pVgroup, info);
}
void mgmtProcessCreateTable(SVgObj *pVgroup, SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
assert(pVgroup != NULL);
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("table:%s, no enough sid in vgroup:%d, start to create a new vgroup", pCreate->tableId, pVgroup->vgId);
mgmtProcessCreateVgroup(pCreate, contLen, thandle);
return;
}
int32_t code;
STableInfo *pTable;
SDCreateTableMsg *pDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, start to create child table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateChildTable(pCreate, contLen, pVgroup, sid, &pDCreate, &pTable);
} else {
mTrace("table:%s, start to create normal table, vgroup:%d sid:%d", pCreate->tableId, pVgroup->vgId, sid);
code = mgmtCreateNormalTable(pCreate, pVgroup, sid, &pDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create table in vgroup:%d sid:%d ", pCreate->tableId, pVgroup->vgId, sid);
rpcSendResponse(thandle, code, NULL, 0);
return;
}
assert(pDCreate != NULL);
assert(pTable != NULL);
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->ahandle = pTable;
info->thandle = thandle;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateTableMsg(pDCreate, &ipSet, info);
}
void mgmtCreateTable(SCreateTableMsg *pCreate, int32_t contLen, void *thandle) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb == NULL) {
mError("table:%s, failed to create table, db not selected", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_DB_NOT_SELECTED, NULL, 0);
return;
}
STableInfo *pTable = mgmtGetTable(pCreate->tableId);
if (pTable != NULL) {
if (pCreate->igExists) {
return TSDB_CODE_SUCCESS;
mTrace("table:%s, table is alredy exist, think it success", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_SUCCESS, NULL, 0);
} else {
return TSDB_CODE_TABLE_ALREADY_EXIST;
mError("table:%s, failed to create table, table already exist", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_TABLE_ALREADY_EXIST, NULL, 0);
}
return;
}
SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct);
assert(pAcct != NULL);
int32_t code = mgmtCheckTableLimit(pAcct, pCreate);
if (code != 0) {
mError("table:%s, exceed the limit", pCreate->tableId);
return code;
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table, table num exceed the limit", pCreate->tableId);
rpcSendResponse(thandle, code, NULL, 0);
return;
}
if (mgmtCheckExpired()) {
mError("failed to create meter:%s, reason:grant expired", pCreate->tableId);
return TSDB_CODE_GRANT_EXPIRED;
mError("table:%s, failed to create table, grant expired", pCreate->tableId);
rpcSendResponse(thandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0);
return;
}
if (pCreate->numOfTags == 0) {
int32_t grantCode = mgmtCheckTimeSeries(pCreate->numOfColumns);
if (grantCode != 0) {
mError("table:%s, grant expired", pCreate->tableId);
return grantCode;
}
if (pCreate->numOfTags != 0) {
mTrace("table:%s, start to create super table, tags:%d columns:%d", pCreate->tableId, pCreate->numOfTags, pCreate->numOfColumns);
mgmtCreateSuperTable(pDb, pCreate);
return;
}
int32_t sid;
SVgObj *pVgroup = mgmtGetAvailVgroup(pDb, &sid);
if (pVgroup == NULL) {
code = mgmtCreateVgroup(pDb);
if (code == TSDB_CODE_SUCCESS) {
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_VGROUP_AND_TABLE;
info->ahandle = pTable;
info->thandle = thandle;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateVgroupMsg(pVgroup, &ipSet, info);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
} else {
if (pCreate->numOfColumns == 0) {
code = mgmtCreateChildTable(pDb, pCreate, pVgroup, sid);
} else {
code = mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid);
}
if (code == TSDB_CODE_SUCCESS) {
SProcessInfo *info = calloc(1, sizeof(SProcessInfo));
info->type = TSDB_PROCESS_CREATE_TABLE;
info->ahandle = pTable;
info->thandle = thandle;
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mgmtSendCreateTableMsg(pTable, &ipSet, info);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
}
code = mgmtCheckTimeSeries(pCreate->numOfColumns);
if (code != TSDB_CODE_SUCCESS) {
mError("table:%s, failed to create table, timeseries exceed the limit", pCreate->tableId);
return;
}
SVgObj *pVgroup = mgmtGetAvailableVgroup(pDb);
if (pVgroup == NULL) {
mTrace("table:%s, no avaliable vgroup, start to create a new one", pCreate->tableId, pVgroup->vgId);
mgmtProcessCreateVgroup(pCreate, contLen, thandle);
} else {
return mgmtCreateSuperTable(pDb, pCreate);
mTrace("table:%s, try to create table in vgroup:%d", pCreate->tableId, pVgroup->vgId);
mgmtProcessCreateTable(pVgroup, pCreate, contLen, thandle);
}
}
......@@ -409,30 +477,13 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
return numOfRows;
}
SDCreateTableMsg *mgmtBuildCreateTableMsg(STableInfo *pTable) {
SDCreateTableMsg *pCreate = NULL;
if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
pCreate = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
pCreate = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else {
}
return pCreate;
}
SDRemoveTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
SDRemoveTableMsg *pRemove = NULL;
if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
pRemove = mgmtBuildCreateChildTableMsg((SChildTableObj *) pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
pRemove = mgmtBuildCreateNormalTableMsg((SNormalTableObj *) pTable);
} else {
}
return pRemove;
}
void mgmtSetTableDirty(STableInfo *pTable, bool isDirty) {
pTable->dirty = isDirty;
}
\ No newline at end of file
......@@ -116,65 +116,25 @@ SVgObj *mgmtGetVgroup(int32_t vgId) {
return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
}
//TODO: temp way for debug usage
SVgObj *mgmtGetAvailVgroup(SDbObj *pDb, int32_t *sid) {
SVgObj *pVgroup = pDb->pHead;
if (pVgroup == NULL) {
pVgroup = mgmtCreateVgroup(pDb);
int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) {
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool));
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
mgmtCreateVgroup(pDb);
terrno = TSDB_CODE_ACTION_IN_PROGRESS;
}
*sid = taosAllocateId(pVgroup->idPool);
return pVgroup;
// if (pDb->vgStatus)
//
// if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) {
// terrno = TSDB_CODE_ACTION_IN_PROGRESS;
// return NULL;
// }
//
// if (pDb->vgStatus == TSDB_VG_STATUS_FULL) {
// mError("db:%s, vgroup is full", pDb->name);
// terrno = TSDB_CODE_NO_ENOUGH_DNODES;
// return NULL;
// }
//
// if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS ||
// pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE ||
// pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY ||
// pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) {
// mError("db:%s, vgroup init failed, reason:%d %s", pDb->name, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus));
// terrno = pDb->vgStatus;
// return NULL;
// }
//
// if (pVgroup == NULL) {
// //TODO
// //pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
// pDb->vgStatus = TSDB_VG_STATUS_READY;
// mgmtCreateVgroup(pDb);
// mTrace("db:%s, vgroup malloced, wait for create progress finished", pDb->name);
// terrno = TSDB_CODE_ACTION_IN_PROGRESS;
// return NULL;
// }
//
// terrno = 0;
// return pVgroup;
terrno = 0;
return sid;
}
//int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) {
// int32_t sid = taosAllocateId(pVgroup->idPool);
// if (sid < 0) {
// mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool));
// pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
// mgmtCreateVgroup(pDb);
// terrno = TSDB_CODE_ACTION_IN_PROGRESS;
// }
//
// terrno = 0;
// return sid;
//}
/*
* TODO: check if there is enough sids
*/
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead;
}
void mgmtProcessVgTimer(void *handle, void *tmrId) {
SDbObj *pDb = (SDbObj *)handle;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册