From b973c6e30c47d211d109a72a8d35c99b30c56539 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 20 Feb 2020 23:56:33 +0800 Subject: [PATCH] for table sdb --- src/inc/mnode.h | 18 ++- src/mnode/inc/mgmtChildTable.h | 3 +- src/mnode/inc/mgmtDnodeInt.h | 4 +- src/mnode/inc/mgmtNormalTable.h | 7 +- src/mnode/inc/mgmtShell.h | 1 - src/mnode/inc/mgmtStreamTable.h | 7 +- src/mnode/inc/mgmtSuperTable.h | 2 - src/mnode/inc/mgmtTable.h | 4 +- src/mnode/src/mgmtChildTable.c | 226 ++++++++++++++++++-------------- src/mnode/src/mgmtDnodeInt.c | 11 +- src/mnode/src/mgmtNormalTable.c | 177 ++++++++++++++----------- src/mnode/src/mgmtShell.c | 24 ++-- src/mnode/src/mgmtStreamTable.c | 189 +++++++++++++++----------- src/mnode/src/mgmtSuperTable.c | 12 +- src/mnode/src/mgmtVgroup.c | 12 +- 15 files changed, 384 insertions(+), 313 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index ccd40d7b40..794e41cc6c 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -181,11 +181,10 @@ typedef struct { int64_t createdTime; int32_t sversion; int32_t numOfColumns; - int32_t schemaSize; - int8_t reserved[3]; - int8_t updateEnd[1]; + int8_t reserved[3]; + int8_t updateEnd[1]; int16_t nextColId; - char* schema; + SSchema* schema; } SNormalTableObj; typedef struct { @@ -197,13 +196,12 @@ typedef struct { int64_t createdTime; int32_t sversion; int32_t numOfColumns; - int32_t schemaSize; int16_t sqlLen; - int8_t reserved[3]; - int8_t updateEnd[1]; + int8_t reserved[3]; + int8_t updateEnd[1]; int16_t nextColId; - char* pSql; //null-terminated string - char* schema; + char* sql; //null-terminated string + SSchema* schema; } SStreamTableObj; typedef struct _vg_obj { @@ -214,7 +212,7 @@ typedef struct _vg_obj { uint64_t lastRemove; int32_t numOfVnodes; SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT]; - int32_t numOfMeters; + int32_t numOfTables; int32_t lbIp; int32_t lbTime; int8_t lbStatus; diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 397f8ac72f..0fa6355898 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -33,8 +33,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); SChildTableObj* mgmtGetChildTable(char *tableId); -SSchema* mgmtGetChildTableSchema(SChildTableObj *pTable); -int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen, int8_t *pTagData); +int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnodeInt.h b/src/mnode/inc/mgmtDnodeInt.h index c21081950b..20cb7e5e81 100644 --- a/src/mnode/inc/mgmtDnodeInt.h +++ b/src/mnode/inc/mgmtDnodeInt.h @@ -26,11 +26,11 @@ extern "C" { extern void *mgmtStatusTimer; -int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData); +int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup); int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup); int32_t mgmtSendCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup); -int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup); +int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup); int mgmtSendVPeersMsg(SVgObj *pVgroup); int mgmtSendFreeVnodeMsg(SVgObj *pVgroup); int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index f6238a1b5b..b45b6bbd4e 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -22,19 +22,16 @@ extern "C" { #include #include - #include "mnode.h" int32_t mgmtInitNormalTables(); void mgmtCleanUpNormalTables(); -int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); SNormalTableObj* mgmtGetNormalTable(char *tableId); -SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable); - -int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode); +int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index 22650adc3d..e1181ff0f3 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -26,7 +26,6 @@ extern "C" { int mgmtInitShell(); void mgmtCleanUpShell(); -int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey); extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType); extern int32_t (*mgmtProcessAlterAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); diff --git a/src/mnode/inc/mgmtStreamTable.h b/src/mnode/inc/mgmtStreamTable.h index e9f636482c..e980f76742 100644 --- a/src/mnode/inc/mgmtStreamTable.h +++ b/src/mnode/inc/mgmtStreamTable.h @@ -22,7 +22,6 @@ extern "C" { #include #include - #include "mnode.h" int32_t mgmtInitStreamTables(); @@ -31,11 +30,7 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVg int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable); int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter); SStreamTableObj* mgmtGetStreamTable(char *tableId); -SSchema* mgmtGetStreamTableSchema(SStreamTableObj *pTable); - -int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode); - - +int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 22c80e3eca..c631d1df61 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -38,8 +38,6 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char * int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols); int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName); -SSchema* mgmtGetSuperTableSchema(SSuperTableObj *pTable); - #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 406caad856..0b5b7636fd 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -37,8 +37,8 @@ int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); void mgmtCleanUpMeters(); -int32_t mgmtAddMeterIntoMetric(SSuperTableObj *pMetric, SChildTableObj *pTable); -int32_t mgmtRemoveMeterFromMetric(SSuperTableObj *pMetric, SChildTableObj *pTable); +void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); +void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn); diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index c8c97f5c45..41596f3ddd 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -15,15 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mnode.h" -#include "mgmtAcct.h" -#include "mgmtGrant.h" -#include "mgmtUtil.h" -#include "mgmtDb.h" -#include "mgmtDnodeInt.h" -#include "mgmtVgroup.h" -#include "mgmtTable.h" #include "taosmsg.h" #include "tast.h" #include "textbuffer.h" @@ -33,24 +24,28 @@ #include "tsqlfunction.h" #include "ttime.h" #include "tstatus.h" - -#include "sdb.h" - -#include "mgmtChildTable.h" +#include "tutil.h" +#include "mnode.h" +#include "mgmtAcct.h" #include "mgmtChildTable.h" - - +#include "mgmtDb.h" +#include "mgmtDnodeInt.h" +#include "mgmtGrant.h" +#include "mgmtSuperTable.h" +#include "mgmtTable.h" +#include "mgmtUtil.h" +#include "mgmtVgroup.h" void *tsChildTableSdb; -void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); +void *(*mgmtChildTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize); -void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize); -void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize); -void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize); -void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize); -void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize); -void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize); +void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtChildTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtChildTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); static void mgmtDestroyChildTable(SChildTableObj *pTable) { free(pTable); @@ -66,17 +61,17 @@ static void mgmtChildTableActionInit() { mgmtChildTableActionFp[SDB_TYPE_DESTROY] = mgmtChildTableActionDestroy; } -void *mgmtChildTableActionReset(void *row, char *str, int size, int *ssize) { +void *mgmtChildTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { return NULL; } -void *mgmtChildTableActionDestroy(void *row, char *str, int size, int *ssize) { +void *mgmtChildTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { SChildTableObj *pTable = (SChildTableObj *)row; mgmtDestroyChildTable(pTable); return NULL; } -void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) { +void *mgmtChildTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { SChildTableObj *pTable = (SChildTableObj *) row; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); @@ -97,30 +92,30 @@ void *mgmtChildTableActionInsert(void *row, char *str, int size, int *ssize) { return NULL; } - if (!sdbMaster) { - int sid = taosAllocateId(pVgroup->idPool); + int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { mError("sid:%d is not matched from the master:%d", sid, pTable->sid); return NULL; } } - mgmtAddMeterIntoMetric(pTable->superTableId, pTable); + pTable->superTable = mgmtGetSuperTable(pTable->superTableId); + mgmtAddTableIntoSuperTable(pTable->superTable); pAcct->acctInfo.numOfTimeSeries += (pTable->superTable->numOfColumns - 1); - pVgroup->numOfMeters++; + pVgroup->numOfTables++; pDb->numOfTables++; - pVgroup->tableList[pTable->sid] = pTable; + pVgroup->tableList[pTable->sid] = (STableInfo *) pTable; - if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { + if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { mgmtMoveVgroupToTail(pDb, pVgroup); } return NULL; } -void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) { +void *mgmtChildTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { SChildTableObj *pTable = (SChildTableObj *) row; if (pTable->vgId == 0) { return NULL; @@ -146,43 +141,43 @@ void *mgmtChildTableActionDelete(void *row, char *str, int size, int *ssize) { pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); pVgroup->tableList[pTable->sid] = NULL; - pVgroup->numOfMeters--; + pVgroup->numOfTables--; pDb->numOfTables--; taosFreeId(pVgroup->idPool, pTable->sid); - mgmtRemoveMeterFromMetric(pTable->superTable, pTable); + mgmtRemoveTableFromSuperTable(pTable->superTable); - if (pVgroup->numOfMeters > 0) { + if (pVgroup->numOfTables > 0) { mgmtMoveVgroupToHead(pDb, pVgroup); } return NULL; } -void *mgmtChildTableActionUpdate(void *row, char *str, int size, int *ssize) { +void *mgmtChildTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return mgmtChildTableActionReset(row, str, size, NULL); } -void *mgmtChildTableActionEncode(void *row, char *str, int size, int *ssize) { +void *mgmtChildTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { SChildTableObj *pTable = (SChildTableObj *) row; assert(row != NULL && str != NULL); - int tsize = pTable->updateEnd - (int8_t *) pTable; + int32_t tsize = pTable->updateEnd - (int8_t *) pTable; memcpy(str, pTable, tsize); return NULL; } -void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize) { +void *mgmtChildTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { assert(str != NULL); SChildTableObj *pTable = (SChildTableObj *)malloc(sizeof(SChildTableObj)); if (pTable == NULL) { return NULL; } - memset(pTable, 0, sizeof(STabObj)); + memset(pTable, 0, sizeof(SChildTableObj)); - int tsize = pTable->updateEnd - (int8_t *)pTable; + int32_t tsize = pTable->updateEnd - (int8_t *)pTable; if (size < tsize) { mgmtDestroyChildTable(pTable); return NULL; @@ -192,7 +187,7 @@ void *mgmtChildTableActionDecode(void *row, char *str, int size, int *ssize) { return (void *)pTable; } -void *mgmtChildTableAction(char action, void *row, char *str, int size, int *ssize) { +void *mgmtChildTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { if (mgmtChildTableActionFp[(uint8_t)action] != NULL) { return (*(mgmtChildTableActionFp[(uint8_t)action]))(row, str, size, ssize); } @@ -200,15 +195,46 @@ void *mgmtChildTableAction(char action, void *row, char *str, int size, int *ssi } int32_t mgmtInitChildTables() { + void *pNode = NULL; + void *pLastNode = NULL; + SChildTableObj *pTable = NULL; + + mgmtChildTableActionInit(); + + tsChildTableSdb = sdbOpenTable(tsMaxTables, sizeof(SChildTableObj), + "ctables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtChildTableAction); + if (tsChildTableSdb == NULL) { + mError("failed to init child table data"); + return -1; + } + + pNode = NULL; + while (1) { + pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); + if (pDb == NULL) { + mError("super table:%s, failed to get db, discard it", pTable->tableId); + sdbDeleteRow(tsChildTableSdb, pTable); + pNode = pLastNode; + continue; + } + } + + mgmtSetVgroupIdPool(); + + mTrace("child table is initialized"); return 0; } void mgmtCleanUpChildTables() { } -int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32_t vnode, int32_t tagDataLen, - int8_t *pTagData) { - SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg; +int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) { +// SCreateTableMsg *pCreateTable = (SCreateTableMsg *) pMsg; // memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); // memcpy(pCreateTable->superTableId, pTable->superTable->tableId, TSDB_TABLE_ID_LEN); // pCreateTable->vnode = htonl(vnode); @@ -235,59 +261,56 @@ int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, int8_t *pMsg, int32 // memcpy(pCreateTable->data + totalColsSize, pTagData, tagDataLen); // pCreateTable->tagDataLen = htonl(tagDataLen); - return pMsg; + return NULL; } int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { -// int numOfTables = sdbGetNumOfRows(tsChildTableSdb); -// if (numOfTables >= tsMaxTables) { -// mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); -// return TSDB_CODE_TOO_MANY_TABLES; -// } -// -// char *pTagData = (char *) pCreate->schema; // it is a tag key -// SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); -// if (pSuperTable == NULL) { -// mError("table:%s, corresponding super table does not exist", pCreate->meterId); -// return TSDB_CODE_INVALID_TABLE; -// } -// -// SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); -// if (pTable == NULL) { -// return TSDB_CODE_SERV_OUT_OF_MEMORY; -// } -// strcpy(pTable->tableId, pCreate->meterId); -// strcpy(pTable->superTableId, pSuperTable->tableId); -// pTable->createdTime = taosGetTimestampMs(); -// pTable->superTable = pSuperTable; -// pTable->vgId = pVgroup->vgId; -// pTable->sid = sid; -// pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + -// ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); -// -// SVariableMsg tags = {0}; -// tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN; -// tags.data = (char *) calloc(1, tags.size); -// if (tags.data == NULL) { -// free(pTable); -// mError("table:%s, corresponding super table schema is null", pCreate->meterId); -// return TSDB_CODE_INVALID_TABLE; -// } -// memcpy(tags.data, pTagData, tags.size); -// -// if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) { -// mError("table:%s, update sdb error", pCreate->meterId); -// return TSDB_CODE_SDB_ERROR; -// } -// -// mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1); -// -// mgmtSendCreateChildTableMsg(pTable, pVgroup, tags.size, tags.data); -// -// mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" -// PRIu64 -// " db:%s", -// pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); + if (numOfTables >= tsMaxTables) { + mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); + return TSDB_CODE_TOO_MANY_TABLES; + } + + char *pTagData = (char *) pCreate->schema; // it is a tag key + SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); + if (pSuperTable == NULL) { + mError("table:%s, corresponding super table does not exist", pCreate->meterId); + return TSDB_CODE_INVALID_TABLE; + } + + SChildTableObj *pTable = (SChildTableObj *) calloc(sizeof(SChildTableObj), 1); + if (pTable == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + strcpy(pTable->tableId, pCreate->meterId); + strcpy(pTable->superTableId, pSuperTable->tableId); + pTable->createdTime = taosGetTimestampMs(); + pTable->superTable = pSuperTable; + pTable->vgId = pVgroup->vgId; + pTable->sid = sid; + pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); + + int32_t size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t) TSDB_TABLE_ID_LEN; + SSchema * schema = (SSchema *) calloc(1, size); + if (schema == NULL) { + free(pTable); + mError("table:%s, corresponding super table schema is null", pCreate->meterId); + return TSDB_CODE_INVALID_TABLE; + } + memcpy(schema, pTagData + TSDB_TABLE_ID_LEN + 1, size); + + if (sdbInsertRow(tsChildTableSdb, pTable, 0) < 0) { + mError("table:%s, update sdb error", pCreate->meterId); + return TSDB_CODE_SDB_ERROR; + } + + mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1); + + mgmtSendCreateTableMsg(pTable, pVgroup); + + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", + pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); return 0; } @@ -308,10 +331,10 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { } mgmtRestoreTimeSeries(pTable->superTable->numOfColumns - 1); - mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup); + mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsChildTableSdb, pTable); - if (pVgroup->numOfMeters <= 0) { + if (pVgroup->numOfTables <= 0) { mgmtDropVgroup(pDb, pVgroup); } @@ -323,7 +346,7 @@ SChildTableObj* mgmtGetChildTable(char *tableId) { } int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { -// int col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName); +// int32_t col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName); // if (col < 0 || col > pTable->superTable->numOfTags) { // return TSDB_CODE_APP_ERROR; // } @@ -332,7 +355,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName // mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId); // return TSDB_CODE_SUCCESS; -// int rowSize = 0; +// int32_t rowSize = 0; // SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema)); // // if (col == 0) { @@ -345,7 +368,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName // } // // // Encode the string -// int size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1; +// int32_t size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1; // char *msg = (char *)malloc(size); // if (msg == NULL) { // mError("failed to allocate message memory while modify tag value"); @@ -367,5 +390,6 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName // // mTrace("Succeed to modify tag column %d of table %s", col, pTable->meterId); // return TSDB_CODE_SUCCESS; + return 0; } diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 6ad5eba38f..51cd120c36 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -95,11 +95,11 @@ int32_t mgmtProcessMeterCfgMsg(int8_t *pCont, int32_t contLen, void *pConn) { int8_t *pCreateTableMsg = NULL; if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { - pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode); + pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable); } else if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { - pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode); + pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable); } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { - pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable, vnode); + pCreateTableMsg = mgmtBuildCreateNormalTableMsg((SNormalTableObj *)pTable); } else {} if (pCreateTableMsg != NULL) { @@ -210,7 +210,7 @@ void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, vo } } -int32_t mgmtSendCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup, int32_t tagDataLen, int8_t *pTagData) { +int32_t mgmtSendCreateTableMsg(SChildTableObj *pTable, SVgObj *pVgroup) { // uint64_t timeStamp = taosGetTimestampMs(); // // for (int32_t index = 0; index < pVgroup->numOfVnodes; ++index) { @@ -280,9 +280,10 @@ int32_t mgmtSendCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgroup) { // // pVgroup->lastCreate = timeStamp; // return 0; + return 0; } -int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) { +int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup) { // SDRemoveTableMsg *pRemove; // char * pMsg, *pStart; // int i, msgLen = 0; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 6e75bb6730..396262b527 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -15,15 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mnode.h" -#include "mgmtAcct.h" -#include "mgmtGrant.h" -#include "mgmtUtil.h" -#include "mgmtDb.h" -#include "mgmtDnodeInt.h" -#include "mgmtVgroup.h" -#include "mgmtTable.h" #include "taosmsg.h" #include "tast.h" #include "textbuffer.h" @@ -33,22 +24,28 @@ #include "tsqlfunction.h" #include "ttime.h" #include "tstatus.h" - - -#include "sdb.h" +#include "tutil.h" +#include "mnode.h" +#include "mgmtAcct.h" +#include "mgmtDb.h" +#include "mgmtDnodeInt.h" +#include "mgmtGrant.h" #include "mgmtNormalTable.h" - +#include "mgmtSuperTable.h" +#include "mgmtTable.h" +#include "mgmtUtil.h" +#include "mgmtVgroup.h" void *tsNormalTableSdb; -void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); +void *(*mgmtNormalTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize); -void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize); -void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize); -void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize); -void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize); -void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize); -void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize); +void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtNormalTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); static void mgmtDestroyNormalTable(SNormalTableObj *pTable) { free(pTable->schema); @@ -65,20 +62,20 @@ static void mgmtNormalTableActionInit() { mgmtNormalTableActionFp[SDB_TYPE_DESTROY] = mgmtNormalTableActionDestroy; } -void *mgmtNormalTableActionReset(void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *) row; - int tsize = pTable->updateEnd - (int8_t *) pTable; + int32_t tsize = pTable->updateEnd - (int8_t *) pTable; memcpy(pTable, str, tsize); return NULL; } -void *mgmtNormalTableActionDestroy(void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *)row; mgmtDestroyNormalTable(pTable); return NULL; } -void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *) row; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); @@ -100,7 +97,7 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) { } if (!sdbMaster) { - int sid = taosAllocateId(pVgroup->idPool); + int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { mError("sid:%d is not matched from the master:%d", sid, pTable->sid); return NULL; @@ -108,18 +105,18 @@ void *mgmtNormalTableActionInsert(void *row, char *str, int size, int *ssize) { } pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); - pVgroup->numOfMeters++; + pVgroup->numOfTables++; pDb->numOfTables++; - pVgroup->tableList[pTable->sid] = pTable; + pVgroup->tableList[pTable->sid] = (STableInfo *) pTable; - if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { + if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { mgmtMoveVgroupToTail(pDb, pVgroup); } return NULL; } -void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *) row; if (pTable->vgId == 0) { return NULL; @@ -145,65 +142,67 @@ void *mgmtNormalTableActionDelete(void *row, char *str, int size, int *ssize) { pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); pVgroup->tableList[pTable->sid] = NULL; - pVgroup->numOfMeters--; + pVgroup->numOfTables--; pDb->numOfTables--; taosFreeId(pVgroup->idPool, pTable->sid); - if (pVgroup->numOfMeters > 0) { + if (pVgroup->numOfTables > 0) { mgmtMoveVgroupToHead(pDb, pVgroup); } return NULL; } -void *mgmtNormalTableActionUpdate(void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return mgmtNormalTableActionReset(row, str, size, NULL); } -void *mgmtNormalTableActionEncode(void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *) row; assert(row != NULL && str != NULL); - int tsize = pTable->updateEnd - (int8_t *) pTable; - if (size < tsize + pTable->schemaSize + 1) { + int32_t tsize = pTable->updateEnd - (int8_t *) pTable; + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + if (size < tsize + schemaSize + 1) { *ssize = -1; return NULL; } memcpy(str, pTable, tsize); - memcpy(str + tsize, pTable->schema, pTable->schemaSize); - *ssize = tsize + pTable->schemaSize; + memcpy(str + tsize, pTable->schema, schemaSize); + *ssize = tsize + schemaSize; return NULL; } -void *mgmtNormalTableActionDecode(void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { assert(str != NULL); SNormalTableObj *pTable = (SNormalTableObj *)malloc(sizeof(SNormalTableObj)); if (pTable == NULL) { return NULL; } - memset(pTable, 0, sizeof(STabObj)); + memset(pTable, 0, sizeof(SNormalTableObj)); - int tsize = pTable->updateEnd - (int8_t *)pTable; + int32_t tsize = pTable->updateEnd - (int8_t *)pTable; if (size < tsize) { mgmtDestroyNormalTable(pTable); return NULL; } memcpy(pTable, str, tsize); - pTable->schema = (char *)malloc(pTable->schemaSize); + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + pTable->schema = (SSchema *)malloc(schemaSize); if (pTable->schema == NULL) { mgmtDestroyNormalTable(pTable); return NULL; } - memcpy(pTable->schema, str + tsize, pTable->schemaSize); + memcpy(pTable->schema, str + tsize, schemaSize); return (void *)pTable; } -void *mgmtNormalTableAction(char action, void *row, char *str, int size, int *ssize) { +void *mgmtNormalTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { if (mgmtNormalTableActionFp[(uint8_t)action] != NULL) { return (*(mgmtNormalTableActionFp[(uint8_t)action]))(row, str, size, ssize); } @@ -211,6 +210,38 @@ void *mgmtNormalTableAction(char action, void *row, char *str, int size, int *ss } int32_t mgmtInitNormalTables() { + void *pNode = NULL; + void *pLastNode = NULL; + SChildTableObj *pTable = NULL; + + mgmtNormalTableActionInit(); + + tsNormalTableSdb = sdbOpenTable(tsMaxTables, sizeof(SNormalTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, + "ntables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtNormalTableAction); + if (tsNormalTableSdb == NULL) { + mError("failed to init normal table data"); + return -1; + } + + pNode = NULL; + while (1) { + pNode = sdbFetchRow(tsNormalTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); + if (pDb == NULL) { + mError("normal table:%s, failed to get db, discard it", pTable->tableId); + sdbDeleteRow(tsNormalTableSdb, pTable); + pNode = pLastNode; + continue; + } + } + + mgmtSetVgroupIdPool(); + + mTrace("normal table is initialized"); return 0; } @@ -218,19 +249,19 @@ void mgmtCleanUpNormalTables() { sdbCloseTable(tsNormalTableSdb); } -int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode) { - int8_t *pMsg = NULL; - SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg; - memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); - pCreateTable->vnode = htobe32(vnode); - pCreateTable->sid = htobe32(pTable->sid); - pCreateTable->uid = htobe64(pTable->uid); - pCreateTable->createdTime = htobe64(pTable->createdTime); - pCreateTable->sversion = htobe32(pTable->sversion); - pCreateTable->numOfColumns = htobe16(pTable->numOfColumns); - - SSchema *pSchema = pTable->schema; - int32_t totalCols = pCreateTable->numOfColumns; +int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable) { +// int8_t *pMsg = NULL; +// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg; +// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); +// pCreateTable->vnode = htobe32(vnode); +// pCreateTable->sid = htobe32(pTable->sid); +// pCreateTable->uid = htobe64(pTable->uid); +// pCreateTable->createdTime = htobe64(pTable->createdTime); +// pCreateTable->sversion = htobe32(pTable->sversion); +// pCreateTable->numOfColumns = htobe16(pTable->numOfColumns); +// +// SSchema *pSchema = pTable->schema; +// int32_t totalCols = pCreateTable->numOfColumns; // for (int32_t col = 0; col < totalCols; ++col) { // SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; @@ -242,11 +273,12 @@ int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, int32_t vnode) { // int32_t totalColsSize = sizeof(SMColumn *) * totalCols; // pMsg = pCreateTable->data + totalColsSize; - return pMsg; +// return pMsg; + return NULL; } int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { - int numOfTables = sdbGetNumOfRows(tsChildTableSdb); + int32_t numOfTables = sdbGetNumOfRows(tsChildTableSdb); if (numOfTables >= TSDB_MAX_TABLES) { mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); return TSDB_CODE_TOO_MANY_TABLES; @@ -265,9 +297,9 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg pTable->sversion = 0; pTable->numOfColumns = pCreate->numOfColumns; - int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; - pTable->schemaSize = numOfCols * sizeof(SSchema); - pTable->schema = (int8_t *) calloc(1, pTable->schemaSize); + int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags; + int32_t schemaSize = numOfCols * sizeof(SSchema); + pTable->schema = (SSchema *) calloc(1, schemaSize); if (pTable->schema == NULL) { free(pTable); mError("table:%s, no schema input", pCreate->meterId); @@ -276,7 +308,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); pTable->nextColId = 0; - for (int col = 0; col < pCreate->numOfColumns; col++) { + for (int32_t col = 0; col < pCreate->numOfColumns; col++) { SSchema *tschema = (SSchema *) pTable->schema; tschema[col].colId = pTable->nextColId++; } @@ -315,11 +347,11 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { mgmtRestoreTimeSeries(pTable->numOfColumns - 1); - mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup); + mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsChildTableSdb, pTable); - if (pVgroup->numOfMeters <= 0) { + if (pVgroup->numOfTables <= 0) { mgmtDropVgroup(pDb, pVgroup); } @@ -341,12 +373,12 @@ static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *col return -1; } -int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int ncols) { +int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols) { if (ncols <= 0) { return TSDB_CODE_APP_ERROR; } - for (int i = 0; i < ncols; i++) { + for (int32_t i = 0; i < ncols; i++) { if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) { return TSDB_CODE_APP_ERROR; } @@ -364,16 +396,16 @@ int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int return TSDB_CODE_APP_ERROR; } - pTable->schema = realloc(pTable->schema, pTable->schemaSize + sizeof(SSchema) * ncols); + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + pTable->schema = realloc(pTable->schema, schemaSize + sizeof(SSchema) * ncols); - memcpy(pTable->schema + pTable->schemaSize, schema, sizeof(SSchema) * ncols); + memcpy(pTable->schema + schemaSize, schema, sizeof(SSchema) * ncols); SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns); - for (int i = 0; i < ncols; i++) { + for (int32_t i = 0; i < ncols; i++) { tschema[i].colId = pTable->nextColId++; } - pTable->schemaSize += sizeof(SSchema) * ncols; pTable->numOfColumns += ncols; pTable->sversion++; pAcct->acctInfo.numOfTimeSeries += ncols; @@ -403,10 +435,7 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1), sizeof(SSchema) * (pTable->numOfColumns - col - 1)); - - pTable->schemaSize -= sizeof(SSchema); pTable->numOfColumns--; - pTable->schema = realloc(pTable->schema, pTable->schemaSize); pTable->sversion++; pAcct->acctInfo.numOfTimeSeries--; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 7fee8d62eb..5f69bfc4aa 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -45,10 +45,10 @@ static GetMateFp* mgmtGetMetaFp; static RetrieveMetaFp* mgmtRetrieveFp; static void mgmtInitShowMsgFp(); - void * tsShellConn = NULL; SConnObj *connList; -void * mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle); +void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code); +int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int, SConnObj *); void mgmtInitProcessShellMsg(); int mgmtRedirectMsg(SConnObj *pConn, int msgType); @@ -164,20 +164,21 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe * @return */ bool mgmtCheckMeterMetaMsgType(char *pMsg) { - SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; - - int16_t autoCreate = htons(pInfo->createFlag); - STableInfo *table = mgmtGetTable(pInfo->meterId); +// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; +// +// int16_t autoCreate = htons(pInfo->createFlag); +// STableInfo *table = mgmtGetTable(pInfo->meterId); - // If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue +// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue // bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1); // if (addIntoTranQueue) { // mTrace("meter:%s auto created task added", pInfo->meterId); // } - bool addIntoTranQueue = true; +// bool addIntoTranQueue = true; - return addIntoTranQueue; +// return addIntoTranQueue; + return 0; } int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { @@ -1296,7 +1297,7 @@ void mgmtEstablishConn(SConnObj *pConn) { // mgmtAddConnIntoAcct(pConn); } -int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, uint8_t *secret, uint8_t *ckey) { +int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { SUserObj *pUser = NULL; *spi = 0; @@ -1421,7 +1422,7 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) { return 0; } -void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { +void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) { // SIntMsg * pMsg = (SIntMsg *)msg; // SConnObj *pConn = (SConnObj *)ahandle; // @@ -1506,7 +1507,6 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { // } // // return pConn; - return NULL; } void mgmtInitProcessShellMsg() { diff --git a/src/mnode/src/mgmtStreamTable.c b/src/mnode/src/mgmtStreamTable.c index e4c7a6ecfb..8786d53e7e 100644 --- a/src/mnode/src/mgmtStreamTable.c +++ b/src/mnode/src/mgmtStreamTable.c @@ -15,15 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mnode.h" -#include "mgmtAcct.h" -#include "mgmtGrant.h" -#include "mgmtUtil.h" -#include "mgmtDb.h" -#include "mgmtDnodeInt.h" -#include "mgmtVgroup.h" -#include "mgmtTable.h" #include "taosmsg.h" #include "tast.h" #include "textbuffer.h" @@ -33,26 +24,32 @@ #include "tsqlfunction.h" #include "ttime.h" #include "tstatus.h" - - -#include "sdb.h" +#include "tutil.h" +#include "mnode.h" +#include "mgmtAcct.h" +#include "mgmtDb.h" +#include "mgmtDnodeInt.h" +#include "mgmtGrant.h" #include "mgmtStreamTable.h" - +#include "mgmtSuperTable.h" +#include "mgmtTable.h" +#include "mgmtUtil.h" +#include "mgmtVgroup.h" void *tsStreamTableSdb; -void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); +void *(*mgmtStreamTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize); -void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize); -void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize); -void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize); -void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize); -void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize); -void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize); -void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize); +void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize); +void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize); static void mgmtDestroyStreamTable(SStreamTableObj *pTable) { free(pTable->schema); - free(pTable->pSql); + free(pTable->sql); free(pTable); } @@ -66,24 +63,27 @@ static void mgmtStreamTableActionInit() { mgmtStreamTableActionFp[SDB_TYPE_DESTROY] = mgmtStreamTableActionDestroy; } -void *mgmtStreamTableActionReset(void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) { SStreamTableObj *pTable = (SStreamTableObj *) row; - int tsize = pTable->updateEnd - (int8_t *) pTable; + int32_t tsize = pTable->updateEnd - (int8_t *) pTable; memcpy(pTable, str, tsize); - pTable->schema = (char *) realloc(pTable->schema, pTable->schemaSize); - memcpy(pTable->schema, str + tsize, pTable->schemaSize); - pTable->pSql = (char *) realloc(pTable->pSql, pTable->sqlLen); - memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen); + + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + pTable->schema = (SSchema *) realloc(pTable->schema, schemaSize); + memcpy(pTable->schema, str + tsize, schemaSize); + + pTable->sql = (char *) realloc(pTable->sql, pTable->sqlLen); + memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen); return NULL; } -void *mgmtStreamTableActionDestroy(void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) { SStreamTableObj *pTable = (SStreamTableObj *)row; mgmtDestroyStreamTable(pTable); return NULL; } -void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *) row; SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); @@ -105,7 +105,7 @@ void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) { } if (!sdbMaster) { - int sid = taosAllocateId(pVgroup->idPool); + int32_t sid = taosAllocateId(pVgroup->idPool); if (sid != pTable->sid) { mError("sid:%d is not matched from the master:%d", sid, pTable->sid); return NULL; @@ -113,18 +113,18 @@ void *mgmtStreamTableActionInsert(void *row, char *str, int size, int *ssize) { } pAcct->acctInfo.numOfTimeSeries += (pTable->numOfColumns - 1); - pVgroup->numOfMeters++; + pVgroup->numOfTables++; pDb->numOfTables++; - pVgroup->tableList[pTable->sid] = pTable; + pVgroup->tableList[pTable->sid] = (STableInfo *) pTable; - if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { + if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1 && pDb->numOfVgroups > 1) { mgmtMoveVgroupToTail(pDb, pVgroup); } return NULL; } -void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableActionDelete(void *row, char *str, int32_t size, int32_t *ssize) { SNormalTableObj *pTable = (SNormalTableObj *) row; if (pTable->vgId == 0) { return NULL; @@ -150,40 +150,41 @@ void *mgmtStreamTableActionDelete(void *row, char *str, int size, int *ssize) { pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); pVgroup->tableList[pTable->sid] = NULL; - pVgroup->numOfMeters--; + pVgroup->numOfTables--; pDb->numOfTables--; taosFreeId(pVgroup->idPool, pTable->sid); - if (pVgroup->numOfMeters > 0) { + if (pVgroup->numOfTables > 0) { mgmtMoveVgroupToHead(pDb, pVgroup); } return NULL; } -void *mgmtStreamTableActionUpdate(void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) { return mgmtStreamTableActionReset(row, str, size, NULL); } -void *mgmtStreamTableActionEncode(void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableActionEncode(void *row, char *str, int32_t size, int32_t *ssize) { SStreamTableObj *pTable = (SStreamTableObj *) row; assert(row != NULL && str != NULL); - int tsize = pTable->updateEnd - (int8_t *) pTable; - if (size < tsize + pTable->schemaSize + pTable->sqlLen + 1) { + int32_t tsize = pTable->updateEnd - (int8_t *) pTable; + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + if (size < tsize + schemaSize + pTable->sqlLen + 1) { *ssize = -1; return NULL; } memcpy(str, pTable, tsize); - memcpy(str + tsize, pTable->schema, pTable->schemaSize); - memcpy(str + tsize + pTable->schemaSize, pTable->pSql, pTable->sqlLen); - *ssize = tsize + pTable->schemaSize + pTable->sqlLen; + memcpy(str + tsize, pTable->schema, schemaSize); + memcpy(str + tsize + schemaSize, pTable->sql, pTable->sqlLen); + *ssize = tsize + schemaSize + pTable->sqlLen; return NULL; } -void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *ssize) { assert(str != NULL); SStreamTableObj *pTable = (SStreamTableObj *)malloc(sizeof(SNormalTableObj)); @@ -192,30 +193,31 @@ void *mgmtStreamTableActionDecode(void *row, char *str, int size, int *ssize) { } memset(pTable, 0, sizeof(STabObj)); - int tsize = pTable->updateEnd - (int8_t *)pTable; + int32_t tsize = pTable->updateEnd - (int8_t *)pTable; if (size < tsize) { mgmtDestroyStreamTable(pTable); return NULL; } memcpy(pTable, str, tsize); - pTable->schema = (char *)malloc(pTable->schemaSize); + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + pTable->schema = (SSchema *)malloc(schemaSize); if (pTable->schema == NULL) { mgmtDestroyStreamTable(pTable); return NULL; } - memcpy(pTable->schema, str + tsize, pTable->schemaSize); + memcpy(pTable->schema, str + tsize, schemaSize); - pTable->pSql = (char *)malloc(pTable->sqlLen); - if (pTable->pSql == NULL) { + pTable->sql = (char *)malloc(pTable->sqlLen); + if (pTable->sql == NULL) { mgmtDestroyStreamTable(pTable); return NULL; } - memcpy(pTable->pSql, str + tsize + pTable->schemaSize, pTable->sqlLen); + memcpy(pTable->sql, str + tsize + schemaSize, pTable->sqlLen); return (void *)pTable; } -void *mgmtStreamTableAction(char action, void *row, char *str, int size, int *ssize) { +void *mgmtStreamTableAction(char action, void *row, char *str, int32_t size, int32_t *ssize) { if (mgmtStreamTableActionFp[(uint8_t)action] != NULL) { return (*(mgmtStreamTableActionFp[(uint8_t)action]))(row, str, size, ssize); } @@ -223,25 +225,57 @@ void *mgmtStreamTableAction(char action, void *row, char *str, int size, int *ss } int32_t mgmtInitStreamTables() { + void *pNode = NULL; + void *pLastNode = NULL; + SChildTableObj *pTable = NULL; + + mgmtStreamTableActionInit(); + + tsStreamTableSdb = sdbOpenTable(tsMaxTables, sizeof(SStreamTableObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN, + "streams", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtStreamTableAction); + if (tsStreamTableSdb == NULL) { + mError("failed to init stream table data"); + return -1; + } + + pNode = NULL; + while (1) { + pNode = sdbFetchRow(tsStreamTableSdb, pNode, (void **)&pTable); + if (pTable == NULL) { + break; + } + + SDbObj *pDb = mgmtGetDbByTableId(pTable->tableId); + if (pDb == NULL) { + mError("stream table:%s, failed to get db, discard it", pTable->tableId); + sdbDeleteRow(tsStreamTableSdb, pTable); + pNode = pLastNode; + continue; + } + } + + mgmtSetVgroupIdPool(); + + mTrace("stream table is initialized"); return 0; } void mgmtCleanUpStreamTables() { } -int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int32_t vnode) { - SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg; - memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); - pCreateTable->vnode = htonl(vnode); - pCreateTable->sid = htonl(pTable->sid); - pCreateTable->uid = pTable->uid; - pCreateTable->createdTime = htobe64(pTable->createdTime); - pCreateTable->sversion = htonl(pTable->sversion); - pCreateTable->numOfColumns = htons(pTable->numOfColumns); - //pCreateTable->sqlLen = htons(pTable->sqlLen); - - SSchema *pSchema = pTable->schema; - int32_t totalCols = pCreateTable->numOfColumns; +int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup) { +// SDCreateTableMsg *pCreateTable = (SDCreateTableMsg *) pMsg; +// memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); +// pCreateTable->vnode = htonl(vnode); +// pCreateTable->sid = htonl(pTable->sid); +// pCreateTable->uid = pTable->uid; +// pCreateTable->createdTime = htobe64(pTable->createdTime); +// pCreateTable->sversion = htonl(pTable->sversion); +// pCreateTable->numOfColumns = htons(pTable->numOfColumns); +// //pCreateTable->sqlLen = htons(pTable->sqlLen); +// +// SSchema *pSchema = pTable->schema; +// int32_t totalCols = pCreateTable->numOfColumns; // for (int32_t col = 0; col < totalCols; ++col) { // SMColumn *colData = &((SMColumn *) (pCreateTable->data))[col]; @@ -256,11 +290,12 @@ int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, int8_t *pMsg, int // char *sql = pTable->schema + pTable->schemaSize; // memcpy(pCreateTable->data + totalColsSize, pTable->sqlLen, sql); - return pMsg; +// return pMsg; + return NULL; } int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid) { - int numOfTables = sdbGetNumOfRows(tsStreamTableSdb); + int32_t numOfTables = sdbGetNumOfRows(tsStreamTableSdb); if (numOfTables >= TSDB_MAX_TABLES) { mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); return TSDB_CODE_TOO_MANY_TABLES; @@ -279,9 +314,9 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg pTable->sversion = 0; pTable->numOfColumns = pCreate->numOfColumns; - int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; - pTable->schemaSize = numOfCols * sizeof(SSchema) + pCreate->sqlLen; - pTable->schema = (int8_t *) calloc(1, pTable->schemaSize); + int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags; + int32_t schemaSize = pTable->numOfColumns * sizeof(SSchema); + pTable->schema = (SSchema *) calloc(1, schemaSize); if (pTable->schema == NULL) { free(pTable); mError("table:%s, no schema input", pCreate->meterId); @@ -290,15 +325,15 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); pTable->nextColId = 0; - for (int col = 0; col < pCreate->numOfColumns; col++) { + for (int32_t col = 0; col < pCreate->numOfColumns; col++) { SSchema *tschema = (SSchema *) pTable->schema; tschema[col].colId = pTable->nextColId++; } - pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema); - memcpy(pTable->pSql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); - pTable->pSql[pCreate->sqlLen - 1] = 0; - mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql); + pTable->sql = (char*)(pTable->schema + numOfCols * sizeof(SSchema)); + memcpy(pTable->sql, (char *) (pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); + pTable->sql[pCreate->sqlLen - 1] = 0; + mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->sql); if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) { mError("table:%s, update sdb error", pCreate->meterId); @@ -334,11 +369,11 @@ int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) { mgmtRestoreTimeSeries(pTable->numOfColumns - 1); - mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup); + mgmtSendRemoveMeterMsgToDnode((STableInfo *) pTable, pVgroup); sdbDeleteRow(tsChildTableSdb, pTable); - if (pVgroup->numOfMeters <= 0) { + if (pVgroup->numOfTables <= 0) { mgmtDropVgroup(pDb, pVgroup); } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 80f344a3b8..379336e20a 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -152,8 +152,8 @@ int32_t mgmtInitSuperTables() { mgmtSuperTableActionInit(); - tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_MAX_SQL_LEN, - "stable", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction); + tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS, + "stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction); if (tsSuperTableSdb == NULL) { mError("failed to init super table data"); return -1; @@ -585,14 +585,10 @@ int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, SConn return numOfRows; } -int32_t mgmtAddMeterIntoMetric(SSuperTableObj *pStable, SChildTableObj *pTable) { - if (pTable != NULL && pStable != NULL) return -1; +void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable) { pStable->numOfTables++; - return 0; } -int32_t mgmtRemoveMeterFromMetric(SSuperTableObj *pStable, SChildTableObj *pTable) { - if (pTable != NULL && pStable != NULL) return -1; +void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable) { pStable->numOfTables--; - return 0; } \ No newline at end of file diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 03774abe16..701a1d6696 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -209,7 +209,7 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) { int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { STableInfo *pTable; - if (pVgroup->numOfMeters > 0) { + if (pVgroup->numOfTables > 0) { for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) { if (pVgroup->tableList != NULL) { pTable = pVgroup->tableList[i]; @@ -235,11 +235,11 @@ void mgmtSetVgroupIdPool() { if (pVgroup == NULL || pVgroup->idPool == 0) break; taosIdPoolSetFreeList(pVgroup->idPool); - pVgroup->numOfMeters = taosIdPoolNumOfUsed(pVgroup->idPool); + pVgroup->numOfTables = taosIdPoolNumOfUsed(pVgroup->idPool); pDb = mgmtGetDb(pVgroup->dbName); - pDb->numOfTables += pVgroup->numOfMeters; - if (pVgroup->numOfMeters >= pDb->cfg.maxSessions - 1) + pDb->numOfTables += pVgroup->numOfTables; + if (pVgroup->numOfTables >= pDb->cfg.maxSessions - 1) mgmtAddVgroupIntoDbTail(pDb, pVgroup); else mgmtAddVgroupIntoDb(pDb, pVgroup); @@ -374,7 +374,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, SConnObj cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pVgroup->numOfMeters; + *(int32_t *)pWrite = pVgroup->numOfTables; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -422,7 +422,7 @@ void *mgmtVgroupActionInsert(void *row, char *str, int32_t size, int32_t *ssize) int32_t tsize = sizeof(STableInfo *) * pDb->cfg.maxSessions; pVgroup->tableList = (STableInfo **)malloc(tsize); memset(pVgroup->tableList, 0, tsize); - pVgroup->numOfMeters = 0; + pVgroup->numOfTables = 0; pVgroup->idPool = taosInitIdPool(pDb->cfg.maxSessions); mgmtAddVgroupIntoDb(pDb, pVgroup); mgmtSetDnodeVgid(pVgroup->vnodeGid, pVgroup->numOfVnodes, pVgroup->vgId); -- GitLab