提交 b973c6e3 编写于 作者: S slguan

for table sdb

上级 06db5c54
......@@ -181,11 +181,10 @@ typedef struct {
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t schemaSize;
int8_t reserved[3];
int8_t updateEnd[1];
int16_t nextColId;
char* schema;
SSchema* schema;
} SNormalTableObj;
typedef struct {
......@@ -197,13 +196,12 @@ typedef struct {
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t schemaSize;
int16_t sqlLen;
int8_t reserved[3];
int8_t updateEnd[1];
int16_t nextColId;
char* pSql; //null-terminated string
char* schema;
char* sql; //null-terminated string
SSchema* schema;
} SStreamTableObj;
typedef struct _vg_obj {
......@@ -214,7 +212,7 @@ typedef struct _vg_obj {
uint64_t lastRemove;
int32_t numOfVnodes;
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
int32_t numOfMeters;
int32_t numOfTables;
int32_t lbIp;
int32_t lbTime;
int8_t lbStatus;
......
......@@ -33,8 +33,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
SChildTableObj* mgmtGetChildTable(char *tableId);
SSchema* mgmtGetChildTableSchema(SChildTableObj *pTable);
int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen, int8_t *pTagData);
int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup);
#ifdef __cplusplus
}
......
......@@ -26,11 +26,11 @@ extern "C" {
extern void *mgmtStatusTimer;
int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData);
int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup);
int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup);
int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup);
int mgmtSendVPeersMsg(SVgObj *pVgroup);
int mgmtSendFreeVnodeMsg(SVgObj *pVgroup);
int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid);
......
......@@ -22,7 +22,6 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
int32_t mgmtInitNormalTables();
......@@ -32,9 +31,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
SNormalTableObj* mgmtGetNormalTable(char *tableId);
SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable);
int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode);
int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
#ifdef __cplusplus
}
......
......@@ -26,7 +26,6 @@ extern "C" {
int mgmtInitShell();
void mgmtCleanUpShell();
int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey);
extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType);
extern int32_t (*mgmtProcessAlterAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn);
......
......@@ -22,7 +22,6 @@ extern "C" {
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
int32_t mgmtInitStreamTables();
......@@ -31,11 +30,7 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVg
int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable);
int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter);
SStreamTableObj* mgmtGetStreamTable(char *tableId);
SSchema* mgmtGetStreamTableSchema(SStreamTableObj *pTable);
int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode);
int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup);
#ifdef __cplusplus
}
......
......@@ -38,8 +38,6 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName);
SSchema* mgmtGetSuperTableSchema(SSuperTableObj *pTable);
#ifdef __cplusplus
}
#endif
......
......@@ -37,8 +37,8 @@ int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
void mgmtCleanUpMeters();
int32_t mgmtAddMeterIntoMetric(SSuperTableObj *pMetric, SChildTableObj *pTable);
int32_t mgmtRemoveMeterFromMetric(SSuperTableObj *pMetric, SChildTableObj *pTable);
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
......
......@@ -15,15 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtUtil.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtVgroup.h"
#include "mgmtTable.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
......@@ -33,24 +24,28 @@
#include "tsqlfunction.h"
#include "ttime.h"
#include "tstatus.h"
#include "sdb.h"
#include "mgmtChildTable.h"
#include "tutil.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUtil.h"
#include "mgmtVgroup.h"
void *tsChildTableSdb;
void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize);
void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtChildTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void mgmtDestroyChildTable(SChildTableObj *pTable) {
free(pTable);
......@@ -66,17 +61,17 @@ static void mgmtChildTableActionInit() {
mgmtChildTableActionFp[SDB_TYPE_DESTROY] = mgmtChildTableActionDestroy;
}
void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize) {
void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
return NULL;
}
void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize) {
void *mgmtChildTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
SChildTableObj *pTable = (SChildTableObj *)row;
mgmtDestroyChildTable(pTable);
return NULL;
}
void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) {
void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
SChildTableObj *pTable = (SChildTableObj *) row;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
......@@ -97,30 +92,30 @@ void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) {
return NULL;
}
if (!sdbMaster) {
int sid = taosAllocateId(pVgroup->idPool);
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return NULL;
}
}
mgmtAddMeterIntoMetric(pTable->superTableId, pTable);
pTable->superTable = mgmtGetSuperTable(pTable->superTableId);
mgmtAddTableIntoSuperTable(pTable->superTable);
pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1);
pVgroup->numOfMeters++;
pVgroup->numOfTables++;
pDb->numOfTables++;
pVgroup->tableList[pTable->sid] = pTable;
pVgroup->tableList[pTable->sid] = (STableInfo *) pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
}
return NULL;
}
void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) {
void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SChildTableObj *pTable = (SChildTableObj *) row;
if (pTable->vgId == 0) {
return NULL;
......@@ -146,43 +141,43 @@ void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1);
pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfMeters--;
pVgroup->numOfTables--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
mgmtRemoveMeterFromMetric(pTable->superTable, pTable);
mgmtRemoveTableFromSuperTable(pTable->superTable);
if (pVgroup->numOfMeters > 0) {
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
}
return NULL;
}
void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize) {
void *mgmtChildTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
return mgmtChildTableActionReset(row, str, size, NULL);
}
void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize) {
void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SChildTableObj *pTable = (SChildTableObj *) row;
assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(str, pTable, tsize);
return NULL;
}
void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize) {
void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
assert(str != NULL);
SChildTableObj *pTable = (SChildTableObj *)malloc(sizeof(SChildTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
memset(pTable, 0, sizeof(SChildTableObj));
int tsize = pTable->updateEnd - (int8_t *)pTable;
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroyChildTable(pTable);
return NULL;
......@@ -192,7 +187,7 @@ void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize) {
return (void *)pTable;
}
void *mgmtChildTableAction(char action, void *row, char *str, int size, int *ssize) {
void *mgmtChildTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtChildTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtChildTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
......@@ -200,15 +195,46 @@ void *mgmtChildTableAction(char action, void *row, char *str, int size, int *ssi
}
int32_t mgmtInitChildTables() {
void *pNode = NULL;
void *pLastNode = NULL;
SChildTableObj *pTable = NULL;
mgmtChildTableActionInit();
tsChildTableSdb = sdbOpenTable(tsMaxTables, sizeof(SChildTableObj),
"ctables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtChildTableAction);
if (tsChildTableSdb == NULL) {
mError("failed to init child table data");
return -1;
}
pNode = NULL;
while (1) {
pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("super table:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsChildTableSdb, pTable);
pNode = pLastNode;
continue;
}
}
mgmtSetVgroupIdPool();
mTrace("child table is initialized");
return 0;
}
void mgmtCleanUpChildTables() {
}
int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen,
int8_t *pTagData) {
SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg;
int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) {
// SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg;
// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
// memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN);
// pCreateTable->vnode = htonl(vnode);
......@@ -235,59 +261,56 @@ int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32
// memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen);
// pCreateTable->tagDataLen = htonl(tagDataLen);
return pMsg;
return NULL;
}
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
// int numOfTables = sdbGetNumOfRows(tsChildTableSdb);
// if (numOfTables >= tsMaxTables) {
// mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables);
// return TSDB_CODE_TOO_MANY_TABLES;
// }
//
// char *pTagData = (char *) pCreate->schema; // it is a tag key
// SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
// if (pSuperTable == NULL) {
// mError("table:%s, corresponding super table does not exist", pCreate->meterId);
// return TSDB_CODE_INVALID_TABLE;
// }
//
// SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1);
// if (pTable == NULL) {
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
// }
// strcpy(pTable->tableId, pCreate->meterId);
// strcpy(pTable->superTableId, pSuperTable->tableId);
// pTable->createdTime = taosGetTimestampMs();
// pTable->superTable = pSuperTable;
// pTable->vgId = pVgroup->vgId;
// pTable->sid = sid;
// pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
// ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
//
// SVariableMsg tags = {0};
// tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN;
// tags.data = (char *) calloc(1, tags.size);
// if (tags.data == NULL) {
// free(pTable);
// mError("table:%s, corresponding super table schema is null", pCreate->meterId);
// return TSDB_CODE_INVALID_TABLE;
// }
// memcpy(tags.data, pTagData, tags.size);
//
// if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) {
// mError("table:%s, update sdb error", pCreate->meterId);
// return TSDB_CODE_SDB_ERROR;
// }
//
// mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1);
//
// mgmtSendCreateChildTableMsg(pTable, pVgroup, tags.size, tags.data);
//
// mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%"
// PRIu64
// " db:%s",
// pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= tsMaxTables) {
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables);
return TSDB_CODE_TOO_MANY_TABLES;
}
char *pTagData = (char *) pCreate->schema; // it is a tag key
SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData);
if (pSuperTable == NULL) {
mError("table:%s, corresponding super table does not exist", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1);
if (pTable == NULL) {
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
strcpy(pTable->tableId, pCreate->meterId);
strcpy(pTable->superTableId, pSuperTable->tableId);
pTable->createdTime = taosGetTimestampMs();
pTable->superTable = pSuperTable;
pTable->vgId = pVgroup->vgId;
pTable->sid = sid;
pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul));
int32_t size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN;
SSchema * schema = (SSchema *) calloc(1, size);
if (schema == NULL) {
free(pTable);
mError("table:%s, corresponding super table schema is null", pCreate->meterId);
return TSDB_CODE_INVALID_TABLE;
}
memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size);
if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId);
return TSDB_CODE_SDB_ERROR;
}
mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendCreateTableMsg(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
return 0;
}
......@@ -308,10 +331,10 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
}
mgmtRestoreTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup);
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfMeters <= 0) {
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
......@@ -323,7 +346,7 @@ SChildTableObj* mgmtGetChildTable(char *tableId) {
}
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) {
// int col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName);
// int32_t col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName);
// if (col < 0 || col > pTable->superTable->numOfTags) {
// return TSDB_CODE_APP_ERROR;
// }
......@@ -332,7 +355,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
// mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId);
// return TSDB_CODE_SUCCESS;
// int rowSize = 0;
// int32_t rowSize = 0;
// SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema));
//
// if (col == 0) {
......@@ -345,7 +368,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
// }
//
// // Encode the string
// int size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1;
// int32_t size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1;
// char *msg = (char *)malloc(size);
// if (msg == NULL) {
// mError("failed to allocate message memory while modify tag value");
......@@ -367,5 +390,6 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
//
// mTrace("Succeed to modify tag column %d of table %s", col, pTable->meterId);
// return TSDB_CODE_SUCCESS;
return 0;
}
......@@ -95,11 +95,11 @@ int32_t mgmtProcessMeterCfgMsg(int8_t *pCont, int32_t contLen, void *pConn) {
int8_t *pCreateTableMsg = NULL;
if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode);
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode);
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable);
} else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode);
pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable);
} else {}
if (pCreateTableMsg != NULL) {
......@@ -210,7 +210,7 @@ void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, vo
}
}
int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData) {
int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) {
// uint64_t timeStamp = taosGetTimestampMs();
//
// for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) {
......@@ -280,9 +280,10 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) {
//
// pVgroup->lastCreate = timeStamp;
// return 0;
return 0;
}
int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup) {
// SDRemoveTableMsg *pRemove;
// char * pMsg, *pStart;
// int i, msgLen = 0;
......
......@@ -15,15 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtUtil.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtVgroup.h"
#include "mgmtTable.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
......@@ -33,22 +24,28 @@
#include "tsqlfunction.h"
#include "ttime.h"
#include "tstatus.h"
#include "sdb.h"
#include "tutil.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtNormalTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUtil.h"
#include "mgmtVgroup.h"
void *tsNormalTableSdb;
void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize);
void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void mgmtDestroyNormalTable(SNormalTableObj *pTable) {
free(pTable->schema);
......@@ -65,20 +62,20 @@ static void mgmtNormalTableActionInit() {
mgmtNormalTableActionFp[SDB_TYPE_DESTROY] = mgmtNormalTableActionDestroy;
}
void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
int tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
return NULL;
}
void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *)row;
mgmtDestroyNormalTable(pTable);
return NULL;
}
void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
......@@ -100,7 +97,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) {
}
if (!sdbMaster) {
int sid = taosAllocateId(pVgroup->idPool);
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return NULL;
......@@ -108,18 +105,18 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) {
}
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfMeters++;
pVgroup->numOfTables++;
pDb->numOfTables++;
pVgroup->tableList[pTable->sid] = pTable;
pVgroup->tableList[pTable->sid] = (STableInfo *) pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
}
return NULL;
}
void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
if (pTable->vgId == 0) {
return NULL;
......@@ -145,65 +142,67 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfMeters--;
pVgroup->numOfTables--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
if (pVgroup->numOfMeters > 0) {
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
}
return NULL;
}
void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
return mgmtNormalTableActionReset(row, str, size, NULL);
}
void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable;
if (size < tsize + pTable->schemaSize + 1) {
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
if (size < tsize + schemaSize + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, pTable->schemaSize);
*ssize = tsize + pTable->schemaSize;
memcpy(str + tsize, pTable->schema, schemaSize);
*ssize = tsize + schemaSize;
return NULL;
}
void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
assert(str != NULL);
SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj));
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
memset(pTable, 0, sizeof(SNormalTableObj));
int tsize = pTable->updateEnd - (int8_t *)pTable;
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroyNormalTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
pTable->schema = (char *)malloc(pTable->schemaSize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *)malloc(schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyNormalTable(pTable);
return NULL;
}
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, schemaSize);
return (void *)pTable;
}
void *mgmtNormalTableAction(char action, void *row, char *str, int size, int *ssize) {
void *mgmtNormalTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtNormalTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtNormalTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
......@@ -211,6 +210,38 @@ void *mgmtNormalTableAction(char action, void *row, char *str, int size, int *ss
}
int32_t mgmtInitNormalTables() {
void *pNode = NULL;
void *pLastNode = NULL;
SChildTableObj *pTable = NULL;
mgmtNormalTableActionInit();
tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"ntables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtNormalTableAction);
if (tsNormalTableSdb == NULL) {
mError("failed to init normal table data");
return -1;
}
pNode = NULL;
while (1) {
pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("normal table:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsNormalTableSdb, pTable);
pNode = pLastNode;
continue;
}
}
mgmtSetVgroupIdPool();
mTrace("normal table is initialized");
return 0;
}
......@@ -218,19 +249,19 @@ void mgmtCleanUpNormalTables() {
sdbCloseTable(tsNormalTableSdb);
}
int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode) {
int8_t *pMsg = NULL;
SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htobe32(vnode);
pCreateTable->sid = htobe32(pTable->sid);
pCreateTable->uid = htobe64(pTable->uid);
pCreateTable->createdTime = htobe64(pTable->createdTime);
pCreateTable->sversion = htobe32(pTable->sversion);
pCreateTable->numOfColumns = htobe16(pTable->numOfColumns);
SSchema *pSchema = pTable->schema;
int32_t totalCols = pCreateTable->numOfColumns;
int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) {
// int8_t *pMsg = NULL;
// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
// pCreateTable->vnode = htobe32(vnode);
// pCreateTable->sid = htobe32(pTable->sid);
// pCreateTable->uid = htobe64(pTable->uid);
// pCreateTable->createdTime = htobe64(pTable->createdTime);
// pCreateTable->sversion = htobe32(pTable->sversion);
// pCreateTable->numOfColumns = htobe16(pTable->numOfColumns);
//
// SSchema *pSchema = pTable->schema;
// int32_t totalCols = pCreateTable->numOfColumns;
// for (int32_t col = 0; col < totalCols; ++col) {
// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
......@@ -242,11 +273,12 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode) {
// int32_t totalColsSize = sizeof(SMColumn *) * totalCols;
// pMsg = pCreateTable->data + totalColsSize;
return pMsg;
// return pMsg;
return NULL;
}
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsChildTableSdb);
int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
......@@ -265,9 +297,9 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pTable->schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (int8_t *) calloc(1, pTable->schemaSize);
int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
int32_t schemaSize = numOfCols * sizeof(SSchema);
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
mError("table:%s, no schema input", pCreate->meterId);
......@@ -276,7 +308,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pTable->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) {
for (int32_t col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *) pTable->schema;
tschema[col].colId = pTable->nextColId++;
}
......@@ -315,11 +347,11 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
mgmtRestoreTimeSeries(pTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup);
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfMeters <= 0) {
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
......@@ -341,12 +373,12 @@ static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *col
return -1;
}
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int ncols) {
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols) {
if (ncols <= 0) {
return TSDB_CODE_APP_ERROR;
}
for (int i = 0; i < ncols; i++) {
for (int32_t i = 0; i < ncols; i++) {
if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) {
return TSDB_CODE_APP_ERROR;
}
......@@ -364,16 +396,16 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int
return TSDB_CODE_APP_ERROR;
}
pTable->schema = realloc(pTable->schema, pTable->schemaSize + sizeof(SSchema) * ncols);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols);
memcpy(pTable->schema + pTable->schemaSize, schema, sizeof(SSchema) * ncols);
memcpy(pTable->schema + schemaSize, schema, sizeof(SSchema) * ncols);
SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns);
for (int i = 0; i < ncols; i++) {
for (int32_t i = 0; i < ncols; i++) {
tschema[i].colId = pTable->nextColId++;
}
pTable->schemaSize += sizeof(SSchema) * ncols;
pTable->numOfColumns += ncols;
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries += ncols;
......@@ -403,10 +435,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName)
memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1),
sizeof(SSchema) * (pTable->numOfColumns - col - 1));
pTable->schemaSize -= sizeof(SSchema);
pTable->numOfColumns--;
pTable->schema = realloc(pTable->schema, pTable->schemaSize);
pTable->sversion++;
pAcct->acctInfo.numOfTimeSeries--;
......
......@@ -45,10 +45,10 @@ static GetMateFp* mgmtGetMetaFp;
static RetrieveMetaFp* mgmtRetrieveFp;
static void mgmtInitShowMsgFp();
void * tsShellConn = NULL;
SConnObj *connList;
void * mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle);
void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
int (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int, SConnObj *);
void mgmtInitProcessShellMsg();
int mgmtRedirectMsg(SConnObj *pConn, int msgType);
......@@ -164,20 +164,21 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe
* @return
*/
bool mgmtCheckMeterMetaMsgType(char *pMsg) {
SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
int16_t autoCreate = htons(pInfo->createFlag);
STableInfo *table = mgmtGetTable(pInfo->meterId);
// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
//
// int16_t autoCreate = htons(pInfo->createFlag);
// STableInfo *table = mgmtGetTable(pInfo->meterId);
// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue
// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue
// bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1);
// if (addIntoTranQueue) {
// mTrace("meter:%s auto created task added", pInfo->meterId);
// }
bool addIntoTranQueue = true;
// bool addIntoTranQueue = true;
return addIntoTranQueue;
// return addIntoTranQueue;
return 0;
}
int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
......@@ -1296,7 +1297,7 @@ void mgmtEstablishConn(SConnObj *pConn) {
// mgmtAddConnIntoAcct(pConn);
}
int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey) {
int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SUserObj *pUser = NULL;
*spi = 0;
......@@ -1421,7 +1422,7 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0;
}
void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
// SIntMsg * pMsg = (SIntMsg *)msg;
// SConnObj *pConn = (SConnObj *)ahandle;
//
......@@ -1506,7 +1507,6 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
// }
//
// return pConn;
return NULL;
}
void mgmtInitProcessShellMsg() {
......
......@@ -15,15 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtGrant.h"
#include "mgmtUtil.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtVgroup.h"
#include "mgmtTable.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
......@@ -33,26 +24,32 @@
#include "tsqlfunction.h"
#include "ttime.h"
#include "tstatus.h"
#include "sdb.h"
#include "tutil.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtDnodeInt.h"
#include "mgmtGrant.h"
#include "mgmtStreamTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUtil.h"
#include "mgmtVgroup.h"
void *tsStreamTableSdb;
void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize);
void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize);
void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize);
static void mgmtDestroyStreamTable(SStreamTableObj *pTable) {
free(pTable->schema);
free(pTable->pSql);
free(pTable->sql);
free(pTable);
}
......@@ -66,24 +63,27 @@ static void mgmtStreamTableActionInit() {
mgmtStreamTableActionFp[SDB_TYPE_DESTROY] = mgmtStreamTableActionDestroy;
}
void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *) row;
int tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
pTable->schema = (char *) realloc(pTable->schema, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
pTable->pSql = (char *) realloc(pTable->pSql, pTable->sqlLen);
memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *) realloc(pTable->schema, schemaSize);
memcpy(pTable->schema, str + tsize, schemaSize);
pTable->sql = (char *) realloc(pTable->sql, pTable->sqlLen);
memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen);
return NULL;
}
void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *)row;
mgmtDestroyStreamTable(pTable);
return NULL;
}
void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
......@@ -105,7 +105,7 @@ void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) {
}
if (!sdbMaster) {
int sid = taosAllocateId(pVgroup->idPool);
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid != pTable->sid) {
mError("sid:%d is not matched from the master:%d", sid, pTable->sid);
return NULL;
......@@ -113,18 +113,18 @@ void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) {
}
pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1);
pVgroup->numOfMeters++;
pVgroup->numOfTables++;
pDb->numOfTables++;
pVgroup->tableList[pTable->sid] = pTable;
pVgroup->tableList[pTable->sid] = (STableInfo *) pTable;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) {
mgmtMoveVgroupToTail(pDb, pVgroup);
}
return NULL;
}
void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SNormalTableObj *pTable = (SNormalTableObj *) row;
if (pTable->vgId == 0) {
return NULL;
......@@ -150,40 +150,41 @@ void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) {
pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1);
pVgroup->tableList[pTable->sid] = NULL;
pVgroup->numOfMeters--;
pVgroup->numOfTables--;
pDb->numOfTables--;
taosFreeId(pVgroup->idPool, pTable->sid);
if (pVgroup->numOfMeters > 0) {
if (pVgroup->numOfTables > 0) {
mgmtMoveVgroupToHead(pDb, pVgroup);
}
return NULL;
}
void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
return mgmtStreamTableActionReset(row, str, size, NULL);
}
void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SStreamTableObj *pTable = (SStreamTableObj *) row;
assert(row != NULL && str != NULL);
int tsize = pTable->updateEnd - (int8_t *) pTable;
if (size < tsize + pTable->schemaSize + pTable->sqlLen + 1) {
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
if (size < tsize + schemaSize + pTable->sqlLen + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, pTable->schemaSize);
memcpy(str + tsize + pTable->schemaSize, pTable->pSql, pTable->sqlLen);
*ssize = tsize + pTable->schemaSize + pTable->sqlLen;
memcpy(str + tsize, pTable->schema, schemaSize);
memcpy(str + tsize + schemaSize, pTable->sql, pTable->sqlLen);
*ssize = tsize + schemaSize + pTable->sqlLen;
return NULL;
}
void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
assert(str != NULL);
SStreamTableObj *pTable = (SStreamTableObj *)malloc(sizeof(SNormalTableObj));
......@@ -192,30 +193,31 @@ void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize) {
}
memset(pTable, 0, sizeof(STabObj));
int tsize = pTable->updateEnd - (int8_t *)pTable;
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
pTable->schema = (char *)malloc(pTable->schemaSize);
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *)malloc(schemaSize);
if (pTable->schema == NULL) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable->schema, str + tsize, pTable->schemaSize);
memcpy(pTable->schema, str + tsize, schemaSize);
pTable->pSql = (char *)malloc(pTable->sqlLen);
if (pTable->pSql == NULL) {
pTable->sql = (char *)malloc(pTable->sqlLen);
if (pTable->sql == NULL) {
mgmtDestroyStreamTable(pTable);
return NULL;
}
memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen);
memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen);
return (void *)pTable;
}
void *mgmtStreamTableAction(char action, void *row, char *str, int size, int *ssize) {
void *mgmtStreamTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) {
if (mgmtStreamTableActionFp[(uint8_t)action] != NULL) {
return (*(mgmtStreamTableActionFp[(uint8_t)action]))(row, str, size, ssize);
}
......@@ -223,25 +225,57 @@ void *mgmtStreamTableAction(char action, void *row, char *str, int size, int *ss
}
int32_t mgmtInitStreamTables() {
void *pNode = NULL;
void *pLastNode = NULL;
SChildTableObj *pTable = NULL;
mgmtStreamTableActionInit();
tsStreamTableSdb = sdbOpenTable(tsMaxTables, sizeof(SStreamTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"streams", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtStreamTableAction);
if (tsStreamTableSdb == NULL) {
mError("failed to init stream table data");
return -1;
}
pNode = NULL;
while (1) {
pNode = sdbFetchRow(tsStreamTableSdb, pNode, (void **)&pTable);
if (pTable == NULL) {
break;
}
SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId);
if (pDb == NULL) {
mError("stream table:%s, failed to get db, discard it", pTable->tableId);
sdbDeleteRow(tsStreamTableSdb, pTable);
pNode = pLastNode;
continue;
}
}
mgmtSetVgroupIdPool();
mTrace("stream table is initialized");
return 0;
}
void mgmtCleanUpStreamTables() {
}
int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode) {
SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
pCreateTable->vnode = htonl(vnode);
pCreateTable->sid = htonl(pTable->sid);
pCreateTable->uid = pTable->uid;
pCreateTable->createdTime = htobe64(pTable->createdTime);
pCreateTable->sversion = htonl(pTable->sversion);
pCreateTable->numOfColumns = htons(pTable->numOfColumns);
//pCreateTable->sqlLen = htons(pTable->sqlLen);
SSchema *pSchema = pTable->schema;
int32_t totalCols = pCreateTable->numOfColumns;
int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) {
// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg;
// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN);
// pCreateTable->vnode = htonl(vnode);
// pCreateTable->sid = htonl(pTable->sid);
// pCreateTable->uid = pTable->uid;
// pCreateTable->createdTime = htobe64(pTable->createdTime);
// pCreateTable->sversion = htonl(pTable->sversion);
// pCreateTable->numOfColumns = htons(pTable->numOfColumns);
// //pCreateTable->sqlLen = htons(pTable->sqlLen);
//
// SSchema *pSchema = pTable->schema;
// int32_t totalCols = pCreateTable->numOfColumns;
// for (int32_t col = 0; col < totalCols; ++col) {
// SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col];
......@@ -256,11 +290,12 @@ int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int
// char *sql = pTable->schema + pTable->schemaSize;
// memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql);
return pMsg;
// return pMsg;
return NULL;
}
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) {
int numOfTables = sdbGetNumOfRows(tsStreamTableSdb);
int32_t numOfTables = sdbGetNumOfRows(tsStreamTableSdb);
if (numOfTables >= TSDB_MAX_TABLES) {
mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES);
return TSDB_CODE_TOO_MANY_TABLES;
......@@ -279,9 +314,9 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
pTable->sversion = 0;
pTable->numOfColumns = pCreate->numOfColumns;
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
pTable->schemaSize = numOfCols * sizeof(SSchema) + pCreate->sqlLen;
pTable->schema = (int8_t *) calloc(1, pTable->schemaSize);
int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema);
pTable->schema = (SSchema *) calloc(1, schemaSize);
if (pTable->schema == NULL) {
free(pTable);
mError("table:%s, no schema input", pCreate->meterId);
......@@ -290,15 +325,15 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
pTable->nextColId = 0;
for (int col = 0; col < pCreate->numOfColumns; col++) {
for (int32_t col = 0; col < pCreate->numOfColumns; col++) {
SSchema *tschema = (SSchema *) pTable->schema;
tschema[col].colId = pTable->nextColId++;
}
pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema);
memcpy(pTable->pSql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
pTable->pSql[pCreate->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql);
pTable->sql = (char*)(pTable->schema + numOfCols * sizeof(SSchema));
memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
pTable->sql[pCreate->sqlLen - 1] = 0;
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->sql);
if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) {
mError("table:%s, update sdb error", pCreate->meterId);
......@@ -334,11 +369,11 @@ int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) {
mgmtRestoreTimeSeries(pTable->numOfColumns - 1);
mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup);
mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup);
sdbDeleteRow(tsChildTableSdb, pTable);
if (pVgroup->numOfMeters <= 0) {
if (pVgroup->numOfTables <= 0) {
mgmtDropVgroup(pDb, pVgroup);
}
......
......@@ -152,8 +152,8 @@ int32_t mgmtInitSuperTables() {
mgmtSuperTableActionInit();
tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN,
"stable", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction);
tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction);
if (tsSuperTableSdb == NULL) {
mError("failed to init super table data");
return -1;
......@@ -585,14 +585,10 @@ int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConn
return numOfRows;
}
int32_t mgmtAddMeterIntoMetric(SSuperTableObj *pStable, SChildTableObj *pTable) {
if (pTable != NULL && pStable != NULL) return -1;
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) {
pStable->numOfTables++;
return 0;
}
int32_t mgmtRemoveMeterFromMetric(SSuperTableObj *pStable, SChildTableObj *pTable) {
if (pTable != NULL && pStable != NULL) return -1;
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable) {
pStable->numOfTables--;
return 0;
}
\ No newline at end of file
......@@ -209,7 +209,7 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
STableInfo *pTable;
if (pVgroup->numOfMeters > 0) {
if (pVgroup->numOfTables > 0) {
for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
if (pVgroup->tableList != NULL) {
pTable = pVgroup->tableList[i];
......@@ -235,11 +235,11 @@ void mgmtSetVgroupIdPool() {
if (pVgroup == NULL || pVgroup->idPool == 0) break;
taosIdPoolSetFreeList(pVgroup->idPool);
pVgroup->numOfMeters = taosIdPoolNumOfUsed(pVgroup->idPool);
pVgroup->numOfTables = taosIdPoolNumOfUsed(pVgroup->idPool);
pDb = mgmtGetDb(pVgroup->dbName);
pDb->numOfTables += pVgroup->numOfMeters;
if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1)
pDb->numOfTables += pVgroup->numOfTables;
if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1)
mgmtAddVgroupIntoDbTail(pDb, pVgroup);
else
mgmtAddVgroupIntoDb(pDb, pVgroup);
......@@ -374,7 +374,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pVgroup->numOfMeters;
*(int32_t *)pWrite = pVgroup->numOfTables;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -422,7 +422,7 @@ void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize)
int32_t tsize = sizeof(STableInfo *) * pDb->cfg.maxSessions;
pVgroup->tableList = (STableInfo **)malloc(tsize);
memset(pVgroup->tableList, 0, tsize);
pVgroup->numOfMeters = 0;
pVgroup->numOfTables = 0;
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions);
mgmtAddVgroupIntoDb(pDb, pVgroup);
mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册