diff --git a/src/inc/mnode.h b/src/inc/mnode.h index ef97bdfcb162ebe0aea5a2146bd4c450c880fdc7..5c1ac25bc16b9daf6d4c569ba877129bd960cddb 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -195,6 +195,7 @@ typedef struct { int32_t sversion; int32_t numOfColumns; int32_t schemaSize; + int16_t sqlLen; char reserved[3]; char updateEnd[1]; int16_t nextColId; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 811b9590bba4912ebc3cfb9ed6f5119bd2f6189c..f6e117d426b6a50143ff1fdd99515ba75cd138ab 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -25,6 +25,7 @@ extern "C" { #include "tsdb.h" #include "taoserror.h" +#include "taosdef.h" // message type #define TSDB_MSG_TYPE_REG 1 @@ -292,9 +293,9 @@ typedef struct SSchema { } SSchema; typedef struct SMColumn { - char type; - short colId; - short bytes; + int8_t type; + int16_t colId; + int16_t bytes; } SMColumn; typedef struct { @@ -323,6 +324,44 @@ typedef struct { } SCreateMsg; +typedef struct { + int32_t vnode; + int32_t sid; + uint64_t uid; + char tableId[TSDB_TABLE_ID_LEN + 1]; + char superTableId[TSDB_TABLE_ID_LEN + 1]; + uint64_t createdTime; + int32_t sversion; + int16_t numOfColumns; + int16_t numOfTags; + int32_t tagDataLen; + int8_t data[]; +} SCreateChildTableMsg; + +typedef struct { + int32_t vnode; + int32_t sid; + uint64_t uid; + char tableId[TSDB_TABLE_ID_LEN + 1]; + uint64_t createdTime; + int32_t sversion; + int16_t numOfColumns; + int8_t data[]; +} SCreateNormalTableMsg; + +typedef struct { + int32_t vnode; + int32_t sid; + uint64_t uid; + char tableId[TSDB_TABLE_ID_LEN + 1]; + uint64_t createdTime; + int32_t sversion; + int16_t numOfColumns; + int32_t sqlLen; + int8_t data[]; +} SCreateStreamTableMsg; + + typedef struct { char db[TSDB_TABLE_ID_LEN]; uint8_t ignoreNotExists; diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index b9fc0585466d391840c544a57509bef8632baba4..397f8ac72fd1c420661d19799853e059d3af5e3d 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -28,14 +28,13 @@ extern "C" { int32_t mgmtInitChildTables(); void mgmtCleanUpChildTables(); -int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); +int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); SChildTableObj* mgmtGetChildTable(char *tableId); SSchema* mgmtGetChildTableSchema(SChildTableObj *pTable); - - +int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen, int8_t *pTagData); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index 20de92b6ef08730779f36c6e94222fa5521b3013..a81b197f2c75902c4312453c5320aed3651d861c 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -26,7 +26,10 @@ extern "C" { extern void *mgmtStatusTimer; -int mgmtSendCreateMsgToVgroup(STabObj *pTable, SVgObj *pVgroup); +int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData); +int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup); +int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup); + int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup); int mgmtSendVPeersMsg(SVgObj *pVgroup); int mgmtSendFreeVnodeMsg(SVgObj *pVgroup); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 17cc5721cd0955d23a0474247959c22523441c97..a8cf30bdbdac6f8bcad855f73c7d362f08fc8a91 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -27,13 +27,14 @@ extern "C" { int32_t mgmtInitNormalTables(); void mgmtCleanUpNormalTables(); -int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); +int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); SNormalTableObj* mgmtGetNormalTable(char *tableId); SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable); +int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int32_t vnode); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtStreamTable.h b/src/mnode/inc/mgmtStreamTable.h index b67f05771af0a15714f6df2fdca888e30edc0efd..e9f636482c03fdd8fbeebb7bf120358f13eaa9b9 100644 --- a/src/mnode/inc/mgmtStreamTable.h +++ b/src/mnode/inc/mgmtStreamTable.h @@ -27,12 +27,14 @@ extern "C" { int32_t mgmtInitStreamTables(); void mgmtCleanUpStreamTables(); -int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); +int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable); int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter); SStreamTableObj* mgmtGetStreamTable(char *tableId); SSchema* mgmtGetStreamTableSchema(SStreamTableObj *pTable); +int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode); + #ifdef __cplusplus diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index a5396ce4734b0b5181d91c8eea96d287d1ad5842..b94282cf572cb63de0911dd220c62706e2f41361 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -49,40 +49,68 @@ int32_t mgmtInitChildTables() { void mgmtCleanUpChildTables() { } -char *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, char *pMsg, int vnode) { - +int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen, + int8_t *pTagData) { + SCreateChildTableMsg *pCreateTable = (SCreateChildTableMsg *) pMsg; + memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); + memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); + pCreateTable->vnode = htonl(vnode); + pCreateTable->sid = htonl(pTable->sid); + pCreateTable->uid = pTable->uid; + pCreateTable->createdTime = htobe64(pTable->createdTime); + pCreateTable->sversion = htonl(pTable->superTable->sversion); + pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns); + pCreateTable->numOfTags = htons(pTable->superTable->numOfTags); + + SSchema *pSchema = pTable->superTable->schema; + int32_t totalCols = pCreateTable->numOfColumns + pCreateTable->numOfTags; + + for (int32_t col = 0; col < totalCols; ++col) { + SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; + colData->type = pSchema[col].type; + colData->bytes = htons(pSchema[col].bytes); + colData->colId = htons(pSchema[col].colId); + } + + int32_t totalColsSize = sizeof(SMColumn *) * totalCols; + pMsg = pCreateTable->data + totalColsSize + tagDataLen; + + memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen); + pCreateTable->tagDataLen = htonl(tagDataLen); + + return pMsg; } -int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { +int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { int numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= tsMaxTables) { - mError("child table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); + mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); return TSDB_CODE_TOO_MANY_TABLES; } - char *pTagData = (char *)pCreate->schema; // it is a tag key + char *pTagData = (char *) pCreate->schema; // it is a tag key SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); if (pSuperTable == NULL) { mError("table:%s, corresponding super table does not exist", pCreate->meterId); return TSDB_CODE_INVALID_TABLE; } - SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1); + SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); if (pTable == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } strcpy(pTable->tableId, pCreate->meterId); strcpy(pTable->superTableId, pSuperTable->tableId); pTable->createdTime = taosGetTimestampMs(); - pTable->superTable = pSuperTable; - pTable->vgId = vgId; - pTable->sid = sid; - pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + - ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->superTable = pSuperTable; + pTable->vgId = pVgroup->vgId; + pTable->sid = sid; + pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); SVariableMsg tags = {0}; - tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN; - tags.data = (char *)calloc(1, tags.size); + tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN; + tags.data = (char *) calloc(1, tags.size); if (tags.data == NULL) { free(pTable); mError("table:%s, corresponding super table schema is null", pCreate->meterId); @@ -96,10 +124,13 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId } mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1); - mgmtSendCreateMsgToVgroup(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", - pTable->tableId, vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + mgmtSendCreateChildTableMsg(pTable, pVgroup, tags.size, tags.data); + + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" + PRIu64 + " db:%s", + pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); return 0; } diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 3576003d339fc000e4cfec61cea7ec1fc65506e9..7738c2f28b11857ae7c8a5f3978665e31dca199d 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -31,6 +31,11 @@ #include "dnodeSystem.h" + +#include "mgmtChildTable.h" +#include "mgmtNormalTable.h" +#include "mgmtStreamTable.h" + void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj); int mgmtSendVPeersMsg(SVgObj *pVgroup); char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode); @@ -229,30 +234,75 @@ char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode) { return pMsg; } -int mgmtSendCreateMsgToVgroup(STabObj table, SVgObj *pVgroup) { - char * pMsg, *pStart; - int i, msgLen = 0; - SDnodeObj *pObj; - uint64_t timeStamp; +int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData) { + uint64_t timeStamp = taosGetTimestampMs(); - timeStamp = taosGetTimestampMs(); + for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) { + SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip); + if (pObj == NULL) { + continue; + } - for (i = 0; i < pVgroup->numOfVnodes; ++i) { - //if (pVgroup->vnodeGid[i].ip == 0) continue; + int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); + if (pStart == NULL) { + continue; + } - pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip); - if (pObj == NULL) continue; + int8_t *pMsg = mgmtBuildCreateChildTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode, tagDataLen, pTagData); + int32_t msgLen = pMsg - pStart; - pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); - if (pStart == NULL) continue; - pMsg = mgmtBuildCreateMeterIe(pTable, pStart, pVgroup->vnodeGid[i].vnode); - msgLen = pMsg - pStart; + taosSendMsgToDnode(pObj, pStart, msgLen); + } + + pVgroup->lastCreate = timeStamp; + return 0; +} + +int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { + uint64_t timeStamp = taosGetTimestampMs(); + + for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) { + SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip); + if (pObj == NULL) { + continue; + } + + int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); + if (pStart == NULL) { + continue; + } + + int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); + int32_t msgLen = pMsg - pStart; taosSendMsgToDnode(pObj, pStart, msgLen); } pVgroup->lastCreate = timeStamp; + return 0; +} +int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { + uint64_t timeStamp = taosGetTimestampMs(); + + for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) { + SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip); + if (pObj == NULL) { + continue; + } + + int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); + if (pStart == NULL) { + continue; + } + + int8_t *pMsg = mgmtBuildCreateNormalTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode); + int32_t msgLen = pMsg - pStart; + + taosSendMsgToDnode(pObj, pStart, msgLen); + } + + pVgroup->lastCreate = timeStamp; return 0; } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index c4e2cfbfe185517488abe7669c63988871396e1c..a4ed37004308c7f9e1df0dbe0e8e192d8adc05b1 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -47,29 +47,55 @@ void mgmtCleanUpNormalTables() { sdbCloseTable(tsNormalTableSdb); } -int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { +int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int32_t vnode) { + SCreateNormalTableMsg *pCreateTable = (SCreateNormalTableMsg *) pMsg; + memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); + pCreateTable->vnode = htobe32(vnode); + pCreateTable->sid = htobe32(pTable->sid); + pCreateTable->uid = htobe64(pTable->uid); + pCreateTable->createdTime = htobe64(pTable->createdTime); + pCreateTable->sversion = htobe32(pTable->sversion); + pCreateTable->numOfColumns = htobe16(pTable->numOfColumns); + + SSchema *pSchema = pTable->schema; + int32_t totalCols = pCreateTable->numOfColumns; + + for (int32_t col = 0; col < totalCols; ++col) { + SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; + colData->type = pSchema[col].type; + colData->bytes = htons(pSchema[col].bytes); + colData->colId = htons(pSchema[col].colId); + } + + int32_t totalColsSize = sizeof(SMColumn *) * totalCols; + pMsg = pCreateTable->data + totalColsSize; + + return pMsg; +} + +int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { int numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= TSDB_MAX_TABLES) { mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); return TSDB_CODE_TOO_MANY_TABLES; } - SNormalTableObj *pTable = (SNormalTableObj *)calloc(sizeof(SNormalTableObj), 1); + SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1); if (pTable == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } strcpy(pTable->tableId, pCreate->meterId); - pTable->createdTime = taosGetTimestampMs(); - pTable->vgId = vgId; - pTable->sid = sid; - pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); - pTable->sversion = 0; + pTable->createdTime = taosGetTimestampMs(); + pTable->vgId = pVgroup->vgId; + pTable->sid = sid; + pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->sversion = 0; pTable->numOfColumns = pCreate->numOfColumns; int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; pTable->schemaSize = numOfCols * sizeof(SSchema); - pTable->schema = (int8_t *)calloc(1, pTable->schemaSize); + pTable->schema = (int8_t *) calloc(1, pTable->schemaSize); if (pTable->schema == NULL) { free(pTable); mError("table:%s, no schema input", pCreate->meterId); @@ -79,7 +105,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI pTable->nextColId = 0; for (int col = 0; col < pCreate->numOfColumns; col++) { - SSchema *tschema = (SSchema *)pTable->schema; + SSchema *tschema = (SSchema *) pTable->schema; tschema[col].colId = pTable->nextColId++; } @@ -88,14 +114,14 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI return TSDB_CODE_SDB_ERROR; } -// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", -// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode); -// -// mgmtAddTimeSeries(pTable->numOfColumns - 1); -// mgmtSendCreateMsgToVgroup(pTable, pVgroup); + mgmtAddTimeSeries(pTable->numOfColumns - 1); + + mgmtSendCreateNormalTableMsg(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", - pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" + PRIu64 + " db:%s", + pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); return 0; } diff --git a/src/mnode/src/mgmtStreamTable.c b/src/mnode/src/mgmtStreamTable.c index 82d93372eeb6dcbae564a1bda1196093ebf7444f..d6958db0750ec1ab8e8b34491a6bd193b1ea15f4 100644 --- a/src/mnode/src/mgmtStreamTable.c +++ b/src/mnode/src/mgmtStreamTable.c @@ -46,29 +46,59 @@ int32_t mgmtInitStreamTables() { void mgmtCleanUpStreamTables() { } -int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { +int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode) { + SCreateStreamTableMsg *pCreateTable = (SCreateStreamTableMsg *) pMsg; + memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); + pCreateTable->vnode = htonl(vnode); + pCreateTable->sid = htonl(pTable->sid); + pCreateTable->uid = pTable->uid; + pCreateTable->createdTime = htobe64(pTable->createdTime); + pCreateTable->sversion = htonl(pTable->sversion); + pCreateTable->numOfColumns = htons(pTable->numOfColumns); + pCreateTable->sqlLen = htons(pTable->sqlLen); + + SSchema *pSchema = pTable->schema; + int32_t totalCols = pCreateTable->numOfColumns; + + for (int32_t col = 0; col < totalCols; ++col) { + SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; + colData->type = pSchema[col].type; + colData->bytes = htons(pSchema[col].bytes); + colData->colId = htons(pSchema[col].colId); + } + + int32_t totalColsSize = sizeof(SMColumn *) * totalCols; + pMsg = pCreateTable->data + totalColsSize + pTable->sqlLen; + + char *sql = pTable->schema + pTable->schemaSize; + memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql); + + return pMsg; +} + +int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { int numOfTables = sdbGetNumOfRows(tsStreamTableSdb); if (numOfTables >= TSDB_MAX_TABLES) { mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); return TSDB_CODE_TOO_MANY_TABLES; } - SStreamTableObj *pTable = (SStreamTableObj *)calloc(sizeof(SStreamTableObj), 1); + SStreamTableObj *pTable = (SStreamTableObj *) calloc(sizeof(SStreamTableObj), 1); if (pTable == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } strcpy(pTable->tableId, pCreate->meterId); - pTable->createdTime = taosGetTimestampMs(); - pTable->vgId = vgId; - pTable->sid = sid; - pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); - pTable->sversion = 0; + pTable->createdTime = taosGetTimestampMs(); + pTable->vgId = pVgroup->vgId; + pTable->sid = sid; + pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->sversion = 0; pTable->numOfColumns = pCreate->numOfColumns; int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; pTable->schemaSize = numOfCols * sizeof(SSchema) + pCreate->sqlLen; - pTable->schema = (int8_t *)calloc(1, pTable->schemaSize); + pTable->schema = (int8_t *) calloc(1, pTable->schemaSize); if (pTable->schema == NULL) { free(pTable); mError("table:%s, no schema input", pCreate->meterId); @@ -78,12 +108,12 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI pTable->nextColId = 0; for (int col = 0; col < pCreate->numOfColumns; col++) { - SSchema *tschema = (SSchema *)pTable->schema; + SSchema *tschema = (SSchema *) pTable->schema; tschema[col].colId = pTable->nextColId++; } pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema); - memcpy(pTable->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); + memcpy(pTable->pSql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); pTable->pSql[pCreate->sqlLen - 1] = 0; mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql); @@ -92,14 +122,14 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI return TSDB_CODE_SDB_ERROR; } -// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", -// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode); -// -// mgmtAddTimeSeries(pTable->numOfColumns - 1); -// mgmtSendCreateMsgToVgroup(pTable, pVgroup); + mgmtAddTimeSeries(pTable->numOfColumns - 1); + + mgmtSendCreateStreamTableMsg(pTable, pVgroup); - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", - pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" + PRIu64 + " db:%s", + pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); return 0; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 29d4fb03ac263aa50bb90157c4569895c9a13cc3..06f13d1f56f17c0bd13ea76fbd29afc34bedc260 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -570,11 +570,11 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { } if (pCreate->numOfColumns == 0) { - return mgmtCreateChildTable(pDb, pCreate, pVgroup->vgId, sid); + return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid); } else if (pCreate->sqlLen > 0) { - return mgmtCreateStreamTable(pDb, pCreate, pVgroup->vgId, sid); + return mgmtCreateStreamTable(pDb, pCreate, pVgroup, sid); } else { - return mgmtCreateNormalTable(pDb, pCreate, pVgroup->vgId, sid); + return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid); } } else { return mgmtCreateSuperTable(pDb, pCreate); @@ -755,25 +755,23 @@ int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { } pShow->numOfRows = pDb->numOfTables; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; return 0; } int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int32_t numOfRows = 0; - STabObj *pTable = NULL; - char * pWrite; - int32_t cols = 0; - int32_t prefixLen; - int32_t numOfRead = 0; - char prefix[20] = {0}; - int16_t numOfColumns; - char * tableId; - char * superTableId; - int64_t createdTime; - void * pNormalTableNode; - void * pChildTableNode; + int32_t numOfRows = 0; + int32_t numOfRead = 0; + int32_t cols = 0; + void *pTable = NULL; + char *pWrite = NULL; + + int16_t numOfColumns; + int64_t createdTime; + char *tableId; + char *superTableId; + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SDbObj *pDb = NULL; if (pConn->pDb != NULL) { @@ -791,33 +789,41 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon } } + char prefix[20] = {0}; strcpy(prefix, pDb->name); strcat(prefix, TS_PATH_DELIMITER); - prefixLen = strlen(prefix); - - SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; - char meterName[TSDB_METER_NAME_LEN] = {0}; + int32_t prefixLen = strlen(prefix); while (numOfRows < rows) { - pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); + void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); if (pTable != NULL) { - pShow->pNode = pNormalTableNode; SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable; - tableId = pNormalTable->tableId; + pShow->pNode = pNormalTableNode; + tableId = pNormalTable->tableId; superTableId = NULL; - createdTime = pNormalTable->createdTime; + createdTime = pNormalTable->createdTime; numOfColumns = pNormalTable->numOfColumns; } else { - pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); + void *pStreamTableNode = sdbFetchRow(tsStreamTableSdb, pShow->pNode, (void **) &pTable); if (pTable != NULL) { - pShow->pNode = pChildTableNode; - SChildTableObj *pChildTable = (SChildTableObj *) pTable; - tableId = pChildTable->tableId; + SStreamTableObj *pChildTable = (SStreamTableObj *) pTable; + pShow->pNode = pStreamTableNode; + tableId = pChildTable->tableId; superTableId = NULL; - createdTime = pChildTable->createdTime; - numOfColumns = pChildTable->superTable->numOfColumns; + createdTime = pChildTable->createdTime; + numOfColumns = pChildTable->numOfColumns; } else { - break; + void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); + if (pTable != NULL) { + SChildTableObj *pChildTable = (SChildTableObj *) pTable; + pShow->pNode = pChildTableNode; + tableId = pChildTable->tableId; + superTableId = NULL; + createdTime = pChildTable->createdTime; + numOfColumns = pChildTable->superTable->numOfColumns; + } else { + break; + } } } @@ -826,8 +832,9 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon continue; } - numOfRead++; + char meterName[TSDB_METER_NAME_LEN] = {0}; memset(meterName, 0, tListLen(meterName)); + numOfRead++; // pattern compare for meter name extractTableName(tableId, meterName); @@ -867,19 +874,3 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon return numOfRows; } - -SSchema *mgmtGetTableSchema(STabObj *pTable) { - if (pTable == NULL) { - return NULL; - } - - if (!mgmtTableCreateFromSuperTable(pTable)) { - return (SSchema *)pTable->schema; - } - - STabObj *pMetric = mgmtGetTable(pTable->pTagData); - assert(pMetric != NULL); - - return (SSchema *)pMetric->schema; -} -