提交 03da5549 编写于 作者: S slguan

get meta message

上级 41149af7
......@@ -120,7 +120,7 @@ typedef struct SCond {
} SCond;
typedef struct SJoinNode {
char meterId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN];
uint64_t uid;
int16_t tagCol;
} SJoinNode;
......@@ -155,7 +155,7 @@ typedef struct SParamInfo {
} SParamInfo;
typedef struct STableDataBlocks {
char meterId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgid; // virtual group id
......@@ -384,7 +384,7 @@ int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param);
......
......@@ -1544,7 +1544,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
continue;
}
strncpy(pMeterMetaInfo->name, pDataBlock->meterId, TSDB_TABLE_ID_LEN);
strncpy(pMeterMetaInfo->name, pDataBlock->tableId, TSDB_TABLE_ID_LEN);
memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);
int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo);
......
......@@ -2867,7 +2867,7 @@ static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) {
pLeft->uid = pMeterMetaInfo->pMeterMeta->uid;
pLeft->tagCol = tagColIndex;
strcpy(pLeft->meterId, pMeterMetaInfo->name);
strcpy(pLeft->tableId, pMeterMetaInfo->name);
index = (SColumnIndex)COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(&pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
......@@ -2879,7 +2879,7 @@ static int32_t getJoinCondInfo(SQueryInfo* pQueryInfo, tSQLExpr* pExpr) {
pRight->uid = pMeterMetaInfo->pMeterMeta->uid;
pRight->tagCol = tagColIndex;
strcpy(pRight->meterId, pMeterMetaInfo->name);
strcpy(pRight->tableId, pMeterMetaInfo->name);
pTagCond->joinInfo.hasJoin = true;
return TSDB_CODE_SUCCESS;
......
......@@ -1838,7 +1838,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pDropTableMsg = (SDropTableMsg *)pMsg;
strcpy(pDropTableMsg->meterId, pMeterMetaInfo->name);
strcpy(pDropTableMsg->tableId, pMeterMetaInfo->name);
pDropTableMsg->igNotExists = pInfo->pDCLInfo->existsCheck ? 1 : 0;
pMsg += sizeof(SDropTableMsg);
......@@ -2353,7 +2353,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SMeterInfoMsg *pInfoMsg;
STableInfoMsg *pInfoMsg;
char * pMsg, *pStart;
int msgLen = 0;
......@@ -2381,10 +2381,10 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sizeof(SMgmtHead);
pInfoMsg = (SMeterInfoMsg *)pMsg;
strcpy(pInfoMsg->meterId, pMeterMetaInfo->name);
pInfoMsg = (STableInfoMsg *)pMsg;
strcpy(pInfoMsg->tableId, pMeterMetaInfo->name);
pInfoMsg->createFlag = htons(pSql->cmd.createOnDemand ? 1 : 0);
pMsg += sizeof(SMeterInfoMsg);
pMsg += sizeof(STableInfoMsg);
if (pSql->cmd.createOnDemand) {
memcpy(pInfoMsg->tags, tmpData, sizeof(STagData));
......@@ -2403,7 +2403,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
/**
* multi meter meta req pkg format:
* | SMgmtHead | SMultiMeterInfoMsg | meterId0 | meterId1 | meterId2 | ......
* | SMgmtHead | SMultiTableInfoMsg | tableId0 | tableId1 | tableId2 | ......
* no used 4B
**/
int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
......@@ -2421,16 +2421,16 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db
SMultiMeterInfoMsg *pInfoMsg = (SMultiMeterInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
pInfoMsg->numOfMeters = htonl((int32_t)pCmd->count);
SMultiTableInfoMsg *pInfoMsg = (SMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
if (pCmd->payloadLen > 0) {
memcpy(pInfoMsg->meterId, tmpData, pCmd->payloadLen);
memcpy(pInfoMsg->tableIds, tmpData, pCmd->payloadLen);
}
tfree(tmpData);
pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiMeterInfoMsg);
pCmd->payloadLen += sizeof(SMgmtHead) + sizeof(SMultiTableInfoMsg);
pCmd->msgType = TSDB_MSG_TYPE_MULTI_TABLE_META;
assert(pCmd->payloadLen + minMsgSize() <= pCmd->allocSize);
......@@ -2502,13 +2502,13 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// todo refactor
pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
memcpy(pMsg, pTagCond->joinInfo.left.meterId, TSDB_TABLE_ID_LEN);
memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
pMsg += sizeof(int16_t);
memcpy(pMsg, pTagCond->joinInfo.right.meterId, TSDB_TABLE_ID_LEN);
memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
......@@ -2590,7 +2590,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
strcpy(pElem->meterId, pMeterMetaInfo->name);
strcpy(pElem->tableId, pMeterMetaInfo->name);
pElem->numOfTags = htons(pMeterMetaInfo->numOfTags);
int16_t len = pMsg - (char *)pElem;
......@@ -2751,7 +2751,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
/**
* multi meter meta rsp pkg format:
* | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
* | STaosRsp | ieType | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
* |...... 1B 1B 4B
**/
int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
......@@ -2772,13 +2772,13 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
rsp++;
SMultiMeterInfoMsg *pInfo = (SMultiMeterInfoMsg *)rsp;
totalNum = htonl(pInfo->numOfMeters);
rsp += sizeof(SMultiMeterInfoMsg);
SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp;
totalNum = htonl(pInfo->numOfTables);
rsp += sizeof(SMultiTableInfoMsg);
for (i = 0; i < totalNum; i++) {
SMultiMeterMeta *pMultiMeta = (SMultiMeterMeta *)rsp;
SMeterMeta * pMeta = &pMultiMeta->meta;
SMeterMeta * pMeta = &pMultiMeta->metas;
pMeta->sid = htonl(pMeta->sid);
pMeta->sversion = htons(pMeta->sversion);
......@@ -2850,7 +2850,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with SMeterMeta in cache
pMeta->index = 0;
(void)taosAddDataIntoCache(tscCacheHandle, pMultiMeta->meterId, (char *)pMeta, size, tsMeterMetaKeepTimer);
(void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
}
pSql->res.code = TSDB_CODE_SUCCESS;
......@@ -3312,10 +3312,10 @@ static void tscWaitingForCreateTable(SSqlCmd *pCmd) {
/**
* in renew metermeta, do not retrieve metadata in cache.
* @param pSql sql object
* @param meterId meter id
* @param tableId meter id
* @return status code
*/
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId) {
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
int code = 0;
// handle metric meta renew process
......
......@@ -56,7 +56,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) {
char join[512] = {0};
if (pTagCond->joinInfo.hasJoin) {
sprintf(join, "%s,%s", pTagCond->joinInfo.left.meterId, pTagCond->joinInfo.right.meterId);
sprintf(join, "%s,%s", pTagCond->joinInfo.left.tableId, pTagCond->joinInfo.right.tableId);
}
// estimate the buffer size
......@@ -156,13 +156,13 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd) {
return false;
}
void tscGetDBInfoFromMeterId(char* meterId, char* db) {
char* st = strstr(meterId, TS_PATH_DELIMITER);
void tscGetDBInfoFromMeterId(char* tableId, char* db) {
char* st = strstr(tableId, TS_PATH_DELIMITER);
if (st != NULL) {
char* end = strstr(st + 1, TS_PATH_DELIMITER);
if (end != NULL) {
memcpy(db, meterId, (end - meterId));
db[end - meterId] = 0;
memcpy(db, tableId, (end - tableId));
db[end - tableId] = 0;
return;
}
}
......@@ -590,12 +590,12 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->meterId);
strcpy(pMeterMetaInfo->name, pDataBlock->tableId);
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
} else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->meterId, tListLen(pDataBlock->meterId)) == 0);
assert(strncmp(pMeterMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
}
/*
......@@ -660,7 +660,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
dataBuf->size = startOffset;
dataBuf->tsSource = -1;
strncpy(dataBuf->meterId, name, TSDB_TABLE_ID_LEN);
strncpy(dataBuf->tableId, name, TSDB_TABLE_ID_LEN);
/*
* The metermeta may be released since the metermeta cache are completed clean by other thread
......@@ -709,7 +709,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
STableDataBlocks* dataBuf = NULL;
int32_t ret =
tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->meterId, pOneTableBlock->pMeterMeta, &dataBuf);
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pMeterMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosCleanUpHashTable(pVnodeDataBlockHashList);
......@@ -743,7 +743,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
char* e = (char*)pBlocks->payLoad + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscTrace("%p meterId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->meterId, pBlocks->sid,
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->sid,
pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->payLoad), GET_INT64_VAL(e));
pBlocks->sid = htonl(pBlocks->sid);
......
......@@ -281,7 +281,7 @@ typedef struct {
} SCreateTableMsg;
typedef struct {
char meterId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
int8_t igNotExists;
} SDropTableMsg;
......@@ -348,7 +348,7 @@ typedef struct {
short vnode;
int32_t sid;
uint64_t uid;
char meterId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN];
} SDRemoveTableMsg;
typedef struct {
......@@ -615,7 +615,7 @@ typedef struct {
typedef struct {
uint32_t destId;
uint32_t destIp;
char meterId[TSDB_UNI_LEN];
char tableId[TSDB_UNI_LEN];
char empty[3];
uint8_t msgType;
int32_t msgLen;
......@@ -647,20 +647,21 @@ typedef struct {
} SVPeersMsg;
typedef struct {
char meterId[TSDB_TABLE_ID_LEN];
short createFlag;
char tableId[TSDB_TABLE_ID_LEN];
char db[TSDB_DB_NAME_LEN];
int16_t createFlag;
char tags[];
} SMeterInfoMsg;
} STableInfoMsg;
typedef struct {
int32_t numOfMeters;
char meterId[];
} SMultiMeterInfoMsg;
int32_t numOfTables;
char tableIds[];
} SMultiTableInfoMsg;
typedef struct {
int16_t elemLen;
char meterId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_ID_LEN];
int16_t orderIndex;
int16_t orderType; // used in group by xx order by xxx
......@@ -701,26 +702,26 @@ typedef struct {
} SMetricMeta;
typedef struct SMeterMeta {
char tableId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front
int32_t contLen;
uint8_t numOfTags : 6;
uint8_t precision : 2;
uint8_t tableType : 4;
uint8_t index : 4; // used locally
int16_t numOfColumns;
int16_t rowSize; // used locally, calculated in client
int16_t sversion;
SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t sid;
int32_t vgid;
uint64_t uid;
SSchema schema[];
} SMeterMeta;
typedef struct SMultiMeterMeta {
char meterId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front
SMeterMeta meta;
int32_t numOfTables;
int32_t contLen;
SMeterMeta metas[];
} SMultiMeterMeta;
typedef struct {
......
......@@ -28,12 +28,16 @@ extern "C" {
int32_t mgmtInitChildTables();
void mgmtCleanUpChildTables();
void * mgmtGetChildTable(char *tableId);
int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable);
int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent);
SChildTableObj* mgmtGetChildTable(char *tableId);
int8_t * mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup);
int8_t *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp);
#ifdef __cplusplus
}
......
......@@ -26,12 +26,16 @@ extern "C" {
int32_t mgmtInitNormalTables();
void mgmtCleanUpNormalTables();
void * mgmtGetNormalTable(char *tableId);
int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable);
int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName);
SNormalTableObj* mgmtGetNormalTable(char *tableId);
int8_t * mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
int8_t *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable);
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp);
#ifdef __cplusplus
}
......
......@@ -26,11 +26,15 @@ extern "C" {
int32_t mgmtInitStreamTables();
void mgmtCleanUpStreamTables();
void * mgmtGetStreamTable(char *tableId);
int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgroup, int32_t sid);
int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable);
int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter);
SStreamTableObj* mgmtGetStreamTable(char *tableId);
int8_t * mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup);
int8_t *mgmtBuildCreateStreamTableMsg(SStreamTableObj *pTable, SVgObj *pVgroup);
int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp);
#ifdef __cplusplus
}
......
......@@ -28,16 +28,24 @@ extern "C" {
int32_t mgmtInitSuperTables();
void mgmtCleanUpSuperTables();
void * mgmtGetSuperTable(char *tableId);
int32_t mgmtGetShowSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable);
SSuperTableObj* mgmtGetSuperTable(char *tableId);
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags);
int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName);
int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName);
int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols);
int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName);
int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col);
int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp);
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
int32_t mgmtGetTagsLength(SSuperTableObj *pSuperTable, int32_t col);
#ifdef __cplusplus
}
......
......@@ -28,20 +28,18 @@ extern "C" {
int32_t mgmtInitTables();
STableInfo* mgmtGetTable(char *tableId);
STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, SMeterMeta *pMeta, bool usePublicIp);
int32_t mgmtRetrieveMetricMeta(void *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate);
int32_t mgmtDropTable(SDbObj *pDb, char *meterId, int32_t ignore);
int32_t mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter);
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetShowTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
void mgmtCleanUpMeters();
void mgmtAddTableIntoSuperTable(SSuperTableObj *pStable);
void mgmtRemoveTableFromSuperTable(SSuperTableObj *pStable);
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifdef __cplusplus
......
......@@ -340,8 +340,8 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
return 0;
}
SChildTableObj* mgmtGetChildTable(char *tableId) {
return (SChildTableObj *)sdbGetRow(tsChildTableSdb, tableId);
void* mgmtGetChildTable(char *tableId) {
return sdbGetRow(tsChildTableSdb, tableId);
}
int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) {
......@@ -392,3 +392,30 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName
return 0;
}
int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->superTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = pTable->superTable->numOfTags;
pMeta->numOfColumns = htons(pTable->superTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -357,8 +357,8 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
return 0;
}
SNormalTableObj* mgmtGetNormalTable(char *tableId) {
return (SNormalTableObj *)sdbGetRow(tsNormalTableSdb, tableId);
void* mgmtGetNormalTable(char *tableId) {
return sdbGetRow(tsNormalTableSdb, tableId);
}
static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *colName) {
......@@ -442,3 +442,45 @@ int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName)
return TSDB_CODE_SUCCESS;
}
static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns;
for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name);
pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId);
pSchema++;
}
return numOfCols * sizeof(SSchema);
}
int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = 0;
pMeta->numOfColumns = htons(pTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -24,17 +24,22 @@
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "mgmtChildTable.h"
#include "mgmtConn.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
#include "mgmtNormalTable.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtStreamTable.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#define MAX_LEN_OF_METER_META (sizeof(SMultiMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN)
typedef int32_t (*GetMateFp)(SMeterMeta *pMeta, SShowObj *pShow, void *pConn);
......@@ -113,407 +118,135 @@ void mgmtCleanUpShell() {
}
}
static void mgmtSetSchemaFromMeters(SSchema *pSchema, STabObj *pMeterObj, uint32_t numOfCols) {
SSchema *pMeterSchema = (SSchema *)(pMeterObj->schema);
for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pMeterSchema[i].name);
pSchema->type = pMeterSchema[i].type;
pSchema->bytes = htons(pMeterSchema[i].bytes);
pSchema->colId = htons(pMeterSchema[i].colId);
pSchema++;
int32_t mgmtProcessMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return TSDB_CODE_INVALID_USER;
}
}
static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMeterObj) {
SSchema *pTagSchema = (SSchema *)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
STableInfoMsg *pInfo = pCont;
pInfo->createFlag = htons(pInfo->createFlag);
SDbObj *pDb = mgmtGetDb(pInfo->db);
if (pDb == NULL || pDb->dropStatus != TSDB_DB_STATUS_READY) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0);
return TSDB_CODE_INVALID_DB;
}
char *tagVal = pMeterObj->pTagData + TSDB_TABLE_ID_LEN; // tag start position
STableInfo *pTable = mgmtGetTable(pInfo->tableId);
uint32_t tagsLen = 0;
for (int32_t i = 0; i < pMetric->numOfTags; ++i) {
tagsLen += pTagSchema[i].bytes;
// on demand create table from super table if meter does not exists
if (pTable == NULL && pInfo->createFlag == 1) {
// write operation needs to redirect to master mnode
if (mgmtCheckRedirectMsg(ahandle) != 0) {
return TSDB_CODE_REDIRECT;
}
memcpy(pTags, tagVal, tagsLen);
return tagsLen;
}
SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData));
if (pCreateMsg == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
int32_t mgmtProcessMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
// STabObj * pMeterObj = NULL;
// SVgObj * pVgroup = NULL;
// SMeterMeta * pMeta = NULL;
// SSchema * pSchema = NULL;
// STaosRsp * pRsp = NULL;
// char * pStart = NULL;
//
// pInfo->createFlag = htons(pInfo->createFlag);
//
// int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS +
// sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
//
// // todo db check should be extracted
// if (pDb == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
//
// if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
//
// pRsp->code = TSDB_CODE_INVALID_DB;
// pMsg++;
//
// goto _exit_code;
// }
//
// pMeterObj = mgmtGetTable(pInfo->meterId);
//
// // on demand create table from super table if meter does not exists
// if (pMeterObj == NULL && pInfo->createFlag == 1) {
// // write operation needs to redirect to master mnode
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_TABLE_META_RSP) != 0) {
// return 0;
// }
//
// SCreateTableMsg *pCreateMsg = calloc(1, sizeof(SCreateTableMsg) + sizeof(STagData));
// if (pCreateMsg == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
//
// memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
// strcpy(pCreateMsg->meterId, pInfo->meterId);
//
// SDbObj* pMeterDb = mgmtGetDbByTableId(pCreateMsg->meterId);
// mTrace("table:%s, pConnDb:%p, pConnDbName:%s, pMeterDb:%p, pMeterDbName:%s",
// pCreateMsg->meterId, pDb, pDb->name, pMeterDb, pMeterDb->name);
// assert(pDb == pMeterDb);
//
// int32_t code = mgmtCreateTable(pDb, pCreateMsg);
//
// char stableName[TSDB_TABLE_ID_LEN] = {0};
// strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN);
// mTrace("table:%s is automatically created by %s from %s, code:%d", pCreateMsg->meterId, pConn->pUser->user,
// stableName, code);
//
// tfree(pCreateMsg);
//
// if (code != TSDB_CODE_SUCCESS) {
// if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
//
// pRsp->code = code;
// pMsg++;
//
// goto _exit_code;
// }
//
// pMeterObj = mgmtGetTable(pInfo->meterId);
// }
//
// if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
//
// if (pMeterObj == NULL) {
// if (pDb)
// pRsp->code = TSDB_CODE_INVALID_TABLE;
// else
// pRsp->code = TSDB_CODE_DB_NOT_SELECTED;
// pMsg++;
// } else {
// mTrace("%s, uid:%" PRIu64 " meter meta is retrieved", pInfo->meterId, pMeterObj->uid);
// pRsp->code = 0;
// pMsg += sizeof(STaosRsp);
// *pMsg = TSDB_IE_TYPE_META;
// pMsg++;
//
// pMeta = (SMeterMeta *)pMsg;
// pMeta->uid = htobe64(pMeterObj->uid);
// pMeta->sid = htonl(pMeterObj->gid.sid);
// pMeta->vgid = htonl(pMeterObj->gid.vgId);
// pMeta->sversion = htons(pMeterObj->sversion);
//
// pMeta->precision = pDb->cfg.precision;
//
// pMeta->numOfTags = pMeterObj->numOfTags;
// pMeta->numOfColumns = htons(pMeterObj->numOfColumns);
// pMeta->tableType = pMeterObj->tableType;
//
// pMsg += sizeof(SMeterMeta);
// pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta struct
//
// if (mgmtTableCreateFromSuperTable(pMeterObj)) {
// assert(pMeterObj->numOfTags == 0);
//
// STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData);
// uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
//
// pMeta->numOfTags = pMetric->numOfTags; // update the numOfTags info
// mgmtSetSchemaFromMeters(pSchema, pMetric, numOfTotalCols);
// pMsg += numOfTotalCols * sizeof(SSchema);
//
// // for meters created from metric, we need the metric tag schema to parse the tag data
// int32_t tagsLen = mgmtSetMeterTagValue(pMsg, pMetric, pMeterObj);
// pMsg += tagsLen;
// } else {
// /*
// * for metrics, or meters that are not created from metric, set the schema directly
// * for meters created from metric, we use the schema of metric instead
// */
// uint32_t numOfTotalCols = (uint32_t)pMeterObj->numOfTags + pMeterObj->numOfColumns;
// mgmtSetSchemaFromMeters(pSchema, pMeterObj, numOfTotalCols);
// pMsg += numOfTotalCols * sizeof(SSchema);
// }
//
// if (mgmtIsNormalTable(pMeterObj)) {
// pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId);
// if (pVgroup == NULL) {
// pRsp->code = TSDB_CODE_INVALID_TABLE;
// goto _exit_code;
// }
// for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
// if (pConn->usePublicIp) {
// pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
// pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
// } else {
// pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
// pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
// }
// }
// }
// }
//
//_exit_code:
// msgLen = pMsg - pStart;
//
// taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
//
// return msgLen;
return 0;
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
strcpy(pCreateMsg->tableId, pInfo->tableId);
int32_t code = mgmtCreateTable(pDb, pCreateMsg);
char stableName[TSDB_TABLE_ID_LEN] = {0};
strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN);
mTrace("table:%s is auto created by %s from %s, code:%d", pCreateMsg->tableId, pUser->user, stableName, code);
tfree(pCreateMsg);
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(ahandle, code, NULL, 0);
return code;
}
pTable = mgmtGetTable(pInfo->tableId);
}
if (pTable == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_TABLE, NULL, 0);
return TSDB_CODE_INVALID_TABLE;
}
SMeterMeta *pMeta = rpcMallocCont(sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS);
int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
if (code == TSDB_CODE_SUCCESS) {
rpcFreeCont(pMeta);
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0);
} else {
pMeta->contLen = htons(pMeta->contLen);
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMeta, pMeta->contLen);
}
return TSDB_CODE_SUCCESS;
}
/**
* multi meter meta rsp pkg format:
* | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2
* 1B 1B 4B
*
* | STaosHeader | STaosRsp | ieType | SMultiMeterInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | ......................|
* ^ ^ ^
* |<--------------------------------------size-----------------------------------------------|---------------------->|
* | | |
* pStart pCurMeter pTail
**/
int32_t mgmtProcessMultiMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
// SDbObj * pDbObj = NULL;
// STabObj * pMeterObj = NULL;
// SVgObj * pVgroup = NULL;
// SMultiMeterMeta * pMeta = NULL;
// SSchema * pSchema = NULL;
// STaosRsp * pRsp = NULL;
// char * pStart = NULL;
//
// SMultiMeterInfoMsg * pInfo = (SMultiMeterInfoMsg *)pMsg;
// char * str = pMsg + sizeof(SMultiMeterInfoMsg);
// pInfo->numOfMeters = htonl(pInfo->numOfMeters);
//
// int32_t size = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice
//
// char *pNewMsg;
// if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
//
// int32_t totalNum = 0;
// char tblName[TSDB_TABLE_ID_LEN];
// char* nextStr;
//
// char* pCurMeter = pStart + sizeof(STaosRsp) + sizeof(SMultiMeterInfoMsg) + 1; // 1: ie type byte
// char* pTail = pStart + size;
//
// while (str - pMsg < msgLen) {
// nextStr = strchr(str, ',');
// if (nextStr == NULL) {
// break;
// }
//
// memcpy(tblName, str, nextStr - str);
// tblName[nextStr - str] = '\0';
// str = nextStr + 1;
//
// // judge whether the remaining memory is adequate
// if ((pTail - pCurMeter) < MAX_LEN_OF_METER_META) {
// char* pMsgHdr = pStart - sizeof(STaosHeader);
// size *= 2;
// pMsgHdr = (char*)realloc(pMsgHdr, size);
// if (NULL == pMsgHdr) {
// char* pTmp = pStart - sizeof(STaosHeader);
// tfree(pTmp);
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_MULTI_TABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// break;
// }
//
// pCurMeter = (char*)pMsgHdr + sizeof(STaosHeader) + (pCurMeter - pStart);
// pStart = (char*)pMsgHdr + sizeof(STaosHeader);
// pNewMsg = pStart;
// pRsp = (STaosRsp *)pStart;
// pTail = pMsgHdr + size;
// }
//
// // get meter schema, and fill into resp payload
// pMeterObj = mgmtGetTable(tblName);
// pDbObj = mgmtGetDbByTableId(tblName);
//
// if (pMeterObj == NULL || (pDbObj == NULL)) {
// continue;
// } else {
// mTrace("%s, uid:%" PRIu64 " sversion:%d meter meta is retrieved", tblName, pMeterObj->uid, pMeterObj->sversion);
// pMeta = (SMultiMeterMeta *)pCurMeter;
//
// memcpy(pMeta->meterId, tblName, strlen(tblName));
// pMeta->meta.uid = htobe64(pMeterObj->uid);
// pMeta->meta.sid = htonl(pMeterObj->gid.sid);
// pMeta->meta.vgid = htonl(pMeterObj->gid.vgId);
// pMeta->meta.sversion = htons(pMeterObj->sversion);
// pMeta->meta.precision = pDbObj->cfg.precision;
// pMeta->meta.numOfTags = pMeterObj->numOfTags;
// pMeta->meta.numOfColumns = htons(pMeterObj->numOfColumns);
// pMeta->meta.tableType = pMeterObj->tableType;
//
// pCurMeter += sizeof(SMultiMeterMeta);
// pSchema = (SSchema *)pCurMeter; // schema locates at the end of SMeterMeta struct
//
// if (mgmtTableCreateFromSuperTable(pMeterObj)) {
// assert(pMeterObj->numOfTags == 0);
//
// STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData);
// uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
//
// pMeta->meta.numOfTags = pMetric->numOfTags; // update the numOfTags info
// mgmtSetSchemaFromMeters(pSchema, pMetric, numOfTotalCols);
// pCurMeter += numOfTotalCols * sizeof(SSchema);
//
// // for meters created from metric, we need the metric tag schema to parse the tag data
// int32_t tagsLen = mgmtSetMeterTagValue(pCurMeter, pMetric, pMeterObj);
// pCurMeter += tagsLen;
// } else {
// /*
// * for metrics, or meters that are not created from metric, set the schema directly
// * for meters created from metric, we use the schema of metric instead
// */
// uint32_t numOfTotalCols = (uint32_t)pMeterObj->numOfTags + pMeterObj->numOfColumns;
// mgmtSetSchemaFromMeters(pSchema, pMeterObj, numOfTotalCols);
// pCurMeter += numOfTotalCols * sizeof(SSchema);
// }
//
// if (mgmtIsNormalTable(pMeterObj)) {
// pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId);
// if (pVgroup == NULL) {
// pRsp->code = TSDB_CODE_INVALID_TABLE;
// pNewMsg++;
// mError("%s, uid:%" PRIu64 " sversion:%d vgId:%d pVgroup is NULL", tblName, pMeterObj->uid, pMeterObj->sversion,
// pMeterObj->gid.vgId);
// goto _error_exit_code;
// }
//
// for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
// if (pConn->usePublicIp) {
// pMeta->meta.vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
// pMeta->meta.vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
// } else {
// pMeta->meta.vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
// pMeta->meta.vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
// }
// }
// }
// }
//
// totalNum++;
// if (totalNum > pInfo->numOfMeters) {
// pNewMsg++;
// break;
// }
// }
//
// // fill rsp code, ieType
// msgLen = pCurMeter - pNewMsg;
//
// pRsp->code = 0;
// pNewMsg += sizeof(STaosRsp);
// *pNewMsg = TSDB_IE_TYPE_META;
// pNewMsg++;
//
// SMultiMeterInfoMsg *pRspInfo = (SMultiMeterInfoMsg *)pNewMsg;
//
// pRspInfo->numOfMeters = htonl(totalNum);
// goto _exit_code;
//
//_error_exit_code:
// msgLen = pNewMsg - pStart;
//
//_exit_code:
// taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
//
// return msgLen;
return 0;
}
SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo);
int32_t mgmtProcessMetricMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
// SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg;
// STabObj * pMetric;
// STaosRsp * pRsp;
// char * pStart;
//
// pSuperTableMetaMsg->numOfMeters = htonl(pSuperTableMetaMsg->numOfMeters);
//
// pSuperTableMetaMsg->join = htonl(pSuperTableMetaMsg->join);
// pSuperTableMetaMsg->joinCondLen = htonl(pSuperTableMetaMsg->joinCondLen);
//
// for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
// pSuperTableMetaMsg->metaElem[i] = htonl(pSuperTableMetaMsg->metaElem[i]);
// }
//
// SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)(((char *)pSuperTableMetaMsg) + pSuperTableMetaMsg->metaElem[0]);
// pMetric = mgmtGetTable(pElem->meterId);
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
//
// if (pMetric == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
// pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP);
// if (pStart == NULL) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
//
// pMsg = pStart;
// pRsp = (STaosRsp *)pMsg;
// if (pDb)
// pRsp->code = TSDB_CODE_INVALID_TABLE;
// else
// pRsp->code = TSDB_CODE_DB_NOT_SELECTED;
// pMsg++;
//
// msgLen = pMsg - pStart;
// } else {
// msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pSuperTableMetaMsg);
// if (msgLen <= 0) {
// taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_STABLE_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
// return 0;
// }
// }
//
// taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
//
// return msgLen;
return 0;
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return TSDB_CODE_INVALID_USER;
}
SMultiTableInfoMsg *pInfo = pCont;
pInfo->numOfTables = htonl(pInfo->numOfTables);
int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice
SMultiMeterMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
if (pMultiMeta == NULL) {
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pMultiMeta->contLen = sizeof(SMultiMeterMeta);
pMultiMeta->numOfTables = 0;
for (int t = 0; t < pInfo->numOfTables; ++t) {
char *tableId = (char*)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN);
STableInfo *pTable = mgmtGetTable(tableId);
if (pTable == NULL) continue;
SDbObj *pDb = mgmtGetDbByTableId(tableId);
if (pDb == NULL) continue;
int availLen = totalMallocLen - pMultiMeta->contLen;
if (availLen <= sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS) {
//TODO realloc
//totalMallocLen *= 2;
//pMultiMeta = rpcReMalloc(pMultiMeta, totalMallocLen);
//if (pMultiMeta == NULL) {
/// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
//} else {
// t--;
// continue;
//}
}
SMeterMeta *pMeta = (SMeterMeta *)(pMultiMeta->metas + pMultiMeta->contLen);
int32_t code = mgmtGetTableMeta(pDb, pTable, pMeta, usePublicIp);
if (code == TSDB_CODE_SUCCESS) {
pMultiMeta->numOfTables ++;
pMultiMeta->contLen += pMeta->contLen;
}
}
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pMultiMeta, pMultiMeta->contLen);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) {
......@@ -876,11 +609,11 @@ static void mgmtInitShowMsgFp() {
mgmtGetMetaFp[TSDB_MGMT_TABLE_ACCT] = mgmtGetAcctMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_USER] = mgmtGetUserMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_DB] = mgmtGetDbMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_TABLE] = mgmtGetTableMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_TABLE] = mgmtGetShowTableMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_DNODE] = mgmtGetDnodeMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_MNODE] = mgmtGetMnodeMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_VGROUP] = mgmtGetVgroupMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetSuperTableMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetShowSuperTableMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_MODULE] = mgmtGetModuleMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_QUERIES] = mgmtGetQueryMeta;
mgmtGetMetaFp[TSDB_MGMT_TABLE_STREAMS] = mgmtGetStreamMeta;
......@@ -894,11 +627,11 @@ static void mgmtInitShowMsgFp() {
mgmtRetrieveFp[TSDB_MGMT_TABLE_ACCT] = mgmtRetrieveAccts;
mgmtRetrieveFp[TSDB_MGMT_TABLE_USER] = mgmtRetrieveUsers;
mgmtRetrieveFp[TSDB_MGMT_TABLE_DB] = mgmtRetrieveDbs;
mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveTables;
mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveShowTables;
mgmtRetrieveFp[TSDB_MGMT_TABLE_DNODE] = mgmtRetrieveDnodes;
mgmtRetrieveFp[TSDB_MGMT_TABLE_MNODE] = mgmtRetrieveMnodes;
mgmtRetrieveFp[TSDB_MGMT_TABLE_VGROUP] = mgmtRetrieveVgroups;
mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveSuperTables;
mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveShowSuperTables;
mgmtRetrieveFp[TSDB_MGMT_TABLE_MODULE] = mgmtRetrieveModules;
mgmtRetrieveFp[TSDB_MGMT_TABLE_QUERIES] = mgmtRetrieveQueries;
mgmtRetrieveFp[TSDB_MGMT_TABLE_STREAMS] = mgmtRetrieveStreams;
......@@ -1084,9 +817,9 @@ int32_t mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) {
} else {
SDbObj *pDb = mgmtGetDb(pDrop->db);
if (pDb) {
code = mgmtDropTable(pDb, pDrop->meterId, pDrop->igNotExists);
code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists);
if (code == TSDB_CODE_SUCCESS) {
mTrace("table:%s is dropped by user:%s", pDrop->meterId, pUser->user);
mTrace("table:%s is dropped by user:%s", pDrop->tableId, pUser->user);
}
} else {
code = TSDB_CODE_DB_NOT_SELECTED;
......@@ -1310,14 +1043,14 @@ connect_over:
* check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one.
*/
static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
SMeterInfoMsg *pInfo = (SMeterInfoMsg *) pMsg;
STableInfoMsg *pInfo = (STableInfoMsg *) pMsg;
int16_t autoCreate = htons(pInfo->createFlag);
STableInfo *pTable = mgmtGetTable(pInfo->meterId);
STableInfo *pTable = mgmtGetTable(pInfo->tableId);
// If table does not exists and autoCreate flag is set, we add the handler into task queue
bool addIntoTranQueue = (pTable == NULL && autoCreate == 1);
if (addIntoTranQueue) {
mTrace("table:%s auto created task added", pInfo->meterId);
mTrace("table:%s auto created task added", pInfo->tableId);
}
return addIntoTranQueue;
......@@ -1382,8 +1115,8 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_TABLE_META] = mgmtProcessMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessMetricMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_MULTI_TABLE_META] = mgmtProcessMultiMeterMetaMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_STABLE_META] = mgmtProcessUnSupportMsg;
}
static int32_t mgmtCheckRedirectMsgImp(void *pConn) {
......
......@@ -379,6 +379,47 @@ int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) {
return 0;
}
SStreamTableObj* mgmtGetStreamTable(char *tableId) {
return (SStreamTableObj *)sdbGetRow(tsStreamTableSdb, tableId);
void* mgmtGetStreamTable(char *tableId) {
return sdbGetRow(tsStreamTableSdb, tableId);
}
static int32_t mgmtSetSchemaFromStreamTable(SSchema *pSchema, SStreamTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns;
for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name);
pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId);
pSchema++;
}
return numOfCols * sizeof(SSchema);
}
int32_t mgmtGetStreamTableMeta(SDbObj *pDb, SStreamTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = 0;
pMeta->numOfColumns = htons(pTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromStreamTable(pMeta->schema, pTable);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -235,8 +235,8 @@ int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) {
return sdbDeleteRow(tsSuperTableSdb, pSuperTable);
}
SSuperTableObj* mgmtGetSuperTable(char *tableId) {
return (SSuperTableObj *)sdbGetRow(tsSuperTableSdb, tableId);
void* mgmtGetSuperTable(char *tableId) {
return sdbGetRow(tsSuperTableSdb, tableId);
}
int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) {
......@@ -457,7 +457,7 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
return TSDB_CODE_SUCCESS;
}
int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t mgmtGetShowSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
//
// SDbObj *pDb = NULL;
......@@ -509,7 +509,7 @@ int32_t mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
return 0;
}
int32_t mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
// char * pWrite;
// int32_t cols = 0;
......@@ -602,3 +602,45 @@ int32_t mgmtGetTagsLength(SSuperTableObj* pSuperTable, int32_t col) { // length
return len;
}
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) {
int32_t numOfCols = pTable->numOfColumns + pTable->numOfTags;
for (int32_t i = 0; i < numOfCols; ++i) {
strcpy(pSchema->name, pTable->schema[i].name);
pSchema->type = pTable->schema[i].type;
pSchema->bytes = htons(pTable->schema[i].bytes);
pSchema->colId = htons(pTable->schema[i].colId);
pSchema++;
}
return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema);
}
int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, SMeterMeta *pMeta, bool usePublicIp) {
pMeta->uid = htobe64(pTable->uid);
pMeta->sid = htonl(pTable->sid);
pMeta->vgid = htonl(pTable->vgId);
pMeta->sversion = htons(pTable->sversion);
pMeta->precision = pDb->cfg.precision;
pMeta->numOfTags = pTable->numOfTags;
pMeta->numOfColumns = htons(pTable->numOfColumns);
pMeta->tableType = pTable->type;
pMeta->contLen = sizeof(SMeterMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable);
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
if (pVgroup == NULL) {
return TSDB_CODE_INVALID_TABLE;
}
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
if (usePublicIp) {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip;
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -99,6 +99,24 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) {
return NULL;
}
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, SMeterMeta *pMeta, bool usePublicIp) {
if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) {
mgmtGetChildTableMeta(pDb, (SChildTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_STREAM_TABLE) {
mgmtGetStreamTableMeta(pDb, (SStreamTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) {
mgmtGetNormalTableMeta(pDb, (SNormalTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) {
mgmtGetSuperTableMeta(pDb, (SSuperTableObj *) pTable, pMeta, usePublicIp);
} else {
mTrace("%s, uid:%" PRIu64 " table meta retrieve failed, invalid type", pTable->tableId, pTable->uid);
return TSDB_CODE_INVALID_TABLE;
}
mTrace("%s, uid:%" PRIu64 " table meta is retrieved", pTable->tableId, pTable->uid);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
STableInfo *pTable = mgmtGetTable(pCreate->tableId);
if (pTable != NULL) {
......@@ -229,7 +247,7 @@ void mgmtCleanUpMeters() {
mgmtCleanUpSuperTables();
}
int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t mgmtGetShowTableMeta(SMeterMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
//
// SDbObj *pDb = NULL;
......@@ -292,7 +310,7 @@ static void mgmtVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_
}
}
int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
// int32_t numOfRead = 0;
// int32_t cols = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册