diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 7d3bcd9adb6dac45aa2de724f1fe41c385375a5f..0668c0a57ff0e02a6da368291e8c8b1e021c9b49 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -120,7 +120,7 @@ typedef struct SCond { } SCond; typedef struct SJoinNode { - char meterId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN]; uint64_t uid; int16_t tagCol; } SJoinNode; @@ -155,7 +155,7 @@ typedef struct SParamInfo { } SParamInfo; typedef struct STableDataBlocks { - char meterId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN]; int8_t tsSource; // where does the UNIX timestamp come from, server or client bool ordered; // if current rows are ordered or not int64_t vgid; // virtual group id @@ -384,7 +384,7 @@ int tscProcessSql(SSqlObj *pSql); void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows); -int tscRenewMeterMeta(SSqlObj *pSql, char *meterId); +int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); void tscQueueAsyncRes(SSqlObj *pSql); void tscQueueAsyncError(void(*fp), void *param); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 8039fbe405feada1552993cc2c1d78364ec2b2ab..48cfab638c17cf11a0569c57f8fed936c2ea06a3 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1544,7 +1544,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { continue; } - strncpy(pMeterMetaInfo->name, pDataBlock->meterId, TSDB_TABLE_ID_LEN); + strncpy(pMeterMetaInfo->name, pDataBlock->tableId, TSDB_TABLE_ID_LEN); memset(pDataBlock->pData, 0, pDataBlock->nAllocSize); int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 57978d3c474ec20f9ff3d2893f56d81e5cdee256..88ad65ac2a28743a73a9489aa9161f461e9334f1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2867,7 +2867,7 @@ static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { pLeft->uid = pMeterMetaInfo->pMeterMeta->uid; pLeft->tagCol = tagColIndex; - strcpy(pLeft->meterId, pMeterMetaInfo->name); + strcpy(pLeft->tableId, pMeterMetaInfo->name); index = (SColumnIndex)COLUMN_INDEX_INITIALIZER; if (getColumnIndexByName(&pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { @@ -2879,7 +2879,7 @@ static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) { pRight->uid = pMeterMetaInfo->pMeterMeta->uid; pRight->tagCol = tagColIndex; - strcpy(pRight->meterId, pMeterMetaInfo->name); + strcpy(pRight->tableId, pMeterMetaInfo->name); pTagCond->joinInfo.hasJoin = true; return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 4c3739e4bb79340a61ab38e194da7a2342c30113..7043fc560afc46923656c67ac1020b3770beaffd 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1838,7 +1838,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pDropTableMsg = (SDropTableMsg *)pMsg; - strcpy(pDropTableMsg->meterId, pMeterMetaInfo->name); + strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name); pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0; pMsg += sizeof(SDropTableMsg); @@ -2353,7 +2353,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SMeterInfoMsg *pInfoMsg; + STableInfoMsg *pInfoMsg; char * pMsg, *pStart; int msgLen = 0; @@ -2381,10 +2381,10 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(SMgmtHead); - pInfoMsg = (SMeterInfoMsg *)pMsg; - strcpy(pInfoMsg->meterId, pMeterMetaInfo->name); + pInfoMsg = (STableInfoMsg *)pMsg; + strcpy(pInfoMsg->tableId, pMeterMetaInfo->name); pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0); - pMsg += sizeof(SMeterInfoMsg); + pMsg += sizeof(STableInfoMsg); if (pSql->cmd.createOnDemand) { memcpy(pInfoMsg->tags, tmpData, sizeof(STagData)); @@ -2403,7 +2403,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { /** * multi meter meta req pkg format: - * | SMgmtHead | SMultiMeterInfoMsg | meterId0 | meterId1 | meterId2 | ...... + * | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ...... * no used 4B **/ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { @@ -2421,16 +2421,16 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize); memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db - SMultiMeterInfoMsg *pInfoMsg = (SMultiMeterInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead)); - pInfoMsg->numOfMeters = htonl((int32_t)pCmd->count); + SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead)); + pInfoMsg->numOfTables = htonl((int32_t)pCmd->count); if (pCmd->payloadLen > 0) { - memcpy(pInfoMsg->meterId, tmpData, pCmd->payloadLen); + memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen); } tfree(tmpData); - pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg); + pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg); pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META; assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize); @@ -2502,13 +2502,13 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // todo refactor pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2); - memcpy(pMsg, pTagCond->joinInfo.left.meterId, TSDB_TABLE_ID_LEN); + memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN); pMsg += TSDB_TABLE_ID_LEN; *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol; pMsg += sizeof(int16_t); - memcpy(pMsg, pTagCond->joinInfo.right.meterId, TSDB_TABLE_ID_LEN); + memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN); pMsg += TSDB_TABLE_ID_LEN; *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol; @@ -2590,7 +2590,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } - strcpy(pElem->meterId, pMeterMetaInfo->name); + strcpy(pElem->tableId, pMeterMetaInfo->name); pElem->numOfTags = htons(pMeterMetaInfo->numOfTags); int16_t len = pMsg - (char *)pElem; @@ -2751,7 +2751,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { /** * multi meter meta rsp pkg format: - * | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 + * | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 * |...... 1B 1B 4B **/ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { @@ -2772,13 +2772,13 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { rsp++; - SMultiMeterInfoMsg *pInfo = (SMultiMeterInfoMsg *)rsp; - totalNum = htonl(pInfo->numOfMeters); - rsp += sizeof(SMultiMeterInfoMsg); + SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp; + totalNum = htonl(pInfo->numOfTables); + rsp += sizeof(SMultiTableInfoMsg); for (i = 0; i < totalNum; i++) { SMultiMeterMeta *pMultiMeta = (SMultiMeterMeta *)rsp; - SMeterMeta * pMeta = &pMultiMeta->meta; + SMeterMeta * pMeta = &pMultiMeta->metas; pMeta->sid = htonl(pMeta->sid); pMeta->sversion = htons(pMeta->sversion); @@ -2850,7 +2850,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with SMeterMeta in cache pMeta->index = 0; - (void)taosAddDataIntoCache(tscCacheHandle, pMultiMeta->meterId, (char *)pMeta, size, tsMeterMetaKeepTimer); + (void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer); } pSql->res.code = TSDB_CODE_SUCCESS; @@ -3312,10 +3312,10 @@ static void tscWaitingForCreateTable(SSqlCmd *pCmd) { /** * in renew metermeta, do not retrieve metadata in cache. * @param pSql sql object - * @param meterId meter id + * @param tableId meter id * @return status code */ -int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) { +int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) { int code = 0; // handle metric meta renew process diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4f747053eedfc497c2d91513aee6a0ff6b479049..e81ed463d97faed7fba98811b1f746d0249ba187 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -56,7 +56,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) { char join[512] = {0}; if (pTagCond->joinInfo.hasJoin) { - sprintf(join, "%s,%s", pTagCond->joinInfo.left.meterId, pTagCond->joinInfo.right.meterId); + sprintf(join, "%s,%s", pTagCond->joinInfo.left.tableId, pTagCond->joinInfo.right.tableId); } // estimate the buffer size @@ -156,13 +156,13 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) { return false; } -void tscGetDBInfoFromMeterId(char* meterId, char* db) { - char* st = strstr(meterId, TS_PATH_DELIMITER); +void tscGetDBInfoFromMeterId(char* tableId, char* db) { + char* st = strstr(tableId, TS_PATH_DELIMITER); if (st != NULL) { char* end = strstr(st + 1, TS_PATH_DELIMITER); if (end != NULL) { - memcpy(db, meterId, (end - meterId)); - db[end - meterId] = 0; + memcpy(db, tableId, (end - tableId)); + db[end - tableId] = 0; return; } } @@ -590,12 +590,12 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { // set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) { - strcpy(pMeterMetaInfo->name, pDataBlock->meterId); + strcpy(pMeterMetaInfo->name, pDataBlock->tableId); taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false); pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta); } else { - assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0); + assert(strncmp(pMeterMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0); } /* @@ -660,7 +660,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff dataBuf->size = startOffset; dataBuf->tsSource = -1; - strncpy(dataBuf->meterId, name, TSDB_TABLE_ID_LEN); + strncpy(dataBuf->tableId, name, TSDB_TABLE_ID_LEN); /* * The metermeta may be released since the metermeta cache are completed clean by other thread @@ -709,7 +709,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi STableDataBlocks* dataBuf = NULL; int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE, - tsInsertHeadSize, 0, pOneTableBlock->meterId, pOneTableBlock->pMeterMeta, &dataBuf); + tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pMeterMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); taosCleanUpHashTable(pVnodeDataBlockHashList); @@ -743,7 +743,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi char* e = (char*)pBlocks->payLoad + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); - tscTrace("%p meterId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->meterId, pBlocks->sid, + tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->sid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->payLoad), GET_INT64_VAL(e)); pBlocks->sid = htonl(pBlocks->sid); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 9a65ef2705db25d52e8f38116e9be4f063cd8953..5034a0080c58104a718baf11a6ae1129bebfc2d0 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -277,11 +277,11 @@ typedef struct { int16_t numOfColumns; int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string int16_t reserved[16]; - SSchema schema[]; + SSchema schema[]; } SCreateTableMsg; typedef struct { - char meterId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN]; char db[TSDB_DB_NAME_LEN]; int8_t igNotExists; } SDropTableMsg; @@ -348,7 +348,7 @@ typedef struct { short vnode; int32_t sid; uint64_t uid; - char meterId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN]; } SDRemoveTableMsg; typedef struct { @@ -615,7 +615,7 @@ typedef struct { typedef struct { uint32_t destId; uint32_t destIp; - char meterId[TSDB_UNI_LEN]; + char tableId[TSDB_UNI_LEN]; char empty[3]; uint8_t msgType; int32_t msgLen; @@ -647,20 +647,21 @@ typedef struct { } SVPeersMsg; typedef struct { - char meterId[TSDB_TABLE_ID_LEN]; - short createFlag; - char tags[]; -} SMeterInfoMsg; + char tableId[TSDB_TABLE_ID_LEN]; + char db[TSDB_DB_NAME_LEN]; + int16_t createFlag; + char tags[]; +} STableInfoMsg; typedef struct { - int32_t numOfMeters; - char meterId[]; -} SMultiMeterInfoMsg; + int32_t numOfTables; + char tableIds[]; +} SMultiTableInfoMsg; typedef struct { int16_t elemLen; - char meterId[TSDB_TABLE_ID_LEN]; + char tableId[TSDB_TABLE_ID_LEN]; int16_t orderIndex; int16_t orderType; // used in group by xx order by xxx @@ -701,26 +702,26 @@ typedef struct { } SMetricMeta; typedef struct SMeterMeta { + char tableId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front + int32_t contLen; uint8_t numOfTags : 6; uint8_t precision : 2; uint8_t tableType : 4; uint8_t index : 4; // used locally - int16_t numOfColumns; - int16_t rowSize; // used locally, calculated in client int16_t sversion; - SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; - int32_t sid; int32_t vgid; uint64_t uid; + SSchema schema[]; } SMeterMeta; typedef struct SMultiMeterMeta { - char meterId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front - SMeterMeta meta; + int32_t numOfTables; + int32_t contLen; + SMeterMeta metas[]; } SMultiMeterMeta; typedef struct { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 0fa6355898f4aa1c3bf8aa7ed37d81162e94001a..a374897d6264027dc6baaad02d7d321d0e48e43d 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -26,14 +26,18 @@ extern "C" { #include "mnode.h" -int32_t mgmtInitChildTables(); -void mgmtCleanUpChildTables(); -int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); -int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); -int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); -int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); -SChildTableObj* mgmtGetChildTable(char *tableId); -int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup); +int32_t mgmtInitChildTables(); +void mgmtCleanUpChildTables(); + +void * mgmtGetChildTable(char *tableId); + +int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); +int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); +int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); +int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup); + +int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index b45b6bbd4eb81fbe7313eeee2b527d3b0d42d050..b2ba6d941d2620c108315eda4048ead937f50785 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -23,15 +23,19 @@ extern "C" { #include #include #include "mnode.h" - -int32_t mgmtInitNormalTables(); -void mgmtCleanUpNormalTables(); -int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); -int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); -int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); -int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); -SNormalTableObj* mgmtGetNormalTable(char *tableId); -int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); + +int32_t mgmtInitNormalTables(); +void mgmtCleanUpNormalTables(); + +void * mgmtGetNormalTable(char *tableId); + +int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); +int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); +int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); +int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable); + +int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtStreamTable.h b/src/mnode/inc/mgmtStreamTable.h index e980f76742d16926dc792312134b0a7c929e8514..3eab6664ac714326159533f23147a1f08b2ef76d 100644 --- a/src/mnode/inc/mgmtStreamTable.h +++ b/src/mnode/inc/mgmtStreamTable.h @@ -24,13 +24,17 @@ extern "C" { #include #include "mnode.h" -int32_t mgmtInitStreamTables(); -void mgmtCleanUpStreamTables(); -int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); -int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable); -int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter); -SStreamTableObj* mgmtGetStreamTable(char *tableId); -int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup); +int32_t mgmtInitStreamTables(); +void mgmtCleanUpStreamTables(); + +void * mgmtGetStreamTable(char *tableId); + +int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid); +int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable); +int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter); +int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup); + +int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 6b87ee38ac63333d3914def7dbe3370454bfed49..f97be288ebd122147ba05f2b4970e44a4b9c9062 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -26,18 +26,26 @@ extern "C" { #include "taosdef.h" #include "mnode.h" -int32_t mgmtInitSuperTables(); -void mgmtCleanUpSuperTables(); -int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate); -int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); -SSuperTableObj* mgmtGetSuperTable(char *tableId); -int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); -int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); -int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); -int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); -int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols); -int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName); -int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col); +int32_t mgmtInitSuperTables(); +void mgmtCleanUpSuperTables(); + +void * mgmtGetSuperTable(char *tableId); +int32_t mgmtGetShowSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate); +int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); +int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); +int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); +int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); +int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols); +int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName); + +int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp); + +int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); +int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); +int32_t mgmtGetTagsLength(SSuperTableObj *pSuperTable, int32_t col); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 9a3417318586fed7f54d7af0bfe8bc0d62f94381..ec4769ca9a662b276b3bd708d7fb32c1e20c8dc2 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -28,20 +28,18 @@ extern "C" { int32_t mgmtInitTables(); STableInfo* mgmtGetTable(char *tableId); STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid); +int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, SMeterMeta *pMeta, bool usePublicIp); int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo); int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate); int32_t mgmtDropTable(SDbObj *pDb, char *meterId, int32_t ignore); int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); -int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn); -int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); +int32_t mgmtGetShowTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); void mgmtCleanUpMeters(); void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable); void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable); -int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn); -int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); - #ifdef __cplusplus diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 8ff968dbc0f67f1545a00b38651ca9fc6c6a6c1f..70c5a6a902f9de005f00c7bb3329e4750b3766dd 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -340,8 +340,8 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { return 0; } -SChildTableObj* mgmtGetChildTable(char *tableId) { - return (SChildTableObj *)sdbGetRow(tsChildTableSdb, tableId); +void* mgmtGetChildTable(char *tableId) { + return sdbGetRow(tsChildTableSdb, tableId); } int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { @@ -392,3 +392,30 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName return 0; } +int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) { + pMeta->uid = htobe64(pTable->uid); + pMeta->sid = htonl(pTable->sid); + pMeta->vgid = htonl(pTable->vgId); + pMeta->sversion = htons(pTable->superTable->sversion); + pMeta->precision = pDb->cfg.precision; + pMeta->numOfTags = pTable->superTable->numOfTags; + pMeta->numOfColumns = htons(pTable->superTable->numOfColumns); + pMeta->tableType = pTable->type; + pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable); + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + return TSDB_CODE_INVALID_TABLE; + } + for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { + if (usePublicIp) { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } else { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } + } + + return TSDB_CODE_SUCCESS; +} diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index 62809c039706c5865945ae98ab6ec16d1c451fd6..e1922e6faa86b28fb32213d2005f935be3717b3d 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -357,8 +357,8 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { return 0; } -SNormalTableObj* mgmtGetNormalTable(char *tableId) { - return (SNormalTableObj *)sdbGetRow(tsNormalTableSdb, tableId); +void* mgmtGetNormalTable(char *tableId) { + return sdbGetRow(tsNormalTableSdb, tableId); } static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *colName) { @@ -442,3 +442,45 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) return TSDB_CODE_SUCCESS; } + +static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *pTable) { + int32_t numOfCols = pTable->numOfColumns; + for (int32_t i = 0; i < numOfCols; ++i) { + strcpy(pSchema->name, pTable->schema[i].name); + pSchema->type = pTable->schema[i].type; + pSchema->bytes = htons(pTable->schema[i].bytes); + pSchema->colId = htons(pTable->schema[i].colId); + pSchema++; + } + + return numOfCols * sizeof(SSchema); +} + +int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) { + pMeta->uid = htobe64(pTable->uid); + pMeta->sid = htonl(pTable->sid); + pMeta->vgid = htonl(pTable->vgId); + pMeta->sversion = htons(pTable->sversion); + pMeta->precision = pDb->cfg.precision; + pMeta->numOfTags = 0; + pMeta->numOfColumns = htons(pTable->numOfColumns); + pMeta->tableType = pTable->type; + pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable); + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + return TSDB_CODE_INVALID_TABLE; + } + for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { + if (usePublicIp) { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } else { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } + } + + return TSDB_CODE_SUCCESS; +} + diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 3392621cf5b6a56db6349f6df96357f6d779db42..72b584a9d87083e447e9fe3e9dbbbc5e043473d4 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -24,17 +24,22 @@ #include "mnode.h" #include "mgmtAcct.h" #include "mgmtBalance.h" +#include "mgmtChildTable.h" #include "mgmtConn.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtGrant.h" #include "mgmtMnode.h" +#include "mgmtNormalTable.h" #include "mgmtProfile.h" #include "mgmtShell.h" +#include "mgmtStreamTable.h" +#include "mgmtSuperTable.h" #include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" + #define MAX_LEN_OF_METER_META (sizeof(SMultiMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN) typedef int32_t (*GetMateFp)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn); @@ -113,407 +118,135 @@ void mgmtCleanUpShell() { } } -static void mgmtSetSchemaFromMeters(SSchema *pSchema, STabObj *pMeterObj, uint32_t numOfCols) { - SSchema *pMeterSchema = (SSchema *)(pMeterObj->schema); - for (int32_t i = 0; i < numOfCols; ++i) { - strcpy(pSchema->name, pMeterSchema[i].name); - pSchema->type = pMeterSchema[i].type; - pSchema->bytes = htons(pMeterSchema[i].bytes); - pSchema->colId = htons(pMeterSchema[i].colId); - pSchema++; +int32_t mgmtProcessMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) { + SRpcConnInfo connInfo; + rpcGetConnInfo(ahandle, &connInfo); + + bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); + SUserObj *pUser = mgmtGetUser(connInfo.user); + if (pUser == NULL) { + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return TSDB_CODE_INVALID_USER; + } + + STableInfoMsg *pInfo = pCont; + pInfo->createFlag = htons(pInfo->createFlag); + + SDbObj *pDb = mgmtGetDb(pInfo->db); + if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) { + rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0); + return TSDB_CODE_INVALID_DB; } -} -static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMeterObj) { - SSchema *pTagSchema = (SSchema *)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema)); + STableInfo *pTable = mgmtGetTable(pInfo->tableId); - char *tagVal = pMeterObj->pTagData + TSDB_TABLE_ID_LEN; // tag start position + // on demand create table from super table if meter does not exists + if (pTable == NULL && pInfo->createFlag == 1) { + // write operation needs to redirect to master mnode + if (mgmtCheckRedirectMsg(ahandle) != 0) { + return TSDB_CODE_REDIRECT; + } - uint32_t tagsLen = 0; - for (int32_t i = 0; i < pMetric->numOfTags; ++i) { - tagsLen += pTagSchema[i].bytes; + SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData)); + if (pCreateMsg == NULL) { + rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); + strcpy(pCreateMsg->tableId, pInfo->tableId); + + int32_t code = mgmtCreateTable(pDb, pCreateMsg); + + char stableName[TSDB_TABLE_ID_LEN] = {0}; + strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN); + mTrace("table:%s is auto created by %s from %s, code:%d", pCreateMsg->tableId, pUser->user, stableName, code); + + tfree(pCreateMsg); + + if (code != TSDB_CODE_SUCCESS) { + rpcSendResponse(ahandle, code, NULL, 0); + return code; + } + + pTable = mgmtGetTable(pInfo->tableId); } - memcpy(pTags, tagVal, tagsLen); - return tagsLen; -} + if (pTable == NULL) { + rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0); + return TSDB_CODE_INVALID_TABLE; + } -int32_t mgmtProcessMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) { -// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; -// STabObj * pMeterObj = NULL; -// SVgObj * pVgroup = NULL; -// SMeterMeta * pMeta = NULL; -// SSchema * pSchema = NULL; -// STaosRsp * pRsp = NULL; -// char * pStart = NULL; -// -// pInfo->createFlag = htons(pInfo->createFlag); -// -// int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + -// sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE; -// -// SDbObj *pDb = NULL; -// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); -// -// // todo db check should be extracted -// if (pDb == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) { -// -// if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// -// pRsp->code = TSDB_CODE_INVALID_DB; -// pMsg++; -// -// goto _exit_code; -// } -// -// pMeterObj = mgmtGetTable(pInfo->meterId); -// -// // on demand create table from super table if meter does not exists -// if (pMeterObj == NULL && pInfo->createFlag == 1) { -// // write operation needs to redirect to master mnode -// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_TABLE_META_RSP) != 0) { -// return 0; -// } -// -// SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData)); -// if (pCreateMsg == NULL) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// -// memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); -// strcpy(pCreateMsg->meterId, pInfo->meterId); -// -// SDbObj* pMeterDb = mgmtGetDbByTableId(pCreateMsg->meterId); -// mTrace("table:%s, pConnDb:%p, pConnDbName:%s, pMeterDb:%p, pMeterDbName:%s", -// pCreateMsg->meterId, pDb, pDb->name, pMeterDb, pMeterDb->name); -// assert(pDb == pMeterDb); -// -// int32_t code = mgmtCreateTable(pDb, pCreateMsg); -// -// char stableName[TSDB_TABLE_ID_LEN] = {0}; -// strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN); -// mTrace("table:%s is automatically created by %s from %s, code:%d", pCreateMsg->meterId, pConn->pUser->user, -// stableName, code); -// -// tfree(pCreateMsg); -// -// if (code != TSDB_CODE_SUCCESS) { -// if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// -// pRsp->code = code; -// pMsg++; -// -// goto _exit_code; -// } -// -// pMeterObj = mgmtGetTable(pInfo->meterId); -// } -// -// if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// -// if (pMeterObj == NULL) { -// if (pDb) -// pRsp->code = TSDB_CODE_INVALID_TABLE; -// else -// pRsp->code = TSDB_CODE_DB_NOT_SELECTED; -// pMsg++; -// } else { -// mTrace("%s, uid:%" PRIu64 " meter meta is retrieved", pInfo->meterId, pMeterObj->uid); -// pRsp->code = 0; -// pMsg += sizeof(STaosRsp); -// *pMsg = TSDB_IE_TYPE_META; -// pMsg++; -// -// pMeta = (SMeterMeta *)pMsg; -// pMeta->uid = htobe64(pMeterObj->uid); -// pMeta->sid = htonl(pMeterObj->gid.sid); -// pMeta->vgid = htonl(pMeterObj->gid.vgId); -// pMeta->sversion = htons(pMeterObj->sversion); -// -// pMeta->precision = pDb->cfg.precision; -// -// pMeta->numOfTags = pMeterObj->numOfTags; -// pMeta->numOfColumns = htons(pMeterObj->numOfColumns); -// pMeta->tableType = pMeterObj->tableType; -// -// pMsg += sizeof(SMeterMeta); -// pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta struct -// -// if (mgmtTableCreateFromSuperTable(pMeterObj)) { -// assert(pMeterObj->numOfTags == 0); -// -// STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData); -// uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns; -// -// pMeta->numOfTags = pMetric->numOfTags; // update the numOfTags info -// mgmtSetSchemaFromMeters(pSchema, pMetric, numOfTotalCols); -// pMsg += numOfTotalCols * sizeof(SSchema); -// -// // for meters created from metric, we need the metric tag schema to parse the tag data -// int32_t tagsLen = mgmtSetMeterTagValue(pMsg, pMetric, pMeterObj); -// pMsg += tagsLen; -// } else { -// /* -// * for metrics, or meters that are not created from metric, set the schema directly -// * for meters created from metric, we use the schema of metric instead -// */ -// uint32_t numOfTotalCols = (uint32_t)pMeterObj->numOfTags + pMeterObj->numOfColumns; -// mgmtSetSchemaFromMeters(pSchema, pMeterObj, numOfTotalCols); -// pMsg += numOfTotalCols * sizeof(SSchema); -// } -// -// if (mgmtIsNormalTable(pMeterObj)) { -// pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId); -// if (pVgroup == NULL) { -// pRsp->code = TSDB_CODE_INVALID_TABLE; -// goto _exit_code; -// } -// for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { -// if (pConn->usePublicIp) { -// pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; -// pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); -// } else { -// pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; -// pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); -// } -// } -// } -// } -// -//_exit_code: -// msgLen = pMsg - pStart; -// -// taosSendMsgToPeer(pConn->thandle, pStart, msgLen); -// -// return msgLen; - return 0; + SMeterMeta *pMeta = rpcMallocCont(sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS); + int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); + + if (code == TSDB_CODE_SUCCESS) { + rpcFreeCont(pMeta); + rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0); + } else { + pMeta->contLen = htons(pMeta->contLen); + rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen); + } + + return TSDB_CODE_SUCCESS; } -/** - * multi meter meta rsp pkg format: - * | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 - * 1B 1B 4B - * - * | STaosHeader | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | ......................| - * ^ ^ ^ - * |<--------------------------------------size-----------------------------------------------|---------------------->| - * | | | - * pStart pCurMeter pTail - **/ int32_t mgmtProcessMultiMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) { -// SDbObj * pDbObj = NULL; -// STabObj * pMeterObj = NULL; -// SVgObj * pVgroup = NULL; -// SMultiMeterMeta * pMeta = NULL; -// SSchema * pSchema = NULL; -// STaosRsp * pRsp = NULL; -// char * pStart = NULL; -// -// SMultiMeterInfoMsg * pInfo = (SMultiMeterInfoMsg *)pMsg; -// char * str = pMsg + sizeof(SMultiMeterInfoMsg); -// pInfo->numOfMeters = htonl(pInfo->numOfMeters); -// -// int32_t size = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice -// -// char *pNewMsg; -// if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// -// int32_t totalNum = 0; -// char tblName[TSDB_TABLE_ID_LEN]; -// char* nextStr; -// -// char* pCurMeter = pStart + sizeof(STaosRsp) + sizeof(SMultiMeterInfoMsg) + 1; // 1: ie type byte -// char* pTail = pStart + size; -// -// while (str - pMsg < msgLen) { -// nextStr = strchr(str, ','); -// if (nextStr == NULL) { -// break; -// } -// -// memcpy(tblName, str, nextStr - str); -// tblName[nextStr - str] = '\0'; -// str = nextStr + 1; -// -// // judge whether the remaining memory is adequate -// if ((pTail - pCurMeter) < MAX_LEN_OF_METER_META) { -// char* pMsgHdr = pStart - sizeof(STaosHeader); -// size *= 2; -// pMsgHdr = (char*)realloc(pMsgHdr, size); -// if (NULL == pMsgHdr) { -// char* pTmp = pStart - sizeof(STaosHeader); -// tfree(pTmp); -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// break; -// } -// -// pCurMeter = (char*)pMsgHdr + sizeof(STaosHeader) + (pCurMeter - pStart); -// pStart = (char*)pMsgHdr + sizeof(STaosHeader); -// pNewMsg = pStart; -// pRsp = (STaosRsp *)pStart; -// pTail = pMsgHdr + size; -// } -// -// // get meter schema, and fill into resp payload -// pMeterObj = mgmtGetTable(tblName); -// pDbObj = mgmtGetDbByTableId(tblName); -// -// if (pMeterObj == NULL || (pDbObj == NULL)) { -// continue; -// } else { -// mTrace("%s, uid:%" PRIu64 " sversion:%d meter meta is retrieved", tblName, pMeterObj->uid, pMeterObj->sversion); -// pMeta = (SMultiMeterMeta *)pCurMeter; -// -// memcpy(pMeta->meterId, tblName, strlen(tblName)); -// pMeta->meta.uid = htobe64(pMeterObj->uid); -// pMeta->meta.sid = htonl(pMeterObj->gid.sid); -// pMeta->meta.vgid = htonl(pMeterObj->gid.vgId); -// pMeta->meta.sversion = htons(pMeterObj->sversion); -// pMeta->meta.precision = pDbObj->cfg.precision; -// pMeta->meta.numOfTags = pMeterObj->numOfTags; -// pMeta->meta.numOfColumns = htons(pMeterObj->numOfColumns); -// pMeta->meta.tableType = pMeterObj->tableType; -// -// pCurMeter += sizeof(SMultiMeterMeta); -// pSchema = (SSchema *)pCurMeter; // schema locates at the end of SMeterMeta struct -// -// if (mgmtTableCreateFromSuperTable(pMeterObj)) { -// assert(pMeterObj->numOfTags == 0); -// -// STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData); -// uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns; -// -// pMeta->meta.numOfTags = pMetric->numOfTags; // update the numOfTags info -// mgmtSetSchemaFromMeters(pSchema, pMetric, numOfTotalCols); -// pCurMeter += numOfTotalCols * sizeof(SSchema); -// -// // for meters created from metric, we need the metric tag schema to parse the tag data -// int32_t tagsLen = mgmtSetMeterTagValue(pCurMeter, pMetric, pMeterObj); -// pCurMeter += tagsLen; -// } else { -// /* -// * for metrics, or meters that are not created from metric, set the schema directly -// * for meters created from metric, we use the schema of metric instead -// */ -// uint32_t numOfTotalCols = (uint32_t)pMeterObj->numOfTags + pMeterObj->numOfColumns; -// mgmtSetSchemaFromMeters(pSchema, pMeterObj, numOfTotalCols); -// pCurMeter += numOfTotalCols * sizeof(SSchema); -// } -// -// if (mgmtIsNormalTable(pMeterObj)) { -// pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId); -// if (pVgroup == NULL) { -// pRsp->code = TSDB_CODE_INVALID_TABLE; -// pNewMsg++; -// mError("%s, uid:%" PRIu64 " sversion:%d vgId:%d pVgroup is NULL", tblName, pMeterObj->uid, pMeterObj->sversion, -// pMeterObj->gid.vgId); -// goto _error_exit_code; -// } -// -// for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { -// if (pConn->usePublicIp) { -// pMeta->meta.vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; -// pMeta->meta.vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); -// } else { -// pMeta->meta.vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; -// pMeta->meta.vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); -// } -// } -// } -// } -// -// totalNum++; -// if (totalNum > pInfo->numOfMeters) { -// pNewMsg++; -// break; -// } -// } -// -// // fill rsp code, ieType -// msgLen = pCurMeter - pNewMsg; -// -// pRsp->code = 0; -// pNewMsg += sizeof(STaosRsp); -// *pNewMsg = TSDB_IE_TYPE_META; -// pNewMsg++; -// -// SMultiMeterInfoMsg *pRspInfo = (SMultiMeterInfoMsg *)pNewMsg; -// -// pRspInfo->numOfMeters = htonl(totalNum); -// goto _exit_code; -// -//_error_exit_code: -// msgLen = pNewMsg - pStart; -// -//_exit_code: -// taosSendMsgToPeer(pConn->thandle, pStart, msgLen); -// -// return msgLen; - return 0; -} + SRpcConnInfo connInfo; + rpcGetConnInfo(ahandle, &connInfo); -int32_t mgmtProcessMetricMetaMsg(void *pCont, int32_t contLen, void *ahandle) { -// SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg; -// STabObj * pMetric; -// STaosRsp * pRsp; -// char * pStart; -// -// pSuperTableMetaMsg->numOfMeters = htonl(pSuperTableMetaMsg->numOfMeters); -// -// pSuperTableMetaMsg->join = htonl(pSuperTableMetaMsg->join); -// pSuperTableMetaMsg->joinCondLen = htonl(pSuperTableMetaMsg->joinCondLen); -// -// for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) { -// pSuperTableMetaMsg->metaElem[i] = htonl(pSuperTableMetaMsg->metaElem[i]); -// } -// -// SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)(((char *)pSuperTableMetaMsg) + pSuperTableMetaMsg->metaElem[0]); -// pMetric = mgmtGetTable(pElem->meterId); -// -// SDbObj *pDb = NULL; -// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); -// -// if (pMetric == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) { -// pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP); -// if (pStart == NULL) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// -// pMsg = pStart; -// pRsp = (STaosRsp *)pMsg; -// if (pDb) -// pRsp->code = TSDB_CODE_INVALID_TABLE; -// else -// pRsp->code = TSDB_CODE_DB_NOT_SELECTED; -// pMsg++; -// -// msgLen = pMsg - pStart; -// } else { -// msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pSuperTableMetaMsg); -// if (msgLen <= 0) { -// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); -// return 0; -// } -// } -// -// taosSendMsgToPeer(pConn->thandle, pStart, msgLen); -// -// return msgLen; - return 0; + bool usePublicIp = (connInfo.serverIp == tsPublicIpInt); + SUserObj *pUser = mgmtGetUser(connInfo.user); + if (pUser == NULL) { + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return TSDB_CODE_INVALID_USER; + } + + SMultiTableInfoMsg *pInfo = pCont; + pInfo->numOfTables = htonl(pInfo->numOfTables); + + int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice + SMultiMeterMeta *pMultiMeta = rpcMallocCont(totalMallocLen); + if (pMultiMeta == NULL) { + rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + pMultiMeta->contLen = sizeof(SMultiMeterMeta); + pMultiMeta->numOfTables = 0; + + for (int t = 0; t < pInfo->numOfTables; ++t) { + char *tableId = (char*)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN); + STableInfo *pTable = mgmtGetTable(tableId); + if (pTable == NULL) continue; + + SDbObj *pDb = mgmtGetDbByTableId(tableId); + if (pDb == NULL) continue; + + int availLen = totalMallocLen - pMultiMeta->contLen; + if (availLen <= sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS) { + //TODO realloc + //totalMallocLen *= 2; + //pMultiMeta = rpcReMalloc(pMultiMeta, totalMallocLen); + //if (pMultiMeta == NULL) { + /// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); + // return TSDB_CODE_SERV_OUT_OF_MEMORY; + //} else { + // t--; + // continue; + //} + } + + SMeterMeta *pMeta = (SMeterMeta *)(pMultiMeta->metas + pMultiMeta->contLen); + int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp); + if (code == TSDB_CODE_SUCCESS) { + pMultiMeta->numOfTables ++; + pMultiMeta->contLen += pMeta->contLen; + } + } + + rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMultiMeta, pMultiMeta->contLen); + return TSDB_CODE_SUCCESS; } int32_t mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) { @@ -873,40 +606,40 @@ int32_t mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) { static void mgmtInitShowMsgFp() { mgmtGetMetaFp = (GetMateFp *)malloc(TSDB_MGMT_TABLE_MAX * sizeof(GetMateFp)); - mgmtGetMetaFp[TSDB_MGMT_TABLE_ACCT] = mgmtGetAcctMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_USER] = mgmtGetUserMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_DB] = mgmtGetDbMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_TABLE] = mgmtGetTableMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_DNODE] = mgmtGetDnodeMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_MNODE] = mgmtGetMnodeMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_VGROUP] = mgmtGetVgroupMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetSuperTableMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_MODULE] = mgmtGetModuleMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_ACCT] = mgmtGetAcctMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_USER] = mgmtGetUserMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_DB] = mgmtGetDbMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_TABLE] = mgmtGetShowTableMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_DNODE] = mgmtGetDnodeMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_MNODE] = mgmtGetMnodeMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_VGROUP] = mgmtGetVgroupMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetShowSuperTableMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_MODULE] = mgmtGetModuleMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_QUERIES] = mgmtGetQueryMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_STREAMS] = mgmtGetStreamMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_CONFIGS] = mgmtGetConfigMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_CONNS] = mgmtGetConnsMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_SCORES] = mgmtGetScoresMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_GRANTS] = mgmtGetGrantsMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_VNODES] = mgmtGetVnodeMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_CONNS] = mgmtGetConnsMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_SCORES] = mgmtGetScoresMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_GRANTS] = mgmtGetGrantsMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_VNODES] = mgmtGetVnodeMeta; mgmtRetrieveFp = (RetrieveMetaFp *)malloc(TSDB_MGMT_TABLE_MAX * sizeof(RetrieveMetaFp)); - mgmtRetrieveFp[TSDB_MGMT_TABLE_ACCT] = mgmtRetrieveAccts; - mgmtRetrieveFp[TSDB_MGMT_TABLE_USER] = mgmtRetrieveUsers; - mgmtRetrieveFp[TSDB_MGMT_TABLE_DB] = mgmtRetrieveDbs; - mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveTables; - mgmtRetrieveFp[TSDB_MGMT_TABLE_DNODE] = mgmtRetrieveDnodes; - mgmtRetrieveFp[TSDB_MGMT_TABLE_MNODE] = mgmtRetrieveMnodes; - mgmtRetrieveFp[TSDB_MGMT_TABLE_VGROUP] = mgmtRetrieveVgroups; - mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveSuperTables; - mgmtRetrieveFp[TSDB_MGMT_TABLE_MODULE] = mgmtRetrieveModules; + mgmtRetrieveFp[TSDB_MGMT_TABLE_ACCT] = mgmtRetrieveAccts; + mgmtRetrieveFp[TSDB_MGMT_TABLE_USER] = mgmtRetrieveUsers; + mgmtRetrieveFp[TSDB_MGMT_TABLE_DB] = mgmtRetrieveDbs; + mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveShowTables; + mgmtRetrieveFp[TSDB_MGMT_TABLE_DNODE] = mgmtRetrieveDnodes; + mgmtRetrieveFp[TSDB_MGMT_TABLE_MNODE] = mgmtRetrieveMnodes; + mgmtRetrieveFp[TSDB_MGMT_TABLE_VGROUP] = mgmtRetrieveVgroups; + mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveShowSuperTables; + mgmtRetrieveFp[TSDB_MGMT_TABLE_MODULE] = mgmtRetrieveModules; mgmtRetrieveFp[TSDB_MGMT_TABLE_QUERIES] = mgmtRetrieveQueries; mgmtRetrieveFp[TSDB_MGMT_TABLE_STREAMS] = mgmtRetrieveStreams; mgmtRetrieveFp[TSDB_MGMT_TABLE_CONFIGS] = mgmtRetrieveConfigs; - mgmtRetrieveFp[TSDB_MGMT_TABLE_CONNS] = mgmtRetrieveConns; - mgmtRetrieveFp[TSDB_MGMT_TABLE_SCORES] = mgmtRetrieveScores; - mgmtRetrieveFp[TSDB_MGMT_TABLE_GRANTS] = mgmtRetrieveGrants; - mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; + mgmtRetrieveFp[TSDB_MGMT_TABLE_CONNS] = mgmtRetrieveConns; + mgmtRetrieveFp[TSDB_MGMT_TABLE_SCORES] = mgmtRetrieveScores; + mgmtRetrieveFp[TSDB_MGMT_TABLE_GRANTS] = mgmtRetrieveGrants; + mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; } int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) { @@ -1084,9 +817,9 @@ int32_t mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) { } else { SDbObj *pDb = mgmtGetDb(pDrop->db); if (pDb) { - code = mgmtDropTable(pDb, pDrop->meterId, pDrop->igNotExists); + code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists); if (code == TSDB_CODE_SUCCESS) { - mTrace("table:%s is dropped by user:%s", pDrop->meterId, pUser->user); + mTrace("table:%s is dropped by user:%s", pDrop->tableId, pUser->user); } } else { code = TSDB_CODE_DB_NOT_SELECTED; @@ -1310,14 +1043,14 @@ connect_over: * check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one. */ static bool mgmtCheckMeterMetaMsgType(void *pMsg) { - SMeterInfoMsg *pInfo = (SMeterInfoMsg *) pMsg; + STableInfoMsg *pInfo = (STableInfoMsg *) pMsg; int16_t autoCreate = htons(pInfo->createFlag); - STableInfo *pTable = mgmtGetTable(pInfo->meterId); + STableInfo *pTable = mgmtGetTable(pInfo->tableId); // If table does not exists and autoCreate flag is set, we add the handler into task queue bool addIntoTranQueue = (pTable == NULL && autoCreate == 1); if (addIntoTranQueue) { - mTrace("table:%s auto created task added", pInfo->meterId); + mTrace("table:%s auto created task added", pInfo->tableId); } return addIntoTranQueue; @@ -1380,10 +1113,10 @@ void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_STREAM] = mgmtProcessKillStreamMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_KILL_CONNECTION] = mgmtProcessKillConnectionMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessMeterMetaMsg; - mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessMetricMetaMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiMeterMetaMsg; + mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessUnSupportMsg; } static int32_t mgmtCheckRedirectMsgImp(void *pConn) { diff --git a/src/mnode/src/mgmtStreamTable.c b/src/mnode/src/mgmtStreamTable.c index dc7ddba14b271c176171aa7cd03ea7e7f64c86a4..dc7dba5ff0592ed1aad85b4777cee44d95191bb5 100644 --- a/src/mnode/src/mgmtStreamTable.c +++ b/src/mnode/src/mgmtStreamTable.c @@ -379,6 +379,47 @@ int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) { return 0; } -SStreamTableObj* mgmtGetStreamTable(char *tableId) { - return (SStreamTableObj *)sdbGetRow(tsStreamTableSdb, tableId); -} \ No newline at end of file +void* mgmtGetStreamTable(char *tableId) { + return sdbGetRow(tsStreamTableSdb, tableId); +} + +static int32_t mgmtSetSchemaFromStreamTable(SSchema *pSchema, SStreamTableObj *pTable) { + int32_t numOfCols = pTable->numOfColumns; + for (int32_t i = 0; i < numOfCols; ++i) { + strcpy(pSchema->name, pTable->schema[i].name); + pSchema->type = pTable->schema[i].type; + pSchema->bytes = htons(pTable->schema[i].bytes); + pSchema->colId = htons(pTable->schema[i].colId); + pSchema++; + } + + return numOfCols * sizeof(SSchema); +} + +int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) { + pMeta->uid = htobe64(pTable->uid); + pMeta->sid = htonl(pTable->sid); + pMeta->vgid = htonl(pTable->vgId); + pMeta->sversion = htons(pTable->sversion); + pMeta->precision = pDb->cfg.precision; + pMeta->numOfTags = 0; + pMeta->numOfColumns = htons(pTable->numOfColumns); + pMeta->tableType = pTable->type; + pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromStreamTable(pMeta->schema, pTable); + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + return TSDB_CODE_INVALID_TABLE; + } + for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { + if (usePublicIp) { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } else { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } + } + + return TSDB_CODE_SUCCESS; +} diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index ca071aca40a0a3dd3e068c52ffa4f35d40d8bd75..5e1a0960aa8f02ba53e85c0a8eadf9f1556ba6e8 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -235,8 +235,8 @@ int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) { return sdbDeleteRow(tsSuperTableSdb, pSuperTable); } -SSuperTableObj* mgmtGetSuperTable(char *tableId) { - return (SSuperTableObj *)sdbGetRow(tsSuperTableSdb, tableId); +void* mgmtGetSuperTable(char *tableId) { + return sdbGetRow(tsSuperTableSdb, tableId); } int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { @@ -457,7 +457,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) { return TSDB_CODE_SUCCESS; } -int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) { +int32_t mgmtGetShowSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) { // int32_t cols = 0; // // SDbObj *pDb = NULL; @@ -509,7 +509,7 @@ int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) { return 0; } -int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; // char * pWrite; // int32_t cols = 0; @@ -602,3 +602,45 @@ int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col) { // length return len; } + +int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { + int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags; + for (int32_t i = 0; i < numOfCols; ++i) { + strcpy(pSchema->name, pTable->schema[i].name); + pSchema->type = pTable->schema[i].type; + pSchema->bytes = htons(pTable->schema[i].bytes); + pSchema->colId = htons(pTable->schema[i].colId); + pSchema++; + } + + return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); +} + +int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) { + pMeta->uid = htobe64(pTable->uid); + pMeta->sid = htonl(pTable->sid); + pMeta->vgid = htonl(pTable->vgId); + pMeta->sversion = htons(pTable->sversion); + pMeta->precision = pDb->cfg.precision; + pMeta->numOfTags = pTable->numOfTags; + pMeta->numOfColumns = htons(pTable->numOfColumns); + pMeta->tableType = pTable->type; + pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); + + SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + return TSDB_CODE_INVALID_TABLE; + } + for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) { + if (usePublicIp) { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } else { + pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; + pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + } + } + + return TSDB_CODE_SUCCESS; +} + diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 5a5edffb56fd29d55e501cbf9c2e367e19db0930..4a7d41aa072ecd5b58d44d19965c9134a726855c 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -99,6 +99,24 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) { return NULL; } +int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, SMeterMeta *pMeta, bool usePublicIp) { + if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { + mgmtGetChildTableMeta(pDb, (SChildTableObj *) pTable, pMeta, usePublicIp); + } else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) { + mgmtGetStreamTableMeta(pDb, (SStreamTableObj *) pTable, pMeta, usePublicIp); + } else if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { + mgmtGetNormalTableMeta(pDb, (SNormalTableObj *) pTable, pMeta, usePublicIp); + } else if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { + mgmtGetSuperTableMeta(pDb, (SSuperTableObj *) pTable, pMeta, usePublicIp); + } else { + mTrace("%s, uid:%" PRIu64 " table meta retrieve failed, invalid type", pTable->tableId, pTable->uid); + return TSDB_CODE_INVALID_TABLE; + } + + mTrace("%s, uid:%" PRIu64 " table meta is retrieved", pTable->tableId, pTable->uid); + return TSDB_CODE_SUCCESS; +} + int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { STableInfo *pTable = mgmtGetTable(pCreate->tableId); if (pTable != NULL) { @@ -229,7 +247,7 @@ void mgmtCleanUpMeters() { mgmtCleanUpSuperTables(); } -int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) { +int32_t mgmtGetShowTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) { // int32_t cols = 0; // // SDbObj *pDb = NULL; @@ -292,7 +310,7 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_ } } -int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; // int32_t numOfRead = 0; // int32_t cols = 0;