提交 5e054576 编写于 作者: S slguan

mgmtTable

上级 81fef75f
...@@ -195,6 +195,7 @@ typedef struct { ...@@ -195,6 +195,7 @@ typedef struct {
int32_t sversion; int32_t sversion;
int32_t numOfColumns; int32_t numOfColumns;
int32_t schemaSize; int32_t schemaSize;
int16_t sqlLen;
char reserved[3]; char reserved[3];
char updateEnd[1]; char updateEnd[1];
int16_t nextColId; int16_t nextColId;
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
#include "tsdb.h" #include "tsdb.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosdef.h"
// message type // message type
#define TSDB_MSG_TYPE_REG 1 #define TSDB_MSG_TYPE_REG 1
...@@ -292,9 +293,9 @@ typedef struct SSchema { ...@@ -292,9 +293,9 @@ typedef struct SSchema {
} SSchema; } SSchema;
typedef struct SMColumn { typedef struct SMColumn {
char type; int8_t type;
short colId; int16_t colId;
short bytes; int16_t bytes;
} SMColumn; } SMColumn;
typedef struct { typedef struct {
...@@ -323,6 +324,44 @@ typedef struct { ...@@ -323,6 +324,44 @@ typedef struct {
} SCreateMsg; } SCreateMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
uint64_t createdTime;
int32_t sversion;
int16_t numOfColumns;
int16_t numOfTags;
int32_t tagDataLen;
int8_t data[];
} SCreateChildTableMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t createdTime;
int32_t sversion;
int16_t numOfColumns;
int8_t data[];
} SCreateNormalTableMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t createdTime;
int32_t sversion;
int16_t numOfColumns;
int32_t sqlLen;
int8_t data[];
} SCreateStreamTableMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_ID_LEN]; char db[TSDB_TABLE_ID_LEN];
uint8_t ignoreNotExists; uint8_t ignoreNotExists;
......
...@@ -28,14 +28,13 @@ extern "C" { ...@@ -28,14 +28,13 @@ extern "C" {
int32_t mgmtInitChildTables(); int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables(); void mgmtCleanUpChildTables();
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
SChildTableObj* mgmtGetChildTable(char *tableId); SChildTableObj* mgmtGetChildTable(char *tableId);
SSchema* mgmtGetChildTableSchema(SChildTableObj *pTable); SSchema* mgmtGetChildTableSchema(SChildTableObj *pTable);
int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen, int8_t *pTagData);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,7 +26,10 @@ extern "C" { ...@@ -26,7 +26,10 @@ extern "C" {
extern void *mgmtStatusTimer; extern void *mgmtStatusTimer;
int mgmtSendCreateMsgToVgroup(STabObj *pTable, SVgObj *pVgroup); int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData);
int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup);
int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup); int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup);
int mgmtSendVPeersMsg(SVgObj *pVgroup); int mgmtSendVPeersMsg(SVgObj *pVgroup);
int mgmtSendFreeVnodeMsg(SVgObj *pVgroup); int mgmtSendFreeVnodeMsg(SVgObj *pVgroup);
......
...@@ -27,13 +27,14 @@ extern "C" { ...@@ -27,13 +27,14 @@ extern "C" {
int32_t mgmtInitNormalTables(); int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables(); void mgmtCleanUpNormalTables();
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
SNormalTableObj* mgmtGetNormalTable(char *tableId); SNormalTableObj* mgmtGetNormalTable(char *tableId);
SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable); SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable);
int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int32_t vnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,12 +27,14 @@ extern "C" { ...@@ -27,12 +27,14 @@ extern "C" {
int32_t mgmtInitStreamTables(); int32_t mgmtInitStreamTables();
void mgmtCleanUpStreamTables(); void mgmtCleanUpStreamTables();
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable); int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable);
int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter);
SStreamTableObj* mgmtGetStreamTable(char *tableId); SStreamTableObj* mgmtGetStreamTable(char *tableId);
SSchema* mgmtGetStreamTableSchema(SStreamTableObj *pTable); SSchema* mgmtGetStreamTableSchema(SStreamTableObj *pTable);
int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -49,25 +49,53 @@ int32_t mgmtInitChildTables() { ...@@ -49,25 +49,53 @@ int32_t mgmtInitChildTables() {
void mgmtCleanUpChildTables() { void mgmtCleanUpChildTables() {
} }
char *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, char *pMsg, int vnode) { int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen,
int8_t *pTagData) {
SCreateChildTableMsg *pCreateTable = (SCreateChildTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htonl(vnode);
pCreateTable->sid = htonl(pTable->sid);
pCreateTable->uid = pTable->uid;
pCreateTable->createdTime = htobe64(pTable->createdTime);
pCreateTable->sversion = htonl(pTable->superTable->sversion);
pCreateTable->numOfColumns = htons(pTable->superTable->numOfColumns);
pCreateTable->numOfTags = htons(pTable->superTable->numOfTags);
SSchema *pSchema = pTable->superTable->schema;
int32_t totalCols = pCreateTable->numOfColumns + pCreateTable->numOfTags;
for (int32_t col = 0; col < totalCols; ++col) {
SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
colData->type = pSchema[col].type;
colData->bytes = htons(pSchema[col].bytes);
colData->colId = htons(pSchema[col].colId);
}
int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
pMsg = pCreateTable->data + totalColsSize + tagDataLen;
memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen);
pCreateTable->tagDataLen = htonl(tagDataLen);
return pMsg;
} }
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsChildTableSdb); int numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) { if (numOfTables >= tsMaxTables) {
mError("child table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables);
return TSDB_CODE_TOO_MANY_TABLES; return TSDB_CODE_TOO_MANY_TABLES;
} }
char *pTagData = (char *)pCreate->schema; // it is a tag key char *pTagData = (char *) pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) { if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->meterId); mError("table:%s, corresponding super table does not exist", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE; return TSDB_CODE_INVALID_TABLE;
} }
SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1); SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
...@@ -75,14 +103,14 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId ...@@ -75,14 +103,14 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId
strcpy(pTable->superTableId, pSuperTable->tableId); strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->createdTime = taosGetTimestampMs(); pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable; pTable->superTable = pSuperTable;
pTable->vgId = vgId; pTable->vgId = pVgroup->vgId;
pTable->sid = sid; pTable->sid = sid;
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
SVariableMsg tags = {0}; SVariableMsg tags = {0};
tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN; tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN;
tags.data = (char *)calloc(1, tags.size); tags.data = (char *) calloc(1, tags.size);
if (tags.data == NULL) { if (tags.data == NULL) {
free(pTable); free(pTable);
mError("table:%s, corresponding super table schema is null", pCreate->meterId); mError("table:%s, corresponding super table schema is null", pCreate->meterId);
...@@ -96,10 +124,13 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId ...@@ -96,10 +124,13 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId
} }
mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1); mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendCreateMsgToVgroup(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", mgmtSendCreateChildTableMsg(pTable, pVgroup, tags.size, tags.data);
pTable->tableId, vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%"
PRIu64
" db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0; return 0;
} }
......
...@@ -31,6 +31,11 @@ ...@@ -31,6 +31,11 @@
#include "dnodeSystem.h" #include "dnodeSystem.h"
#include "mgmtChildTable.h"
#include "mgmtNormalTable.h"
#include "mgmtStreamTable.h"
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj); void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj);
int mgmtSendVPeersMsg(SVgObj *pVgroup); int mgmtSendVPeersMsg(SVgObj *pVgroup);
char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode); char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode);
...@@ -229,30 +234,75 @@ char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode) { ...@@ -229,30 +234,75 @@ char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode) {
return pMsg; return pMsg;
} }
int mgmtSendCreateMsgToVgroup(STabObj table, SVgObj *pVgroup) { int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData) {
char * pMsg, *pStart; uint64_t timeStamp = taosGetTimestampMs();
int i, msgLen = 0;
SDnodeObj *pObj;
uint64_t timeStamp;
timeStamp = taosGetTimestampMs(); for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) {
SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip);
if (pObj == NULL) {
continue;
}
for (i = 0; i < pVgroup->numOfVnodes; ++i) { int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000);
//if (pVgroup->vnodeGid[i].ip == 0) continue; if (pStart == NULL) {
continue;
}
pObj = mgmtGetDnode(pVgroup->vnodeGid[i].ip); int8_t *pMsg = mgmtBuildCreateChildTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode, tagDataLen, pTagData);
if (pObj == NULL) continue; int32_t msgLen = pMsg - pStart;
pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000); taosSendMsgToDnode(pObj, pStart, msgLen);
if (pStart == NULL) continue; }
pMsg = mgmtBuildCreateMeterIe(pTable, pStart, pVgroup->vnodeGid[i].vnode);
msgLen = pMsg - pStart; pVgroup->lastCreate = timeStamp;
return 0;
}
int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
uint64_t timeStamp = taosGetTimestampMs();
for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) {
SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip);
if (pObj == NULL) {
continue;
}
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000);
if (pStart == NULL) {
continue;
}
int8_t *pMsg = mgmtBuildCreateStreamTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen); taosSendMsgToDnode(pObj, pStart, msgLen);
} }
pVgroup->lastCreate = timeStamp; pVgroup->lastCreate = timeStamp;
return 0;
}
int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
uint64_t timeStamp = taosGetTimestampMs();
for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) {
SDnodeObj *pObj = mgmtGetDnode(pVgroup->vnodeGid[index].ip);
if (pObj == NULL) {
continue;
}
int8_t *pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000);
if (pStart == NULL) {
continue;
}
int8_t *pMsg = mgmtBuildCreateNormalTableMsg(pTable, pStart, pVgroup->vnodeGid[index].vnode);
int32_t msgLen = pMsg - pStart;
taosSendMsgToDnode(pObj, pStart, msgLen);
}
pVgroup->lastCreate = timeStamp;
return 0; return 0;
} }
......
...@@ -47,29 +47,55 @@ void mgmtCleanUpNormalTables() { ...@@ -47,29 +47,55 @@ void mgmtCleanUpNormalTables() {
sdbCloseTable(tsNormalTableSdb); sdbCloseTable(tsNormalTableSdb);
} }
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int8_t *pMsg, int32_t vnode) {
SCreateNormalTableMsg *pCreateTable = (SCreateNormalTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htobe32(vnode);
pCreateTable->sid = htobe32(pTable->sid);
pCreateTable->uid = htobe64(pTable->uid);
pCreateTable->createdTime = htobe64(pTable->createdTime);
pCreateTable->sversion = htobe32(pTable->sversion);
pCreateTable->numOfColumns = htobe16(pTable->numOfColumns);
SSchema *pSchema = pTable->schema;
int32_t totalCols = pCreateTable->numOfColumns;
for (int32_t col = 0; col < totalCols; ++col) {
SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
colData->type = pSchema[col].type;
colData->bytes = htons(pSchema[col].bytes);
colData->colId = htons(pSchema[col].colId);
}
int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
pMsg = pCreateTable->data + totalColsSize;
return pMsg;
}
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsChildTableSdb); int numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) { if (numOfTables >= TSDB_MAX_TABLES) {
mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES; return TSDB_CODE_TOO_MANY_TABLES;
} }
SNormalTableObj *pTable = (SNormalTableObj *)calloc(sizeof(SNormalTableObj), 1); SNormalTableObj *pTable = (SNormalTableObj *) calloc(sizeof(SNormalTableObj), 1);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
strcpy(pTable->tableId, pCreate->meterId); strcpy(pTable->tableId, pCreate->meterId);
pTable->createdTime = taosGetTimestampMs(); pTable->createdTime = taosGetTimestampMs();
pTable->vgId = vgId; pTable->vgId = pVgroup->vgId;
pTable->sid = sid; pTable->sid = sid;
pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sversion = 0; pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns; pTable->numOfColumns = pCreate->numOfColumns;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pTable->schemaSize = numOfCols * sizeof(SSchema); pTable->schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (int8_t *)calloc(1, pTable->schemaSize); pTable->schema = (int8_t *) calloc(1, pTable->schemaSize);
if (pTable->schema == NULL) { if (pTable->schema == NULL) {
free(pTable); free(pTable);
mError("table:%s, no schema input", pCreate->meterId); mError("table:%s, no schema input", pCreate->meterId);
...@@ -79,7 +105,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI ...@@ -79,7 +105,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI
pTable->nextColId = 0; pTable->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) { for (int col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *)pTable->schema; SSchema *tschema = (SSchema *) pTable->schema;
tschema[col].colId = pTable->nextColId++; tschema[col].colId = pTable->nextColId++;
} }
...@@ -88,14 +114,14 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI ...@@ -88,14 +114,14 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", mgmtAddTimeSeries(pTable->numOfColumns - 1);
// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode);
// mgmtSendCreateNormalTableMsg(pTable, pVgroup);
// mgmtAddTimeSeries(pTable->numOfColumns - 1);
// mgmtSendCreateMsgToVgroup(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%"
pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); PRIu64
" db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0; return 0;
} }
......
...@@ -46,29 +46,59 @@ int32_t mgmtInitStreamTables() { ...@@ -46,29 +46,59 @@ int32_t mgmtInitStreamTables() {
void mgmtCleanUpStreamTables() { void mgmtCleanUpStreamTables() {
} }
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode) {
SCreateStreamTableMsg *pCreateTable = (SCreateStreamTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htonl(vnode);
pCreateTable->sid = htonl(pTable->sid);
pCreateTable->uid = pTable->uid;
pCreateTable->createdTime = htobe64(pTable->createdTime);
pCreateTable->sversion = htonl(pTable->sversion);
pCreateTable->numOfColumns = htons(pTable->numOfColumns);
pCreateTable->sqlLen = htons(pTable->sqlLen);
SSchema *pSchema = pTable->schema;
int32_t totalCols = pCreateTable->numOfColumns;
for (int32_t col = 0; col < totalCols; ++col) {
SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
colData->type = pSchema[col].type;
colData->bytes = htons(pSchema[col].bytes);
colData->colId = htons(pSchema[col].colId);
}
int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
pMsg = pCreateTable->data + totalColsSize + pTable->sqlLen;
char *sql = pTable->schema + pTable->schemaSize;
memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql);
return pMsg;
}
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsStreamTableSdb); int numOfTables = sdbGetNumOfRows(tsStreamTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) { if (numOfTables >= TSDB_MAX_TABLES) {
mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES; return TSDB_CODE_TOO_MANY_TABLES;
} }
SStreamTableObj *pTable = (SStreamTableObj *)calloc(sizeof(SStreamTableObj), 1); SStreamTableObj *pTable = (SStreamTableObj *) calloc(sizeof(SStreamTableObj), 1);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY; return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
strcpy(pTable->tableId, pCreate->meterId); strcpy(pTable->tableId, pCreate->meterId);
pTable->createdTime = taosGetTimestampMs(); pTable->createdTime = taosGetTimestampMs();
pTable->vgId = vgId; pTable->vgId = pVgroup->vgId;
pTable->sid = sid; pTable->sid = sid;
pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); pTable->uid = (((uint64_t) pTable->createdTime) << 16) + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
pTable->sversion = 0; pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns; pTable->numOfColumns = pCreate->numOfColumns;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pTable->schemaSize = numOfCols * sizeof(SSchema) + pCreate->sqlLen; pTable->schemaSize = numOfCols * sizeof(SSchema) + pCreate->sqlLen;
pTable->schema = (int8_t *)calloc(1, pTable->schemaSize); pTable->schema = (int8_t *) calloc(1, pTable->schemaSize);
if (pTable->schema == NULL) { if (pTable->schema == NULL) {
free(pTable); free(pTable);
mError("table:%s, no schema input", pCreate->meterId); mError("table:%s, no schema input", pCreate->meterId);
...@@ -78,12 +108,12 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI ...@@ -78,12 +108,12 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI
pTable->nextColId = 0; pTable->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) { for (int col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *)pTable->schema; SSchema *tschema = (SSchema *) pTable->schema;
tschema[col].colId = pTable->nextColId++; tschema[col].colId = pTable->nextColId++;
} }
pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema); pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema);
memcpy(pTable->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); memcpy(pTable->pSql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
pTable->pSql[pCreate->sqlLen - 1] = 0; pTable->pSql[pCreate->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql); mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql);
...@@ -92,14 +122,14 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI ...@@ -92,14 +122,14 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgI
return TSDB_CODE_SDB_ERROR; return TSDB_CODE_SDB_ERROR;
} }
// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", mgmtAddTimeSeries(pTable->numOfColumns - 1);
// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode);
// mgmtSendCreateStreamTableMsg(pTable, pVgroup);
// mgmtAddTimeSeries(pTable->numOfColumns - 1);
// mgmtSendCreateMsgToVgroup(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%"
pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); PRIu64
" db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0; return 0;
} }
......
...@@ -570,11 +570,11 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { ...@@ -570,11 +570,11 @@ int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
} }
if (pCreate->numOfColumns == 0) { if (pCreate->numOfColumns == 0) {
return mgmtCreateChildTable(pDb, pCreate, pVgroup->vgId, sid); return mgmtCreateChildTable(pDb, pCreate, pVgroup, sid);
} else if (pCreate->sqlLen > 0) { } else if (pCreate->sqlLen > 0) {
return mgmtCreateStreamTable(pDb, pCreate, pVgroup->vgId, sid); return mgmtCreateStreamTable(pDb, pCreate, pVgroup, sid);
} else { } else {
return mgmtCreateNormalTable(pDb, pCreate, pVgroup->vgId, sid); return mgmtCreateNormalTable(pDb, pCreate, pVgroup, sid);
} }
} else { } else {
return mgmtCreateSuperTable(pDb, pCreate); return mgmtCreateSuperTable(pDb, pCreate);
...@@ -762,18 +762,16 @@ int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { ...@@ -762,18 +762,16 @@ int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
STabObj *pTable = NULL;
char * pWrite;
int32_t cols = 0;
int32_t prefixLen;
int32_t numOfRead = 0; int32_t numOfRead = 0;
char prefix[20] = {0}; int32_t cols = 0;
void *pTable = NULL;
char *pWrite = NULL;
int16_t numOfColumns; int16_t numOfColumns;
char * tableId;
char * superTableId;
int64_t createdTime; int64_t createdTime;
void * pNormalTableNode; char *tableId;
void * pChildTableNode; char *superTableId;
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
if (pConn->pDb != NULL) { if (pConn->pDb != NULL) {
...@@ -791,27 +789,34 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon ...@@ -791,27 +789,34 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon
} }
} }
char prefix[20] = {0};
strcpy(prefix, pDb->name); strcpy(prefix, pDb->name);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix); int32_t prefixLen = strlen(prefix);
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char meterName[TSDB_METER_NAME_LEN] = {0};
while (numOfRows < rows) { while (numOfRows < rows) {
pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); void *pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) { if (pTable != NULL) {
pShow->pNode = pNormalTableNode;
SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable; SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable;
pShow->pNode = pNormalTableNode;
tableId = pNormalTable->tableId; tableId = pNormalTable->tableId;
superTableId = NULL; superTableId = NULL;
createdTime = pNormalTable->createdTime; createdTime = pNormalTable->createdTime;
numOfColumns = pNormalTable->numOfColumns; numOfColumns = pNormalTable->numOfColumns;
} else { } else {
pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); void *pStreamTableNode = sdbFetchRow(tsStreamTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) {
SStreamTableObj *pChildTable = (SStreamTableObj *) pTable;
pShow->pNode = pStreamTableNode;
tableId = pChildTable->tableId;
superTableId = NULL;
createdTime = pChildTable->createdTime;
numOfColumns = pChildTable->numOfColumns;
} else {
void *pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable);
if (pTable != NULL) { if (pTable != NULL) {
pShow->pNode = pChildTableNode;
SChildTableObj *pChildTable = (SChildTableObj *) pTable; SChildTableObj *pChildTable = (SChildTableObj *) pTable;
pShow->pNode = pChildTableNode;
tableId = pChildTable->tableId; tableId = pChildTable->tableId;
superTableId = NULL; superTableId = NULL;
createdTime = pChildTable->createdTime; createdTime = pChildTable->createdTime;
...@@ -820,14 +825,16 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon ...@@ -820,14 +825,16 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon
break; break;
} }
} }
}
// not belong to current db // not belong to current db
if (strncmp(tableId, prefix, prefixLen)) { if (strncmp(tableId, prefix, prefixLen)) {
continue; continue;
} }
numOfRead++; char meterName[TSDB_METER_NAME_LEN] = {0};
memset(meterName, 0, tListLen(meterName)); memset(meterName, 0, tListLen(meterName));
numOfRead++;
// pattern compare for meter name // pattern compare for meter name
extractTableName(tableId, meterName); extractTableName(tableId, meterName);
...@@ -867,19 +874,3 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon ...@@ -867,19 +874,3 @@ int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pCon
return numOfRows; return numOfRows;
} }
SSchema *mgmtGetTableSchema(STabObj *pTable) {
if (pTable == NULL) {
return NULL;
}
if (!mgmtTableCreateFromSuperTable(pTable)) {
return (SSchema *)pTable->schema;
}
STabObj *pMetric = mgmtGetTable(pTable->pTagData);
assert(pMetric != NULL);
return (SSchema *)pMetric->schema;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册