提交 0b0c1f28 编写于 作者: H hjxilinx

Merge branch '2.0' into liaohj_2

...@@ -30,10 +30,10 @@ extern "C" { ...@@ -30,10 +30,10 @@ extern "C" {
#include "tsclient.h" #include "tsclient.h"
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \ #define UTIL_METER_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE)) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo))) #define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \ #define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE)) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_CHILD_TABLE))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0) #define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
......
...@@ -79,8 +79,8 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type ...@@ -79,8 +79,8 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) { static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
STableMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pMeterMeta; STableMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pMeterMeta;
if (pMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE || pMeta->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE || if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) { pMeta->tableType == TSDB_STREAM_TABLE) {
return 0; return 0;
} }
......
...@@ -2616,7 +2616,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { ...@@ -2616,7 +2616,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
// int32_t tagLen = 0; // int32_t tagLen = 0;
// SSchema *pTagsSchema = tsGetTagSchema(pMeta); // SSchema *pTagsSchema = tsGetTagSchema(pMeta);
// //
// if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { // if (pMeta->tableType == TSDB_CHILD_TABLE) {
// for (int32_t i = 0; i < pMeta->numOfTags; ++i) { // for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
// tagLen += pTagsSchema[i].bytes; // tagLen += pTagsSchema[i].bytes;
// } // }
...@@ -2731,7 +2731,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { ...@@ -2731,7 +2731,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int32_t tagLen = 0; int32_t tagLen = 0;
SSchema *pTagsSchema = tsGetTagSchema(pMeta); SSchema *pTagsSchema = tsGetTagSchema(pMeta);
if (pMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { if (pMeta->tableType == TSDB_CHILD_TABLE) {
for (int32_t j = 0; j < pMeta->numOfTags; ++j) { for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
tagLen += pTagsSchema[j].bytes; tagLen += pTagsSchema[j].bytes;
} }
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include "taosdef.h" #include "taosdef.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
......
...@@ -87,18 +87,7 @@ int32_t dnodeInitMgmt() { ...@@ -87,18 +87,7 @@ int32_t dnodeInitMgmt() {
cfg.cfg.maxSessions = 1000; cfg.cfg.maxSessions = 1000;
cfg.cfg.daysPerFile = 10; cfg.cfg.daysPerFile = 10;
dnodeCreateVnode(&cfg); return dnodeOpenVnodes();
SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
dnodeDropVnode(pVnode);
// dnodeCreateVnode(&cfg);
// SVnodeObj *pVnode = dnodeGetVnode(cfg.cfg.vgId);
// dnodeCleanupVnodes();
dnodeOpenVnodes();
dnodeCleanupVnodes();
//return dnodeOpenVnodes();
} }
void dnodeCleanupMgmt() { void dnodeCleanupMgmt() {
...@@ -182,9 +171,9 @@ static int32_t dnodeOpenVnodes() { ...@@ -182,9 +171,9 @@ static int32_t dnodeOpenVnodes() {
int32_t vnode = atoi(de->d_name + 5); int32_t vnode = atoi(de->d_name + 5);
if (vnode == 0) continue; if (vnode == 0) continue;
char tsdbDir[TSDB_FILENAME_LEN]; char vnodeDir[TSDB_FILENAME_LEN];
sprintf(tsdbDir, "%s/%s", tsVnodeDir, de->d_name); sprintf(vnodeDir, "%s/%s", tsVnodeDir, de->d_name);
int32_t code = dnodeOpenVnode(vnode, tsdbDir); int32_t code = dnodeOpenVnode(vnode, vnodeDir);
if (code == 0) { if (code == 0) {
numOfVnodes++; numOfVnodes++;
} }
...@@ -203,9 +192,11 @@ static void dnodeCleanupVnodes() { ...@@ -203,9 +192,11 @@ static void dnodeCleanupVnodes() {
} }
static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
void *pTsdb = tsdbOpenRepo(rootDir); char tsdbDir[TSDB_FILENAME_LEN];
sprintf(tsdbDir, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(tsdbDir);
if (pTsdb == NULL) { if (pTsdb == NULL) {
dError("failed to open vnode:%d in dir:%s, reason:%s", vnode, rootDir, tstrerror(terrno)); dError("failed to open tsdb in vnode:%d %s, reason:%s", vnode, tsdbDir, tstrerror(terrno));
return terrno; return terrno;
} }
...@@ -260,7 +251,7 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) { ...@@ -260,7 +251,7 @@ static void dnodeCleanupVnode(SVnodeObj *pVnode) {
} }
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg = {0};
tsdbCfg.vgId = pVnodeCfg->cfg.vgId; tsdbCfg.vgId = pVnodeCfg->cfg.vgId;
tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->vnode; tsdbCfg.tsdbId = pVnodeCfg->vnode;
...@@ -273,9 +264,32 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -273,9 +264,32 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
char rootDir[TSDB_FILENAME_LEN] = {0}; char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
if (mkdir(rootDir, 0755) != 0) {
if (errno == EACCES) {
return TSDB_CODE_NO_DISK_PERMISSIONS;
} else if (errno == ENOSPC) {
return TSDB_CODE_SERV_NO_DISKSPACE;
} else if (errno == EEXIST) {
} else {
return TSDB_CODE_VG_INIT_FAILED;
}
}
sprintf(rootDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
if (mkdir(rootDir, 0755) != 0) {
if (errno == EACCES) {
return TSDB_CODE_NO_DISK_PERMISSIONS;
} else if (errno == ENOSPC) {
return TSDB_CODE_SERV_NO_DISKSPACE;
} else if (errno == EEXIST) {
} else {
return TSDB_CODE_VG_INIT_FAILED;
}
}
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL); void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
if (pTsdb == NULL) { if (pTsdb == NULL) {
dError("failed to create tsdb in vnode:%d, reason:%s", pVnodeCfg->vnode, tstrerror(terrno));
return terrno; return terrno;
} }
...@@ -295,6 +309,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -295,6 +309,7 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj));
dPrint("vnode:%d is created", pVnodeCfg->vnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -324,13 +339,16 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -324,13 +339,16 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
dTrace("start to create vnode:%d", pCreate->vnode);
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId); SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
if (pVnodeObj != NULL) { if (pVnodeObj != NULL) {
rpcRsp.code = TSDB_CODE_SUCCESS; rpcRsp.code = TSDB_CODE_SUCCESS;
dPrint("vnode:%d is already exist", pCreate->vnode);
} else { } else {
rpcRsp.code = dnodeCreateVnode(pCreate); rpcRsp.code = dnodeCreateVnode(pCreate);
} }
rpcRsp.code = TSDB_CODE_SUCCESS;
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#include "tlog.h" #include "tlog.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h"
#include "dataformat.h"
#include "dnodeWrite.h" #include "dnodeWrite.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
...@@ -106,13 +108,13 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -106,13 +108,13 @@ void dnodeWrite(SRpcMsg *pMsg) {
while (leftLen > 0) { while (leftLen > 0) {
SWriteMsgHead *pHead = (SWriteMsgHead *) pCont; SWriteMsgHead *pHead = (SWriteMsgHead *) pCont;
int32_t vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
int32_t contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(vgId); void *pVnode = dnodeGetVnode(pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
leftLen -= contLen; leftLen -= pHead->contLen;
pCont -= contLen; pCont -= pHead->contLen;
continue; continue;
} }
...@@ -120,7 +122,7 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -120,7 +122,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
SWriteMsg writeMsg; SWriteMsg writeMsg;
writeMsg.rpcMsg = *pMsg; writeMsg.rpcMsg = *pMsg;
writeMsg.pCont = pCont; writeMsg.pCont = pCont;
writeMsg.contLen = contLen; writeMsg.contLen = pHead->contLen;
writeMsg.pRpcContext = pRpcContext; writeMsg.pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
...@@ -128,8 +130,8 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -128,8 +130,8 @@ void dnodeWrite(SRpcMsg *pMsg) {
taosWriteQitem(queue, &writeMsg); taosWriteQitem(queue, &writeMsg);
// next vnode // next vnode
leftLen -= contLen; leftLen -= pHead->contLen;
pCont -= contLen; pCont -= pHead->contLen;
queuedMsgNum++; queuedMsgNum++;
} }
...@@ -148,7 +150,7 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -148,7 +150,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
void *dnodeAllocateWriteWorker() { void *dnodeAllocateWriteWorker() {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg));
if (queue != NULL) return queue; if (queue == NULL) return NULL;
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
...@@ -289,45 +291,68 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { ...@@ -289,45 +291,68 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { dTrace("start to create table:%s in vgroup:%d", pTable->tableId, pTable->vgId);
dTrace("table:%s, start to create child table, stable:%s", pTable->tableId, pTable->superTableId);
} else if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE){ SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
dTrace("table:%s, start to create normal table", pTable->tableId); void *pVnode = dnodeGetVnode(pTable->vgId);
} else if (pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE){ if (pVnode == NULL) {
dTrace("table:%s, start to create stream table", pTable->tableId); rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID;
} else { dTrace("failed to create table:%s in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
dError("table:%s, invalid table type:%d", pTable->tableType); rpcSendResponse(&rpcRsp);
return;
} }
// pTable->numOfColumns = htons(pTable->numOfColumns); void *pTsdb = dnodeGetVnodeTsdb(pVnode);
// pTable->numOfTags = htons(pTable->numOfTags); if (pTsdb == NULL) {
// pTable->sid = htonl(pTable->sid); dnodeReleaseVnode(pVnode);
// pTable->sversion = htonl(pTable->sversion); rpcRsp.code = TSDB_CODE_NOT_ACTIVE_VNODE;
// pTable->tagDataLen = htonl(pTable->tagDataLen); dTrace("failed to create table:%s in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
// pTable->sqlDataLen = htonl(pTable->sqlDataLen); rpcSendResponse(&rpcRsp);
// pTable->contLen = htonl(pTable->contLen); return;
// pTable->numOfVPeers = htonl(pTable->numOfVPeers); }
// pTable->uid = htobe64(pTable->uid);
// pTable->superTableUid = htobe64(pTable->superTableUid);
// pTable->createdTime = htobe64(pTable->createdTime);
//
// for (int i = 0; i < pTable->numOfVPeers; ++i) {
// pTable->vpeerDesc[i].ip = htonl(pTable->vpeerDesc[i].ip);
// pTable->vpeerDesc[i].vnode = htonl(pTable->vpeerDesc[i].vnode);
// }
//
// int32_t totalCols = pTable->numOfColumns + pTable->numOfTags;
// SSchema *pSchema = (SSchema *) pTable->data;
// for (int32_t col = 0; col < totalCols; ++col) {
// pSchema->bytes = htons(pSchema->bytes);
// pSchema->colId = htons(pSchema->colId);
// pSchema++;
// }
//
// int32_t code = dnodeCreateTable(pTable);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; pTable->numOfColumns = htons(pTable->numOfColumns);
rpcSendResponse(&rpcRsp); pTable->numOfTags = htons(pTable->numOfTags);
pTable->sid = htonl(pTable->sid);
pTable->sversion = htonl(pTable->sversion);
pTable->tagDataLen = htonl(pTable->tagDataLen);
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
pTable->uid = htobe64(pTable->uid);
pTable->superTableUid = htobe64(pTable->superTableUid);
pTable->createdTime = htobe64(pTable->createdTime);
SSchema *pSchema = (SSchema *) pTable->data;
STableCfg tCfg;
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
for (int i = 0; i < pTable->numOfColumns; i++) {
tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestSchema, false);
if (pTable->numOfTags != NULL) {
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
for (int i = pTable->numOfColumns; i < pTable->numOfColumns + pTable->numOfTags; i++) {
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
}
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
}
if (pTable->tableType == TSDB_CHILD_TABLE) {
// TODO: add data row
}
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
dnodeReleaseVnode(pVnode);
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
dError("failed to create table:%s in vgroup:%d, reason:%s", pTable->tableId, pTable->vgId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp);
} else {
dTrace("create table:%s in vgroup:%d finished", pTable->tableId, pTable->vgId);
rpcSendResponse(&rpcRsp);
}
} }
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
......
...@@ -297,6 +297,20 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -297,6 +297,20 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SESSIONS_PER_VNODE (300) #define TSDB_SESSIONS_PER_VNODE (300)
#define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES) #define TSDB_SESSIONS_PER_DNODE (TSDB_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
enum {
TSDB_PRECISION_MILLI,
TSDB_PRECISION_MICRO,
TSDB_PRECISION_NANO
};
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
TSDB_CHILD_TABLE = 1, // table created from super table
TSDB_NORMAL_TABLE = 2, // ordinary table
TSDB_STREAM_TABLE = 3, // table created from stream computing
TSDB_TABLE_MAX = 4
} TSDB_TABLE_TYPE;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -251,7 +251,6 @@ typedef struct { ...@@ -251,7 +251,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;
int8_t tableType; int8_t tableType;
int16_t numOfColumns; int16_t numOfColumns;
int16_t numOfTags; int16_t numOfTags;
......
...@@ -4,6 +4,7 @@ PROJECT(TDengine) ...@@ -4,6 +4,7 @@ PROJECT(TDengine)
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
......
...@@ -291,31 +291,26 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr ...@@ -291,31 +291,26 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
int32_t totalCols = pTable->numOfColumns; int32_t totalCols = pTable->numOfColumns;
int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen; int32_t contLen = sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema) + pTable->sqlLen;
SMDCreateTableMsg *pCreateTable = rpcMallocCont(contLen); SMDCreateTableMsg *pCreate = rpcMallocCont(contLen);
if (pCreateTable == NULL) { if (pCreate == NULL) {
return NULL; return NULL;
} }
memcpy(pCreateTable->tableId, pTable->tableId, TSDB_TABLE_ID_LEN); memcpy(pCreate->tableId, pTable->tableId, TSDB_TABLE_ID_LEN + 1);
pCreateTable->tableType = pTable->type; pCreate->contLen = htonl(contLen);
pCreateTable->numOfColumns = htons(pTable->numOfColumns); pCreate->vgId = htonl(pVgroup->vgId);
pCreateTable->numOfTags = htons(0); pCreate->tableType = pTable->type;
pCreateTable->sid = htonl(pTable->sid); pCreate->numOfColumns = htons(pTable->numOfColumns);
pCreateTable->sversion = htonl(pTable->sversion); pCreate->numOfTags = htons(0);
pCreateTable->tagDataLen = htonl(0); pCreate->sid = htonl(pTable->sid);
pCreateTable->sqlDataLen = htonl(pTable->sqlLen); pCreate->sversion = htonl(pTable->sversion);
pCreateTable->contLen = htonl(contLen); pCreate->tagDataLen = htonl(0);
pCreateTable->numOfVPeers = htonl(pVgroup->numOfVnodes); pCreate->sqlDataLen = htonl(pTable->sqlLen);
pCreateTable->uid = htobe64(pTable->uid); pCreate->uid = htobe64(pTable->uid);
pCreateTable->superTableUid = htobe64(0); pCreate->superTableUid = htobe64(0);
pCreateTable->createdTime = htobe64(pTable->createdTime); pCreate->createdTime = htobe64(pTable->createdTime);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) { SSchema *pSchema = (SSchema *) pCreate->data;
pCreateTable->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
pCreateTable->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
SSchema *pSchema = (SSchema *) pCreateTable->data;
memcpy(pSchema, pTable->schema, totalCols * sizeof(SSchema)); memcpy(pSchema, pTable->schema, totalCols * sizeof(SSchema));
for (int32_t col = 0; col < totalCols; ++col) { for (int32_t col = 0; col < totalCols; ++col) {
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htons(pSchema->bytes);
...@@ -323,9 +318,9 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr ...@@ -323,9 +318,9 @@ static void *mgmtBuildCreateNormalTableMsg(SNormalTableObj *pTable, SVgObj *pVgr
pSchema++; pSchema++;
} }
memcpy(pCreateTable + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen); memcpy(pCreate + sizeof(SMDCreateTableMsg) + totalCols * sizeof(SSchema), pTable->sql, pTable->sqlLen);
return pCreateTable; return pCreate;
} }
int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid, int32_t mgmtCreateNormalTable(SCMCreateTableMsg *pCreate, int32_t contLen, SVgObj *pVgroup, int32_t sid,
......
...@@ -213,7 +213,7 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) { ...@@ -213,7 +213,7 @@ int32_t mgmtCreateSuperTable(SDbObj *pDb, SCMCreateTableMsg *pCreate) {
} }
strcpy(pStable->tableId, pCreate->tableId); strcpy(pStable->tableId, pCreate->tableId);
pStable->type = TSDB_TABLE_TYPE_SUPER_TABLE; pStable->type = TSDB_SUPER_TABLE;
pStable->createdTime = taosGetTimestampMs(); pStable->createdTime = taosGetTimestampMs();
pStable->vgId = 0; pStable->vgId = 0;
pStable->sid = 0; pStable->sid = 0;
......
...@@ -119,11 +119,11 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) { ...@@ -119,11 +119,11 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid) {
} }
int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp) { int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, bool usePublicIp) {
if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
mgmtGetChildTableMeta(pDb, (SChildTableObj *) pTable, pMeta, usePublicIp); mgmtGetChildTableMeta(pDb, (SChildTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { } else if (pTable->type == TSDB_NORMAL_TABLE) {
mgmtGetNormalTableMeta(pDb, (SNormalTableObj *) pTable, pMeta, usePublicIp); mgmtGetNormalTableMeta(pDb, (SNormalTableObj *) pTable, pMeta, usePublicIp);
} else if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { } else if (pTable->type == TSDB_SUPER_TABLE) {
mgmtGetSuperTableMeta(pDb, (SSuperTableObj *) pTable, pMeta, usePublicIp); mgmtGetSuperTableMeta(pDb, (SSuperTableObj *) pTable, pMeta, usePublicIp);
} else { } else {
mTrace("%s, uid:%" PRIu64 " table meta retrieve failed, invalid type", pTable->tableId, pTable->uid); mTrace("%s, uid:%" PRIu64 " table meta retrieve failed, invalid type", pTable->tableId, pTable->uid);
...@@ -176,7 +176,7 @@ static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) { ...@@ -176,7 +176,7 @@ static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = pMsg, .handle = pMsg,
.pCont = pCreate, .pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen), .contLen = htonl(pMDCreate->contLen),
.code = 0, .code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
...@@ -204,16 +204,16 @@ int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) { ...@@ -204,16 +204,16 @@ int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) {
} }
switch (pTable->type) { switch (pTable->type) {
case TSDB_TABLE_TYPE_SUPER_TABLE: case TSDB_SUPER_TABLE:
mTrace("table:%s, start to drop super table", tableId); mTrace("table:%s, start to drop super table", tableId);
return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable); return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
case TSDB_TABLE_TYPE_CHILD_TABLE: case TSDB_CHILD_TABLE:
mTrace("table:%s, start to drop child table", tableId); mTrace("table:%s, start to drop child table", tableId);
return mgmtDropChildTable(pDb, (SChildTableObj *) pTable); return mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
case TSDB_TABLE_TYPE_NORMAL_TABLE: case TSDB_NORMAL_TABLE:
mTrace("table:%s, start to drop normal table", tableId); mTrace("table:%s, start to drop normal table", tableId);
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
case TSDB_TABLE_TYPE_STREAM_TABLE: case TSDB_STREAM_TABLE:
mTrace("table:%s, start to drop stream table", tableId); mTrace("table:%s, start to drop stream table", tableId);
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable); return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
default: default:
...@@ -233,31 +233,31 @@ int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) { ...@@ -233,31 +233,31 @@ int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) {
} }
if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) {
if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
return mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1); return mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) {
if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
return mgmtDropSuperTableTag((SSuperTableObj *) pTable, pAlter->schema[0].name); return mgmtDropSuperTableTag((SSuperTableObj *) pTable, pAlter->schema[0].name);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) {
if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
return mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name); return mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { } else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
if (pTable->type == TSDB_TABLE_TYPE_CHILD_TABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
return mgmtModifyChildTableTagValueByName((SChildTableObj *) pTable, pAlter->schema[0].name, pAlter->tagVal); return mgmtModifyChildTableTagValueByName((SChildTableObj *) pTable, pAlter->schema[0].name, pAlter->tagVal);
} }
} else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { if (pTable->type == TSDB_NORMAL_TABLE) {
return mgmtAddNormalTableColumn((SNormalTableObj *) pTable, pAlter->schema, 1); return mgmtAddNormalTableColumn((SNormalTableObj *) pTable, pAlter->schema, 1);
} else if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { } else if (pTable->type == TSDB_SUPER_TABLE) {
return mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1); return mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1);
} else {} } else {}
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
if (pTable->type == TSDB_TABLE_TYPE_NORMAL_TABLE) { if (pTable->type == TSDB_NORMAL_TABLE) {
return mgmtDropNormalTableColumnByName((SNormalTableObj *) pTable, pAlter->schema[0].name); return mgmtDropNormalTableColumnByName((SNormalTableObj *) pTable, pAlter->schema[0].name);
} else if (pTable->type == TSDB_TABLE_TYPE_SUPER_TABLE) { } else if (pTable->type == TSDB_SUPER_TABLE) {
return mgmtDropSuperTableColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name); return mgmtDropSuperTableColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name);
} else {} } else {}
} else {} } else {}
......
...@@ -253,6 +253,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) { ...@@ -253,6 +253,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) {
comparFn = compareInt32Val; comparFn = compareInt32Val;
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
comparFn = compareInt64Val; comparFn = compareInt64Val;
break; break;
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
......
...@@ -33,13 +33,6 @@ extern "C" { ...@@ -33,13 +33,6 @@ extern "C" {
#define TSDB_INVALID_SUPER_TABLE_ID -1 #define TSDB_INVALID_SUPER_TABLE_ID -1
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION // --------- TSDB REPOSITORY CONFIGURATION DEFINITION
enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO };
typedef enum {
TSDB_SUPER_TABLE, // super table
TSDB_NTABLE, // table not created from super table
TSDB_STABLE // table created from super table
} TSDB_TABLE_TYPE;
typedef struct { typedef struct {
int8_t precision; int8_t precision;
int32_t vgId; int32_t vgId;
...@@ -75,6 +68,7 @@ typedef struct { ...@@ -75,6 +68,7 @@ typedef struct {
typedef struct { typedef struct {
TSDB_TABLE_TYPE type; TSDB_TABLE_TYPE type;
STableId tableId; STableId tableId;
int32_t sversion;
int64_t superUid; int64_t superUid;
STSchema * schema; STSchema * schema;
STSchema * tagSchema; STSchema * tagSchema;
......
...@@ -23,8 +23,6 @@ ...@@ -23,8 +23,6 @@
extern "C" { extern "C" {
#endif #endif
typedef int32_t file_id_t;
typedef enum { typedef enum {
TSDB_FILE_TYPE_HEAD, // .head file type TSDB_FILE_TYPE_HEAD, // .head file type
TSDB_FILE_TYPE_DATA, // .data file type TSDB_FILE_TYPE_DATA, // .data file type
...@@ -40,19 +38,33 @@ typedef struct { ...@@ -40,19 +38,33 @@ typedef struct {
} SFileInfo; } SFileInfo;
typedef struct { typedef struct {
char * fname; int fd;
SFileInfo fInfo; int64_t size; // total size of the file
} SFILE; int64_t tombSize; // unused file size
} SFile;
// typedef struct { typedef struct {
// int64_t offset; int32_t fileId;
// int64_t skey; SFile fhead;
// int64_t ekey; SFile fdata;
// int16_t numOfBlocks; SFile flast;
// } SDataBlock; } SFileGroup;
// TSDB file handle
typedef struct {
int32_t daysPerFile;
int32_t keep;
int32_t minRowPerFBlock;
int32_t maxRowsPerFBlock;
SFileGroup fGroup[];
} STsdbFileH;
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META) #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META)
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
int32_t maxRowsPerFBlock);
void tsdbCloseFile(STsdbFileH *pFileH);
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type); char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -38,11 +38,12 @@ typedef struct STable { ...@@ -38,11 +38,12 @@ typedef struct STable {
TSDB_TABLE_TYPE type; TSDB_TABLE_TYPE type;
STableId tableId; STableId tableId;
int32_t superUid; // Super table UID int32_t superUid; // Super table UID
int32_t sversion;
STSchema * schema; STSchema * schema;
STSchema * tagSchema; STSchema * tagSchema;
SDataRow tagVal; SDataRow tagVal;
union { union {
void *pData; // For TSDB_NTABLE and TSDB_STABLE, it is the skiplist for cache data void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data
void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
} content; } content;
void * eventHandler; // TODO void * eventHandler; // TODO
......
...@@ -14,9 +14,21 @@ ...@@ -14,9 +14,21 @@
*/ */
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h>
#include <string.h> #include <string.h>
#include <dirent.h>
#include "tsdbFile.h" #include "tsdbFile.h"
#include "tglobalcfg.h"
// int64_t tsMsPerDay[] = {
// 86400000L, // TSDB_PRECISION_MILLI
// 86400000000L, // TSDB_PRECISION_MICRO
// 86400000000000L // TSDB_PRECISION_NANO
// };
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
typedef struct { typedef struct {
int64_t offset; int64_t offset;
...@@ -71,6 +83,55 @@ const char *tsdbFileSuffix[] = { ...@@ -71,6 +83,55 @@ const char *tsdbFileSuffix[] = {
".meta" // TSDB_FILE_TYPE_META ".meta" // TSDB_FILE_TYPE_META
}; };
/**
* Initialize the TSDB file handle
*/
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
int32_t maxRowsPerFBlock) {
STsdbFileH *pTsdbFileH =
(STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile));
if (pTsdbFileH == NULL) return NULL;
pTsdbFileH->daysPerFile = daysPerFile;
pTsdbFileH->keep = keep;
pTsdbFileH->minRowPerFBlock = minRowsPerFBlock;
pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock;
// Open the directory to read information of each file
DIR *dir = opendir(dataDir);
if (dir == NULL) {
free(pTsdbFileH);
return NULL;
}
struct dirent *dp;
char fname[256];
while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
if (true /* check if the file is the .head file */) {
int fileId = 0;
int vgId = 0;
sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId);
// TODO
// Open head file
// Open data file
// Open last file
}
}
return pTsdbFileH;
}
/**
* Closet the file handle
*/
void tsdbCloseFile(STsdbFileH *pFileH) {
// TODO
}
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) { char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) {
if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL; if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL;
...@@ -79,4 +140,10 @@ char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) { ...@@ -79,4 +140,10 @@ char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) {
sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]); sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]);
return fileName; return fileName;
}
static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
} }
\ No newline at end of file
...@@ -308,7 +308,6 @@ int tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) { ...@@ -308,7 +308,6 @@ int tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
} }
int tsdbDropTable(tsdb_repo_t *repo, STableId tableId) { int tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
// TODO
if (repo == NULL) return -1; if (repo == NULL) return -1;
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
...@@ -340,7 +339,7 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { ...@@ -340,7 +339,7 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
*/ */
int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid) { int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid) {
if (config == NULL) return -1; if (config == NULL) return -1;
if (type != TSDB_NTABLE && type != TSDB_STABLE) return -1; if (type != TSDB_NORMAL_TABLE && type != TSDB_CHILD_TABLE) return -1;
memset((void *)config, 0, sizeof(STableCfg)); memset((void *)config, 0, sizeof(STableCfg));
...@@ -355,7 +354,7 @@ int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32 ...@@ -355,7 +354,7 @@ int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32
* Set the super table UID of the created table * Set the super table UID of the created table
*/ */
int tsdbTableSetSuperUid(STableCfg *config, int64_t uid) { int tsdbTableSetSuperUid(STableCfg *config, int64_t uid) {
if (config->type != TSDB_STABLE) return -1; if (config->type != TSDB_CHILD_TABLE) return -1;
if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1; if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1;
config->superUid = uid; config->superUid = uid;
...@@ -388,7 +387,7 @@ int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup) { ...@@ -388,7 +387,7 @@ int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup) {
* @return 0 for success and -1 for failure * @return 0 for success and -1 for failure
*/ */
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) { int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
if (config->type != TSDB_STABLE) return -1; if (config->type != TSDB_CHILD_TABLE) return -1;
if (dup) { if (dup) {
config->tagSchema = tdDupSchema(pSchema); config->tagSchema = tdDupSchema(pSchema);
...@@ -399,7 +398,7 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) { ...@@ -399,7 +398,7 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
} }
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) { int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) {
if (config->type != TSDB_STABLE) return -1; if (config->type != TSDB_CHILD_TABLE) return -1;
if (dup) { if (dup) {
config->tagValues = tdDataRowDup(row); config->tagValues = tdDataRowDup(row);
......
...@@ -18,6 +18,7 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable); ...@@ -18,6 +18,7 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbEstimateTableEncodeSize(STable *pTable); static int tsdbEstimateTableEncodeSize(STable *pTable);
static char * getTupleKey(const void *data);
/** /**
* Encode a TSDB table object as a binary content * Encode a TSDB table object as a binary content
...@@ -137,7 +138,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -137,7 +138,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
STable *super = NULL; STable *super = NULL;
int newSuper = 0; int newSuper = 0;
if (pCfg->type == TSDB_STABLE) { if (pCfg->type == TSDB_CHILD_TABLE) {
super = tsdbGetTableByUid(pMeta, pCfg->superUid); super = tsdbGetTableByUid(pMeta, pCfg->superUid);
if (super == NULL) { // super table not exists, try to create it if (super == NULL) { // super table not exists, try to create it
newSuper = 1; newSuper = 1;
...@@ -153,7 +154,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -153,7 +154,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagSchema = tdDupSchema(pCfg->tagSchema);
super->tagVal = tdDataRowDup(pCfg->tagValues); super->tagVal = tdDataRowDup(pCfg->tagValues);
super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
0, NULL); // Allow duplicate key, no lock 0, getTupleKey); // Allow duplicate key, no lock
if (super->content.pIndex == NULL) { if (super->content.pIndex == NULL) {
tdFreeSchema(super->schema); tdFreeSchema(super->schema);
...@@ -174,16 +175,16 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { ...@@ -174,16 +175,16 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
} }
table->tableId = pCfg->tableId; table->tableId = pCfg->tableId;
if (IS_CREATE_STABLE(pCfg)) { // TSDB_STABLE if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE
table->type = TSDB_STABLE; table->type = TSDB_CHILD_TABLE;
table->superUid = pCfg->superUid; table->superUid = pCfg->superUid;
table->tagVal = tdDataRowDup(pCfg->tagValues); table->tagVal = tdDataRowDup(pCfg->tagValues);
} else { // TSDB_NTABLE } else { // TSDB_NORMAL_TABLE
table->type = TSDB_NTABLE; table->type = TSDB_NORMAL_TABLE;
table->superUid = -1; table->superUid = -1;
table->schema = tdDupSchema(pCfg->schema); table->schema = tdDupSchema(pCfg->schema);
} }
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL); table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
if (newSuper) tsdbAddTableToMeta(pMeta, super); if (newSuper) tsdbAddTableToMeta(pMeta, super);
tsdbAddTableToMeta(pMeta, table); tsdbAddTableToMeta(pMeta, table);
...@@ -220,7 +221,7 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { ...@@ -220,7 +221,7 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
pMeta->tables[pTable->tableId.tid] = NULL; pMeta->tables[pTable->tableId.tid] = NULL;
pMeta->nTables--; pMeta->nTables--;
assert(pMeta->nTables >= 0); assert(pMeta->nTables >= 0);
if (pTable->type == TSDB_STABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
tsdbRemoveTableFromIndex(pMeta, pTable); tsdbRemoveTableFromIndex(pMeta, pTable);
} }
...@@ -237,7 +238,7 @@ int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) { ...@@ -237,7 +238,7 @@ int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
static int tsdbFreeTable(STable *pTable) { static int tsdbFreeTable(STable *pTable) {
// TODO: finish this function // TODO: finish this function
if (pTable->type == TSDB_STABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
tdFreeDataRow(pTable->tagVal); tdFreeDataRow(pTable->tagVal);
} else { } else {
tdFreeSchema(pTable->schema); tdFreeSchema(pTable->schema);
...@@ -281,7 +282,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) { ...@@ -281,7 +282,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) {
} else { } else {
// add non-super table to the array // add non-super table to the array
pMeta->tables[pTable->tableId.tid] = pTable; pMeta->tables[pTable->tableId.tid] = pTable;
if (pTable->type == TSDB_STABLE) { if (pTable->type == TSDB_CHILD_TABLE) {
// add STABLE to the index // add STABLE to the index
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
...@@ -305,13 +306,13 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { ...@@ -305,13 +306,13 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
return 0; return 0;
} }
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_STABLE); assert(pTable->type == TSDB_CHILD_TABLE);
// TODO // TODO
return 0; return 0;
} }
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
assert(pTable->type == TSDB_STABLE); assert(pTable->type == TSDB_CHILD_TABLE);
// TODO // TODO
return 0; return 0;
} }
...@@ -319,4 +320,10 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { ...@@ -319,4 +320,10 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
static int tsdbEstimateTableEncodeSize(STable *pTable) { static int tsdbEstimateTableEncodeSize(STable *pTable) {
// TODO // TODO
return 0; return 0;
}
static char *getTupleKey(const void * data) {
SDataRow row = (SDataRow)data;
return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
} }
\ No newline at end of file
...@@ -83,7 +83,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co ...@@ -83,7 +83,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
} }
// TODO: make below a function to implement // TODO: make below a function to implement
if (fseek(mfh->fd, info.offset, SEEK_CUR) < 0) { if (lseek(mfh->fd, info.offset, SEEK_CUR) < 0) {
return -1; return -1;
} }
...@@ -114,7 +114,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid) { ...@@ -114,7 +114,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid) {
// Remove record from file // Remove record from file
info.offset = -info.offset; info.offset = -info.offset;
if (fseek(mfh->fd, -info.offset, SEEK_CUR) < 0) { if (lseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
return -1; return -1;
} }
...@@ -149,7 +149,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co ...@@ -149,7 +149,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
mfh->size += contLen; mfh->size += contLen;
} }
if (fseek(mfh->fd, -info.offset, SEEK_CUR) < 0) { if (lseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
return -1; return -1;
} }
...@@ -212,7 +212,7 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) { ...@@ -212,7 +212,7 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
return -1; return -1;
} }
if (fseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET) < 0) { if (lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET) < 0) {
// TODO: deal with the error // TODO: deal with the error
close(fd); close(fd);
return -1; return -1;
......
...@@ -15,7 +15,7 @@ TEST(TsdbTest, createRepo) { ...@@ -15,7 +15,7 @@ TEST(TsdbTest, createRepo) {
// 2. Create a normal table // 2. Create a normal table
STableCfg tCfg; STableCfg tCfg;
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1); ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1);
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NTABLE, 987607499877672L, 0), 0); ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0);
int nCols = 5; int nCols = 5;
STSchema *schema = tdNewSchema(nCols); STSchema *schema = tdNewSchema(nCols);
...@@ -48,7 +48,7 @@ TEST(TsdbTest, createRepo) { ...@@ -48,7 +48,7 @@ TEST(TsdbTest, createRepo) {
for (int j = 0; j < schemaNCols(schema); j++) { for (int j = 0; j < schemaNCols(schema); j++) {
if (j == 0) { // Just for timestamp if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&time), schemaColAt(schema, j)); tdAppendColVal(row, (void *)(&ttime), schemaColAt(schema, j));
} else { // For int } else { // For int
int val = 10; int val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
...@@ -61,5 +61,7 @@ TEST(TsdbTest, createRepo) { ...@@ -61,5 +61,7 @@ TEST(TsdbTest, createRepo) {
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len; pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
tsdbInsertData(pRepo, pMsg); tsdbInsertData(pRepo, pMsg);
int k = 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册