提交 78812e6d 编写于 作者: S slguan

create super table

上级 a563fb8f
......@@ -139,7 +139,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
} else {
pNode->colId = -1;
pNode->pSchema->type = TSDB_DATA_TYPE_BINARY;
pNode->pSchema->bytes = TSDB_METER_NAME_LEN;
pNode->pSchema->bytes = TSDB_TABLE_NAME_LEN;
strcpy(pNode->pSchema->name, TSQL_TBNAME_L);
pNode->pSchema->colId = -1;
}
......
......@@ -293,7 +293,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
for (int32_t f = 1; f < pMeterMetaInfo->numOfTags; ++f) {
int16_t tagColumnIndex = pMeterMetaInfo->tagColumnIndex[f - 1];
if (tagColumnIndex == -1) {
vOffset[f] = vOffset[f - 1] + TSDB_METER_NAME_LEN;
vOffset[f] = vOffset[f - 1] + TSDB_TABLE_NAME_LEN;
} else {
vOffset[f] = vOffset[f - 1] + pSchema[tagColumnIndex].bytes;
}
......
......@@ -363,7 +363,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pToken->n > TSDB_METER_NAME_LEN) {
if (pToken->n > TSDB_TABLE_NAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -1054,12 +1054,12 @@ int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pDB, SSQL
totalLen += 1;
/* here we only check the table name length limitation */
if (tableName->n > TSDB_METER_NAME_LEN) {
if (tableName->n > TSDB_TABLE_NAME_LEN) {
return TSDB_CODE_INVALID_SQL;
}
} else { // pDB == NULL, the db prefix name is specified in tableName
/* the length limitation includes tablename + dbname + sep */
if (tableName->n > TSDB_METER_NAME_LEN + TSDB_DB_NAME_LEN + tListLen(TS_PATH_DELIMITER)) {
if (tableName->n > TSDB_TABLE_NAME_LEN + TSDB_DB_NAME_LEN + tListLen(TS_PATH_DELIMITER)) {
return TSDB_CODE_INVALID_SQL;
}
}
......@@ -1361,7 +1361,7 @@ int32_t addProjectionExprAndResultField(SQueryInfo* pQueryInfo, tSQLExprItem* pI
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN};
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN};
strcpy(colSchema.name, TSQL_TBNAME_L);
pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY;
......@@ -2085,7 +2085,7 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
if (pCmd->payloadLen > TSDB_METER_NAME_LEN) {
if (pCmd->payloadLen > TSDB_TABLE_NAME_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
}
......@@ -2679,7 +2679,7 @@ static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/ SStringBuild
taosStringBuilderAppendString(sb, TBNAME_LIST_SEP);
}
if (pSub->val.nLen <= 0 || pSub->val.nLen > TSDB_METER_NAME_LEN) {
if (pSub->val.nLen <= 0 || pSub->val.nLen > TSDB_TABLE_NAME_LEN) {
return TSDB_CODE_INVALID_SQL;
}
}
......@@ -4951,7 +4951,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SQueryInfo* pQueryInfo) {
int16_t colIndex = pColIndex->colIdx;
if (pColIndex->colIdx == TSDB_TBNAME_COLUMN_INDEX) {
type = TSDB_DATA_TYPE_BINARY;
bytes = TSDB_METER_NAME_LEN;
bytes = TSDB_TABLE_NAME_LEN;
name = TSQL_TBNAME_L;
} else {
colIndex = (TSDB_COL_IS_TAG(pColIndex->flag)) ? pMeterMetaInfo->pMeterMeta->numOfColumns + pColIndex->colIdx
......
......@@ -84,7 +84,7 @@ struct SSchema* tsGetColumnSchema(STableMeta* pMeta, int32_t startCol) {
}
struct SSchema tsGetTbnameColumnSchema() {
struct SSchema s = {.colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN};
struct SSchema s = {.colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN};
strcpy(s.name, TSQL_TBNAME_L);
return s;
......
......@@ -1575,7 +1575,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
if (pMeterMetaInfo->tagColumnIndex[j] == TSDB_TBNAME_COLUMN_INDEX) {
SSchema tbSchema = {
.bytes = TSDB_METER_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
.bytes = TSDB_TABLE_NAME_LEN, .colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY};
memcpy(pMsg, &tbSchema, sizeof(SSchema));
} else {
memcpy(pMsg, &pTagSchema[pMeterMetaInfo->tagColumnIndex[j]], sizeof(SSchema));
......
......@@ -101,40 +101,8 @@ typedef struct {
int32_t vgId; // vnode group ID
} STableGid;
typedef struct _tab_obj {
char tableId[TSDB_TABLE_ID_LEN + 1];
uint64_t uid;
STableGid gid;
int32_t sversion; // schema version
int64_t createdTime;
int32_t numOfTags; // for metric
int32_t numOfTables; // for metric
int32_t numOfColumns;
int32_t schemaSize;
short nextColId;
char tableType : 4;
char status : 3;
char isDirty : 1; // if the table change tag column 1 value
char reserved[15];
char updateEnd[1];
pthread_rwlock_t rwLock;
tSkipList * pSkipList;
struct _tab_obj *pHead; // for metric, a link list for all meters created
// according to this metric
char *pTagData; // TSDB_TABLE_ID_LEN(metric_name)+
// tags_value1/tags_value2/tags_value3
struct _tab_obj *prev, *next;
char * pSql; // pointer to SQL, for SC, null-terminated string
char * pReserve1;
char * pReserve2;
char * schema;
// SSchema schema[];
} STabObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
uint64_t uid;
int32_t sid;
......@@ -142,38 +110,40 @@ typedef struct {
int64_t createdTime;
} STableInfo;
struct _vg_obj;
typedef struct SSuperTableObj {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
int32_t sversion;
int32_t numOfTables;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[7];
int8_t updateEnd[1];
int32_t numOfTables;
int16_t nextColId;
SSchema *schema;
SSchema *schema;
} SSuperTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
uint64_t uid;
int32_t sid;
int32_t vgId;
int64_t createdTime;
char superTableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN];
int8_t reserved[7];
int8_t updateEnd[1];
SSuperTableObj *superTable;
} SChildTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
uint64_t uid;
int32_t sid;
......@@ -188,7 +158,7 @@ typedef struct {
} SNormalTableObj;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char tableId[TSDB_TABLE_ID_LEN];
int8_t type;
uint64_t uid;
int32_t sid;
......@@ -224,24 +194,20 @@ typedef struct _vg_obj {
} SVgObj;
typedef struct _db_obj {
/*
* this length will cause the storage structure to change, rollback
*/
char name[TSDB_DB_NAME_LEN + 1];
char name[TSDB_DB_NAME_LEN];
int64_t createdTime;
SDbCfg cfg;
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfMetrics;
uint8_t vgStatus;
uint8_t dropStatus;
int8_t dropStatus;
char reserved[16];
char updateEnd[1];
struct _db_obj *prev, *next;
SVgObj * pHead; // empty vgroup first
SVgObj * pTail; // empty vgroup end
void * vgTimer;
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfSuperTables;
int32_t vgStatus;
SVgObj *pHead; // empty vgroup first
SVgObj *pTail; // empty vgroup end
void * vgTimer;
} SDbObj;
struct _acctObj;
......
......@@ -86,7 +86,7 @@ extern "C" {
#define TS_PATH_DELIMITER_LEN 1
#define TSDB_METER_ID_LEN_MARGIN 10
#define TSDB_TABLE_ID_LEN (TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*TS_PATH_DELIMITER_LEN+TSDB_USERID_LEN+TSDB_METER_ID_LEN_MARGIN) //TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*strlen(TS_PATH_DELIMITER)+strlen(USERID)
#define TSDB_TABLE_ID_LEN (TSDB_DB_NAME_LEN+TSDB_TABLE_NAME_LEN+2*TS_PATH_DELIMITER_LEN+TSDB_USERID_LEN+TSDB_METER_ID_LEN_MARGIN) //TSDB_DB_NAME_LEN+TSDB_TABLE_NAME_LEN+2*strlen(TS_PATH_DELIMITER)+strlen(USERID)
#define TSDB_UNI_LEN 24
#define TSDB_USER_LEN TSDB_UNI_LEN
#define TSDB_ACCT_LEN TSDB_UNI_LEN
......@@ -95,7 +95,7 @@ extern "C" {
#define TSDB_MAX_COLUMNS 256
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
#define TSDB_METER_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 64
#define TSDB_DB_NAME_LEN 32
#define TSDB_COL_NAME_LEN 64
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 16
......
......@@ -116,8 +116,8 @@ typedef struct {
} SDbInfo;
typedef struct {
char name[TSDB_METER_NAME_LEN + 1];
char metric[TSDB_METER_NAME_LEN + 1];
char name[TSDB_TABLE_NAME_LEN + 1];
char metric[TSDB_TABLE_NAME_LEN + 1];
} STableRecord;
typedef struct {
......
......@@ -42,6 +42,11 @@ int32_t mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists);
int32_t mgmtDropDb(SDbObj *pDb);
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
void mgmtAddSuperTableIntoDb(SDbObj *pDb);
void mgmtRemoveSuperTableFromDb(SDbObj *pDb);
void mgmtAddTableIntoDb(SDbObj *pDb);
void mgmtRemoveTableFromDb(SDbObj *pDb);
#ifdef __cplusplus
}
#endif
......
......@@ -221,6 +221,8 @@ int32_t mgmtInitChildTables() {
pNode = pLastNode;
continue;
}
mgmtAddTableIntoDb(pDb);
}
mgmtSetVgroupIdPool();
......@@ -305,12 +307,12 @@ int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVgr
}
mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1);
mgmtSendCreateTableMsg(pTable, pVgroup);
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mgmtAddTableIntoDb(pDb);
return 0;
}
......@@ -337,6 +339,7 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
mgmtDropVgroup(pDb, pVgroup);
}
mgmtRemoveTableFromDb(pDb);
return 0;
}
......
......@@ -70,7 +70,7 @@ int mgmtGetConns(SShowObj *pShow, void *pConn) {
int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int cols = 0;
pShow->bytes[cols] = TSDB_METER_NAME_LEN;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
SSchema *pSchema = tsGetSchema(pMeta);
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
......
......@@ -86,7 +86,7 @@ int32_t mgmtInitDbs() {
pDb->next = NULL;
pDb->numOfTables = 0;
pDb->numOfVgroups = 0;
pDb->numOfMetrics = 0;
pDb->numOfSuperTables = 0;
pDb->vgStatus = TSDB_VG_STATUS_READY;
pDb->vgTimer = NULL;
pAcct = mgmtGetAcct(pDb->cfg.acct);
......@@ -853,3 +853,18 @@ void *mgmtDbActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
tfree(row);
return NULL;
}
void mgmtAddSuperTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, 1);
}
void mgmtRemoveSuperTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, -1);
}
void mgmtAddTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, 1);
}
void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1);
}
......@@ -37,7 +37,7 @@
void mgmtProcessMsgFromDnode(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn);
int mgmtSendVPeersMsg(SVgObj *pVgroup);
char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode);
char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode);
//char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode);
extern void *tsDnodeMgmtQhandle;
void * mgmtStatusTimer = NULL;
......@@ -323,7 +323,7 @@ int mgmtSendRemoveMeterMsgToDnode(STableInfo *pTable, SVgObj *pVgroup) {
return 0;
}
int mgmtSendAlterStreamMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
int mgmtSendAlterStreamMsgToDnode(void *pTable, SVgObj *pVgroup) {
// SAlterStreamMsg *pAlter;
// char * pMsg, *pStart;
// int i, msgLen = 0;
......
......@@ -236,6 +236,7 @@ int32_t mgmtInitNormalTables() {
pNode = pLastNode;
continue;
}
mgmtAddTableIntoDb(pDb);
}
mgmtSetVgroupIdPool();
......@@ -326,6 +327,7 @@ int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
" db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mgmtAddTableIntoDb(pDb);
return 0;
}
......@@ -354,6 +356,7 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
mgmtDropVgroup(pDb, pVgroup);
}
mgmtRemoveTableFromDb(pDb);
return 0;
}
......
......@@ -688,7 +688,7 @@ int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) {
SShowObj *pShow = (SShowObj *) calloc(1, sizeof(SShowObj) + htons(pShowMsg->payloadLen));
pShow->signature = pShow;
pShow->type = pShowMsg->type;
strcpy(pShow->db, pShow->db);
strcpy(pShow->db, pShowMsg->db);
mTrace("pShow:%p is allocated", pShow);
// set the table name query condition
......@@ -805,20 +805,12 @@ int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb) {
code = mgmtCreateTable(pDb, pCreate);
if (code == TSDB_CODE_TABLE_ALREADY_EXIST) {
if (pCreate->igExists) {
code = TSDB_CODE_SUCCESS;
}
}
} else {
code = TSDB_CODE_DB_NOT_SELECTED;
}
}
if (code != TSDB_CODE_SUCCESS) {
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, NULL, 0);
}
rpcSendResponse(ahandle, code, NULL, 0);
return code;
}
......
......@@ -190,7 +190,7 @@ void *mgmtStreamTableActionDecode(void *row, char *str, int32_t size, int32_t *s
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
memset(pTable, 0, sizeof(SStreamTableObj));
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
......@@ -251,6 +251,7 @@ int32_t mgmtInitStreamTables() {
pNode = pLastNode;
continue;
}
mgmtAddTableIntoDb(pDb);
}
mgmtSetVgroupIdPool();
......@@ -348,6 +349,7 @@ int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, SVgObj *pVg
" db:%s",
pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name);
mgmtAddTableIntoDb(pDb);
return 0;
}
......@@ -376,6 +378,7 @@ int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) {
mgmtDropVgroup(pDb, pVgroup);
}
mgmtRemoveTableFromDb(pDb);
return 0;
}
......
......@@ -34,9 +34,11 @@
#include "mgmtGrant.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
void *tsSuperTableSdb;
int32_t tsSuperTableUpdateSize;
void *(*mgmtSuperTableActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int32_t size, int32_t *ssize);
void *mgmtSuperTableActionInsert(void *row, char *str, int32_t size, int32_t *ssize);
......@@ -64,12 +66,11 @@ static void mgmtSuperTableActionInit() {
void *mgmtSuperTableActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SSuperTableObj *pTable = (SSuperTableObj *) row;
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
memcpy(pTable, str, tsize);
memcpy(pTable, str, tsDbUpdateSize);
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
pTable->schema = realloc(pTable->schema, schemaSize);
memcpy(pTable->schema, str + tsize, schemaSize);
memcpy(pTable->schema, str + tsDbUpdateSize, schemaSize);
return NULL;
}
......@@ -96,17 +97,16 @@ void *mgmtSuperTableActionEncode(void *row, char *str, int32_t size, int32_t *ss
SSuperTableObj *pTable = (SSuperTableObj *) row;
assert(row != NULL && str != NULL);
int32_t tsize = pTable->updateEnd - (int8_t *) pTable;
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
if (size < tsize + schemaSize + 1) {
if (size < tsSuperTableUpdateSize + schemaSize + 1) {
*ssize = -1;
return NULL;
}
memcpy(str, pTable, tsize);
memcpy(str + tsize, pTable->schema, schemaSize);
*ssize = tsize + schemaSize;
memcpy(str, pTable, tsSuperTableUpdateSize);
memcpy(str + tsSuperTableUpdateSize, pTable->schema, schemaSize);
*ssize = tsSuperTableUpdateSize + schemaSize;
return NULL;
}
......@@ -118,14 +118,13 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ss
if (pTable == NULL) {
return NULL;
}
memset(pTable, 0, sizeof(STabObj));
memset(pTable, 0, sizeof(SSuperTableObj));
int32_t tsize = pTable->updateEnd - (int8_t *)pTable;
if (size < tsize) {
if (size < tsSuperTableUpdateSize) {
mgmtDestroySuperTable(pTable);
return NULL;
}
memcpy(pTable, str, tsize);
memcpy(pTable, str, tsSuperTableUpdateSize);
int32_t schemaSize = sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags);
pTable->schema = malloc(schemaSize);
......@@ -134,7 +133,7 @@ void *mgmtSuperTableActionDecode(void *row, char *str, int32_t size, int32_t *ss
return NULL;
}
memcpy(pTable->schema, str + tsize, schemaSize);
memcpy(pTable->schema, str + tsSuperTableUpdateSize, schemaSize);
return (void *)pTable;
}
......@@ -151,8 +150,10 @@ int32_t mgmtInitSuperTables() {
SSuperTableObj * pTable = NULL;
mgmtSuperTableActionInit();
SSuperTableObj tObj;
tsSuperTableUpdateSize = tObj.updateEnd - (int8_t *)&tObj;
tsSuperTableSdb = sdbOpenTable(tsMaxTables, sizeof(STabObj) + sizeof(SSchema) * TSDB_MAX_COLUMNS,
tsSuperTableSdb = sdbOpenTable(tsMaxTables, tsSuperTableUpdateSize + sizeof(SSchema) * TSDB_MAX_COLUMNS,
"stables", SDB_KEYTYPE_STRING, tsMgmtDirectory, mgmtSuperTableAction);
if (tsSuperTableSdb == NULL) {
mError("failed to init super table data");
......@@ -173,7 +174,8 @@ int32_t mgmtInitSuperTables() {
pNode = pLastNode;
continue;
}
pTable->numOfTables = 0;
mgmtAddSuperTableIntoDb(pDb);
}
mgmtSetVgroupIdPool();
......@@ -183,6 +185,7 @@ int32_t mgmtInitSuperTables() {
}
void mgmtCleanUpSuperTables() {
sdbCloseTable(tsSuperTableSdb);
}
int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
......@@ -205,7 +208,6 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
pStable->sversion = 0;
pStable->numOfColumns = pCreate->numOfColumns;
pStable->numOfTags = pCreate->numOfTags;
pStable->numOfTables = 0;
int32_t numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
int32_t schemaSize = numOfCols * sizeof(SSchema);
......@@ -228,11 +230,14 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) {
return TSDB_CODE_SDB_ERROR;
}
return 0;
mgmtAddSuperTableIntoDb(pDb);
return TSDB_CODE_SUCCESS;
}
int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) {
//TODO drop all child tables
mgmtRemoveSuperTableFromDb(pDb);
return sdbDeleteRow(tsSuperTableSdb, pSuperTable);
}
......@@ -241,6 +246,7 @@ void* mgmtGetSuperTable(char *tableId) {
}
void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) {
//TODO get vgroup of dnodes
SSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum());
rsp->numOfDnodes = 1;
rsp->dnodeIps[0] = 0;
......@@ -358,7 +364,7 @@ int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pStable, char *oldTagN
strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN);
// Encode string
int32_t size = 1 + sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW;
int32_t size = 1 + sizeof(SSuperTableObj) + TSDB_MAX_BYTES_PER_ROW;
char *msg = (char *) malloc(size);
if (msg == NULL) return TSDB_CODE_APP_ERROR;
memset(msg, 0, size);
......@@ -466,129 +472,122 @@ int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) {
}
int32_t mgmtGetShowSuperTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
//
// if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
//
// SSchema *pSchema = tsGetSchema(pMeta);
//
// pShow->bytes[cols] = TSDB_METER_NAME_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "name");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "created_time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 2;
// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
// strcpy(pSchema[cols].name, "columns");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 2;
// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
// strcpy(pSchema[cols].name, "tags");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 4;
// pSchema[cols].type = TSDB_DATA_TYPE_INT;
// strcpy(pSchema[cols].name, "tables");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pMeta->numOfColumns = htons(cols);
// pShow->numOfColumns = cols;
//
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
//
// pShow->numOfRows = pDb->numOfMetrics;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) {
return TSDB_CODE_DB_NOT_SELECTED;
}
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tables");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = pDb->numOfSuperTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
// char * pWrite;
// int32_t cols = 0;
// SSuperTableObj *pTable = NULL;
// char prefix[20] = {0};
// int32_t prefixLen;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) {
// pDb = mgmtGetDb(pConn->pDb->name);
// }
//
// if (pDb == NULL) {
// return 0;
// }
//
// if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
// if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) {
// return 0;
// }
// }
//
// strcpy(prefix, pDb->name);
// strcat(prefix, TS_PATH_DELIMITER);
// prefixLen = strlen(prefix);
//
// SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
// char metricName[TSDB_METER_NAME_LEN] = {0};
//
// while (numOfRows < rows) {
// pTable = (SSuperTableObj *)pShow->pNode;
// if (pTable == NULL) break;
// //pShow->pNode = (void *)pTable->next;
//
// if (strncmp(pTable->tableId, prefix, prefixLen)) {
// continue;
// }
//
// memset(metricName, 0, tListLen(metricName));
// extractTableName(pTable->tableId, metricName);
//
// if (pShow->payloadLen > 0 &&
// patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
// continue;
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// extractTableName(pTable->tableId, pWrite);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *)pWrite = pTable->createdTime;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int16_t *)pWrite = pTable->numOfColumns;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int16_t *)pWrite = pTable->numOfTags;
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int32_t *)pWrite = pTable->numOfTables;
// cols++;
//
// numOfRows++;
// }
//
// pShow->numOfReads += numOfRows;
char * pWrite;
int32_t cols = 0;
SSuperTableObj *pTable = NULL;
char prefix[20] = {0};
int32_t prefixLen;
SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) return 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) {
return 0;
}
}
strcpy(prefix, pDb->name);
strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix);
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char stableName[TSDB_TABLE_NAME_LEN] = {0};
while (numOfRows < rows) {
pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable);
if (pTable == NULL) break;
if (strncmp(pTable->tableId, prefix, prefixLen)) {
continue;
}
memset(stableName, 0, tListLen(stableName));
extractTableName(pTable->tableId, stableName);
if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
continue;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strncpy(pWrite, stableName, TSDB_TABLE_NAME_LEN);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pTable->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfColumns;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pTable->numOfTags;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pTable->numOfTables;
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
......
......@@ -248,53 +248,48 @@ void mgmtCleanUpMeters() {
}
int32_t mgmtGetShowTableMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
//
// SDbObj *pDb = NULL;
// if (pConn->pDb != NULL) {
// pDb = mgmtGetDb(pConn->pDb->name);
// }
//
// if (pDb == NULL) {
// return TSDB_CODE_DB_NOT_SELECTED;
// }
//
// SSchema *pSchema = tsGetSchema(pMeta);
//
// pShow->bytes[cols] = TSDB_METER_NAME_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "table_name");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "created_time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 2;
// pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
// strcpy(pSchema[cols].name, "columns");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = TSDB_METER_NAME_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "stable");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pMeta->numOfColumns = htons(cols);
// pShow->numOfColumns = cols;
//
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) {
// pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
// }
//
// pShow->numOfRows = pDb->numOfTables;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
SDbObj *pDb = mgmtGetDb(pShow->db);
if (pDb == NULL) {
return TSDB_CODE_DB_NOT_SELECTED;
}
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "table_name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "stable");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = pDb->numOfTables;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
......@@ -382,7 +377,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
// continue;
// }
//
// char meterName[TSDB_METER_NAME_LEN] = {0};
// char meterName[TSDB_TABLE_NAME_LEN] = {0};
// memset(meterName, 0, tListLen(meterName));
// numOfRead++;
//
......@@ -390,14 +385,14 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
// extractTableName(tableId, meterName);
//
// if (pShow->payloadLen > 0 &&
// patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
// patternMatch(pShow->payload, meterName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
// continue;
// }
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strncpy(pWrite, meterName, TSDB_METER_NAME_LEN);
// strncpy(pWrite, meterName, TSDB_TABLE_NAME_LEN);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
......@@ -312,7 +312,7 @@ void httpTrimTableName(char *name) {
for (int i = 0; name[i] != 0; i++) {
if (name[i] == ' ' || name[i] == ':' || name[i] == '.' || name[i] == '-' || name[i] == '/' || name[i] == '\'')
name[i] = '_';
if (i == TSDB_METER_NAME_LEN + 1) {
if (i == TSDB_TABLE_NAME_LEN + 1) {
name[i] = 0;
break;
}
......@@ -328,7 +328,7 @@ int httpShrinkTableName(HttpContext *pContext, int pos, char *name) {
len++;
}
if (len < TSDB_METER_NAME_LEN) {
if (len < TSDB_TABLE_NAME_LEN) {
return pos;
}
......
......@@ -199,7 +199,7 @@ void tgParseSchemaMetric(cJSON *metric) {
goto ParseEnd;
}
int nameLen = (int)strlen(field->valuestring);
if (nameLen == 0 || nameLen > TSDB_METER_NAME_LEN) {
if (nameLen == 0 || nameLen > TSDB_TABLE_NAME_LEN) {
parsedOk = false;
goto ParseEnd;
}
......@@ -395,7 +395,7 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
httpSendErrorResp(pContext, HTTP_TG_METRIC_NAME_NULL);
return false;
}
if (nameLen >= TSDB_METER_NAME_LEN - 7) {
if (nameLen >= TSDB_TABLE_NAME_LEN - 7) {
httpSendErrorResp(pContext, HTTP_TG_METRIC_NAME_LONG);
return false;
}
......@@ -484,7 +484,7 @@ bool tgProcessSingleMetric(HttpContext *pContext, cJSON *metric, char *db) {
return false;
}
if (strlen(host->valuestring) >= TSDB_METER_NAME_LEN) {
if (strlen(host->valuestring) >= TSDB_TABLE_NAME_LEN) {
httpSendErrorResp(pContext, HTTP_TG_TABLE_SIZE);
return false;
}
......
......@@ -101,7 +101,7 @@ extern uint32_t cdebugFlag;
#define tscError(...) \
if (cdebugFlag & DEBUG_ERROR) { \
tprintf("ERROR TSC ", cdebugFlag, __VA_ARGS__); \
tprintf("ERROR TSC ", 255, __VA_ARGS__); \
}
#define tscWarn(...) \
if (cdebugFlag & DEBUG_WARN) { \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册