提交 2ea3a427 编写于 作者: S Shengliang Guan

sma

上级 8323cad2
...@@ -272,11 +272,14 @@ typedef struct { ...@@ -272,11 +272,14 @@ typedef struct {
float xFilesFactor; float xFilesFactor;
int32_t aggregationMethod; int32_t aggregationMethod;
int32_t delay; int32_t delay;
int32_t ttl;
int32_t numOfColumns; int32_t numOfColumns;
int32_t numOfTags; int32_t numOfTags;
int32_t numOfSmas;
int32_t commentLen; int32_t commentLen;
SArray* pColumns; SArray* pColumns; // array of SField
SArray* pTags; SArray* pTags; // array of SField
SArray* pSmas; // array of SField
char* comment; char* comment;
} SMCreateStbReq; } SMCreateStbReq;
......
...@@ -511,8 +511,10 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -511,8 +511,10 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeFloat(&encoder, pReq->xFilesFactor) < 0) return -1; if (tEncodeFloat(&encoder, pReq->xFilesFactor) < 0) return -1;
if (tEncodeI32(&encoder, pReq->aggregationMethod) < 0) return -1; if (tEncodeI32(&encoder, pReq->aggregationMethod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->delay) < 0) return -1; if (tEncodeI32(&encoder, pReq->delay) < 0) return -1;
if (tEncodeI32(&encoder, pReq->ttl) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfColumns) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfTags) < 0) return -1;
if (tEncodeI32(&encoder, pReq->numOfSmas) < 0) return -1;
if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1; if (tEncodeI32(&encoder, pReq->commentLen) < 0) return -1;
for (int32_t i = 0; i < pReq->numOfColumns; ++i) { for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
...@@ -529,7 +531,16 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq ...@@ -529,7 +531,16 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1; for (int32_t i = 0; i < pReq->numOfSmas; ++i) {
SField *pField = taosArrayGet(pReq->pSmas, i);
if (tEncodeI8(&encoder, pField->type) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (pReq->commentLen > 0) {
if (tEncodeBinary(&encoder, pReq->comment, pReq->commentLen) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -547,13 +558,16 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -547,13 +558,16 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeFloat(&decoder, &pReq->xFilesFactor) < 0) return -1; if (tDecodeFloat(&decoder, &pReq->xFilesFactor) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->aggregationMethod) < 0) return -1; if (tDecodeI32(&decoder, &pReq->aggregationMethod) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->delay) < 0) return -1; if (tDecodeI32(&decoder, &pReq->delay) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->ttl) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfColumns) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfTags) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->numOfSmas) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1; if (tDecodeI32(&decoder, &pReq->commentLen) < 0) return -1;
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField)); pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField)); pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
if (pReq->pColumns == NULL || pReq->pTags == NULL) { pReq->pSmas = taosArrayInit(pReq->numOfSmas, sizeof(SField));
if (pReq->pColumns == NULL || pReq->pTags == NULL || pReq->pSmas == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -580,13 +594,23 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -580,13 +594,23 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
} }
} }
for (int32_t i = 0; i < pReq->numOfSmas; ++i) {
SField field = {0};
if (tDecodeI8(&decoder, &field.type) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (taosArrayPush(pReq->pSmas, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
if (pReq->commentLen > 0) { if (pReq->commentLen > 0) {
pReq->comment = malloc(pReq->commentLen); pReq->comment = malloc(pReq->commentLen);
if (pReq->comment == NULL) return -1; if (pReq->comment == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
} }
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tCoderClear(&decoder);
...@@ -596,8 +620,11 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR ...@@ -596,8 +620,11 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
void tFreeSMCreateStbReq(SMCreateStbReq *pReq) { void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
taosArrayDestroy(pReq->pColumns); taosArrayDestroy(pReq->pColumns);
taosArrayDestroy(pReq->pTags); taosArrayDestroy(pReq->pTags);
taosArrayDestroy(pReq->pSmas);
tfree(pReq->comment);
pReq->pColumns = NULL; pReq->pColumns = NULL;
pReq->pTags = NULL; pReq->pTags = NULL;
pReq->pSmas = NULL;
} }
int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) { int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
......
...@@ -344,11 +344,14 @@ typedef struct { ...@@ -344,11 +344,14 @@ typedef struct {
float xFilesFactor; float xFilesFactor;
int32_t aggregationMethod; int32_t aggregationMethod;
int32_t delay; int32_t delay;
int32_t ttl;
int32_t numOfColumns; int32_t numOfColumns;
int32_t numOfTags; int32_t numOfTags;
int32_t numOfSmas;
int32_t commentLen; int32_t commentLen;
SSchema* pColumns; SSchema* pColumns;
SSchema* pTags; SSchema* pTags;
SSchema* pSmas;
char* comment; char* comment;
SRWLatch lock; SRWLatch lock;
} SStbObj; } SStbObj;
......
...@@ -72,7 +72,8 @@ void mndCleanupStb(SMnode *pMnode) {} ...@@ -72,7 +72,8 @@ void mndCleanupStb(SMnode *pMnode) {}
SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE; int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags + pStb->numOfSmas) * sizeof(SSchema) +
TSDB_STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
if (pRaw == NULL) goto STB_ENCODE_OVER; if (pRaw == NULL) goto STB_ENCODE_OVER;
...@@ -88,8 +89,10 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -88,8 +89,10 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->aggregationMethod, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->aggregationMethod, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->delay, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->delay, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->ttl, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfSmas, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->commentLen, STB_ENCODE_OVER)
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
...@@ -108,7 +111,17 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -108,7 +111,17 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
} }
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_ENCODE_OVER) for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pSmas[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
}
if (pStb->commentLen > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->comment, pStb->commentLen, STB_ENCODE_OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, STB_ENCODE_OVER)
SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, STB_ENCODE_OVER)
...@@ -156,13 +169,16 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -156,13 +169,16 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
pStb->xFilesFactor = xFilesFactor / 10000.0f; pStb->xFilesFactor = xFilesFactor / 10000.0f;
SDB_GET_INT32(pRaw, dataPos, &pStb->aggregationMethod, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->aggregationMethod, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->delay, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->delay, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->ttl, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfSmas, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->commentLen, STB_DECODE_OVER)
pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema)); pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema));
pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema)); pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema));
if (pStb->pColumns == NULL || pStb->pTags == NULL) { pStb->pSmas = calloc(pStb->numOfSmas, sizeof(SSchema));
if (pStb->pColumns == NULL || pStb->pTags == NULL || pStb->pSmas == NULL) {
goto STB_DECODE_OVER; goto STB_DECODE_OVER;
} }
...@@ -182,6 +198,14 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -182,6 +198,14 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
} }
for (int32_t i = 0; i < pStb->numOfSmas; ++i) {
SSchema *pSchema = &pStb->pSmas[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
}
if (pStb->commentLen > 0) { if (pStb->commentLen > 0) {
pStb->comment = calloc(pStb->commentLen, 1); pStb->comment = calloc(pStb->commentLen, 1);
if (pStb->comment == NULL) goto STB_DECODE_OVER; if (pStb->comment == NULL) goto STB_DECODE_OVER;
...@@ -505,6 +529,16 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -505,6 +529,16 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0; return 0;
} }
static SSchema *mndFindStbColumns(const SStbObj *pStb, const char *colName) {
for (int32_t col = 0; col < pStb->numOfColumns; col++) {
SSchema *pSchema = &pStb->pColumns[col];
if (strcasecmp(pStb->pColumns[col].name, colName) == 0) {
return pSchema;
}
}
return NULL;
}
static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0}; SStbObj stbObj = {0};
memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
...@@ -515,19 +549,24 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre ...@@ -515,19 +549,24 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj.dbUid = pDb->uid; stbObj.dbUid = pDb->uid;
stbObj.version = 1; stbObj.version = 1;
stbObj.nextColId = 1; stbObj.nextColId = 1;
stbObj.ttl = pCreate->ttl;
stbObj.numOfColumns = pCreate->numOfColumns; stbObj.numOfColumns = pCreate->numOfColumns;
stbObj.numOfTags = pCreate->numOfTags; stbObj.numOfTags = pCreate->numOfTags;
stbObj.numOfSmas = pCreate->numOfSmas;
stbObj.commentLen = pCreate->commentLen; stbObj.commentLen = pCreate->commentLen;
stbObj.comment = calloc(stbObj.commentLen, 1); if (stbObj.commentLen > 0) {
if (stbObj.comment == NULL) { stbObj.comment = calloc(stbObj.commentLen, 1);
terrno = TSDB_CODE_OUT_OF_MEMORY; if (stbObj.comment == NULL) {
return -1; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen);
} }
memcpy(stbObj.comment, pCreate->comment, stbObj.commentLen);
stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema)); stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema));
stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema)); stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema));
if (stbObj.pColumns == NULL || stbObj.pTags == NULL) { stbObj.pSmas = malloc(stbObj.numOfSmas * sizeof(SSchema));
if (stbObj.pColumns == NULL || stbObj.pTags == NULL || stbObj.pSmas == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -552,6 +591,18 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre ...@@ -552,6 +591,18 @@ static int32_t mndCreateStb(SMnode *pMnode, SNodeMsg *pReq, SMCreateStbReq *pCre
stbObj.nextColId++; stbObj.nextColId++;
} }
for (int32_t i = 0; i < stbObj.numOfSmas; ++i) {
SField *pField = taosArrayGet(pCreate->pSmas, i);
SSchema *pSchema = &stbObj.pSmas[i];
SSchema *pColSchema = mndFindStbColumns(&stbObj, pField->name);
if (pColSchema == NULL) {
mError("stb:%s, sma:%s not found in columns", stbObj.name, pSchema->name);
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
memcpy(pSchema, pColSchema, sizeof(SSchema));
}
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_STB_OVER; if (pTrans == NULL) goto CREATE_STB_OVER;
...@@ -1544,11 +1595,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 ...@@ -1544,11 +1595,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (pDb != NULL && pStb->dbUid != pDb->uid) { if (pStb->dbUid != pDb->uid) {
if (strncmp(pStb->db, pDb->name, prefixLen) == 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid);
}
sdbRelease(pSdb, pStb); sdbRelease(pSdb, pStb);
continue; continue;
} }
...@@ -1562,12 +1609,12 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 ...@@ -1562,12 +1609,12 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
STR_TO_VARSTR(pWrite, stbName); STR_TO_VARSTR(pWrite, stbName);
cols++; cols++;
char db[TSDB_DB_NAME_LEN] = {0}; // char db[TSDB_DB_NAME_LEN] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB); // tNameFromString(&name, pStb->db, T_NAME_ACCT|T_NAME_DB);
tNameGetDbName(&name, db); // tNameGetDbName(&name, db);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; // pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, db); // STR_TO_VARSTR(pWrite, db);
cols++; // cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pStb->createdTime; *(int64_t *)pWrite = pStb->createdTime;
...@@ -1580,7 +1627,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 ...@@ -1580,7 +1627,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pStb->numOfTags; *(int32_t *)pWrite = pStb->numOfTags;
cols++; cols++;
#if 0
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = 0; // number of tables *(int32_t *)pWrite = 0; // number of tables
cols++; cols++;
...@@ -1596,7 +1643,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32 ...@@ -1596,7 +1643,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
STR_TO_VARSTR(pWrite, ""); STR_TO_VARSTR(pWrite, "");
} }
cols++; cols++;
#endif
numOfRows++; numOfRows++;
sdbRelease(pSdb, pStb); sdbRelease(pSdb, pStb);
} }
......
...@@ -26,9 +26,12 @@ class MndTestSma : public ::testing::Test { ...@@ -26,9 +26,12 @@ class MndTestSma : public ::testing::Test {
void* BuildDropDbReq(const char* dbname, int32_t* pContLen); void* BuildDropDbReq(const char* dbname, int32_t* pContLen);
void* BuildCreateStbReq(const char* stbname, int32_t* pContLen); void* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
void* BuildDropStbReq(const char* stbname, int32_t* pContLen); void* BuildDropStbReq(const char* stbname, int32_t* pContLen);
void* BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr, void* BuildCreateBSmaStbReq(const char* stbname, int32_t* pContLen);
const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen); void* BuildCreateTSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr,
void* BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen); const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen);
void* BuildDropTSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen);
void PushField(SArray* pArray, int32_t bytes, int8_t type, const char* name);
}; };
Testbase MndTestSma::test; Testbase MndTestSma::test;
...@@ -76,6 +79,14 @@ void* MndTestSma::BuildDropDbReq(const char* dbname, int32_t* pContLen) { ...@@ -76,6 +79,14 @@ void* MndTestSma::BuildDropDbReq(const char* dbname, int32_t* pContLen) {
return pReq; return pReq;
} }
void MndTestSma::PushField(SArray* pArray, int32_t bytes, int8_t type, const char* name) {
SField field = {0};
field.bytes = bytes;
field.type = type;
strcpy(field.name, name);
taosArrayPush(pArray, &field);
}
void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
SMCreateStbReq createReq = {0}; SMCreateStbReq createReq = {0};
createReq.numOfColumns = 3; createReq.numOfColumns = 3;
...@@ -85,37 +96,35 @@ void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { ...@@ -85,37 +96,35 @@ void* MndTestSma::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
strcpy(createReq.name, stbname); strcpy(createReq.name, stbname);
{ PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_TIMESTAMP, "ts");
SField field = {0}; PushField(createReq.pColumns, 2, TSDB_DATA_TYPE_TINYINT, "col1");
field.bytes = 8; PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_BIGINT, "col2");
field.type = TSDB_DATA_TYPE_TIMESTAMP; PushField(createReq.pTags, 2, TSDB_DATA_TYPE_TINYINT, "tag1");
strcpy(field.name, "ts");
taosArrayPush(createReq.pColumns, &field);
}
{ int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
SField field = {0}; void* pHead = rpcMallocCont(tlen);
field.bytes = 2; tSerializeSMCreateStbReq(pHead, tlen, &createReq);
field.type = TSDB_DATA_TYPE_TINYINT; tFreeSMCreateStbReq(&createReq);
strcpy(field.name, "col1"); *pContLen = tlen;
taosArrayPush(createReq.pColumns, &field); return pHead;
} }
{ void* MndTestSma::BuildCreateBSmaStbReq(const char* stbname, int32_t* pContLen) {
SField field = {0}; SMCreateStbReq createReq = {0};
field.bytes = 8; createReq.numOfColumns = 3;
field.type = TSDB_DATA_TYPE_BIGINT; createReq.numOfTags = 1;
strcpy(field.name, "col2"); createReq.numOfSmas = 1;
taosArrayPush(createReq.pColumns, &field); createReq.igExists = 0;
} createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
createReq.pSmas = taosArrayInit(createReq.numOfSmas, sizeof(SField));
strcpy(createReq.name, stbname);
{ PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_TIMESTAMP, "ts");
SField field = {0}; PushField(createReq.pColumns, 2, TSDB_DATA_TYPE_TINYINT, "col1");
field.bytes = 2; PushField(createReq.pColumns, 8, TSDB_DATA_TYPE_BIGINT, "col2");
field.type = TSDB_DATA_TYPE_TINYINT; PushField(createReq.pTags, 2, TSDB_DATA_TYPE_TINYINT, "tag1");
strcpy(field.name, "tag1"); PushField(createReq.pSmas, 2, TSDB_DATA_TYPE_TINYINT, "col1");
taosArrayPush(createReq.pTags, &field);
}
int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq); int32_t tlen = tSerializeSMCreateStbReq(NULL, 0, &createReq);
void* pHead = rpcMallocCont(tlen); void* pHead = rpcMallocCont(tlen);
...@@ -137,8 +146,8 @@ void* MndTestSma::BuildDropStbReq(const char* stbname, int32_t* pContLen) { ...@@ -137,8 +146,8 @@ void* MndTestSma::BuildDropStbReq(const char* stbname, int32_t* pContLen) {
return pReq; return pReq;
} }
void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr, void* MndTestSma::BuildCreateTSmaReq(const char* smaname, const char* stbname, int8_t igExists, const char* expr,
const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen) { const char* tagsFilter, const char* sql, const char* ast, int32_t* pContLen) {
SMCreateSmaReq createReq = {0}; SMCreateSmaReq createReq = {0};
strcpy(createReq.name, smaname); strcpy(createReq.name, smaname);
strcpy(createReq.stb, stbname); strcpy(createReq.stb, stbname);
...@@ -166,7 +175,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in ...@@ -166,7 +175,7 @@ void* MndTestSma::BuildCreateSmaReq(const char* smaname, const char* stbname, in
return pHead; return pHead;
} }
void* MndTestSma::BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen) { void* MndTestSma::BuildDropTSmaReq(const char* smaname, int8_t igNotExists, int32_t* pContLen) {
SMDropSmaReq dropsmaReq = {0}; SMDropSmaReq dropsmaReq = {0};
dropsmaReq.igNotExists = igNotExists; dropsmaReq.igNotExists = igNotExists;
strcpy(dropsmaReq.name, smaname); strcpy(dropsmaReq.name, smaname);
...@@ -179,6 +188,7 @@ void* MndTestSma::BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32 ...@@ -179,6 +188,7 @@ void* MndTestSma::BuildDropSmaReq(const char* smaname, int8_t igNotExists, int32
return pReq; return pReq;
} }
#if 0
TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
const char* dbname = "1.d1"; const char* dbname = "1.d1";
const char* stbname = "1.d1.stb"; const char* stbname = "1.d1.stb";
...@@ -203,7 +213,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -203,7 +213,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
} }
#if 0 #if 0
{ {
pReq = BuildCreateSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen); pReq = BuildCreateTSmaReq(smaname, stbname, 0, "expr", "tagsFilter", "sql", "ast", &contLen);
pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen); pRsp = test.SendReq(TDMT_MND_CREATE_SMA, pReq, contLen);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname);
...@@ -226,7 +236,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -226,7 +236,7 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
} }
{ {
pReq = BuildDropSmaReq(smaname, 0, &contLen); pReq = BuildDropTSmaReq(smaname, 0, &contLen);
pRsp = test.SendReq(TDMT_MND_DROP_SMA, pReq, contLen); pRsp = test.SendReq(TDMT_MND_DROP_SMA, pReq, contLen);
ASSERT_EQ(pRsp->code, 0); ASSERT_EQ(pRsp->code, 0);
test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname); test.SendShowMetaReq(TSDB_MGMT_TABLE_INDEX, dbname);
...@@ -235,3 +245,51 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) { ...@@ -235,3 +245,51 @@ TEST_F(MndTestSma, 01_Create_Show_Meta_Drop_Restart_Stb) {
} }
#endif #endif
} }
#endif
TEST_F(MndTestSma, 02_Create_Show_Meta_Drop_Restart_BSma) {
const char* dbname = "1.d1";
const char* stbname = "1.d1.bsmastb";
int32_t contLen = 0;
void* pReq;
SRpcMsg* pRsp;
{
pReq = BuildCreateDbReq(dbname, &contLen);
pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0);
}
{
pReq = BuildCreateBSmaStbReq(stbname, &contLen);
pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0);
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("bsmastb", TSDB_TABLE_NAME_LEN);
}
test.Restart();
{
pReq = BuildCreateBSmaStbReq(stbname, &contLen);
pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_STB_ALREADY_EXIST);
}
{
pReq = BuildDropStbReq(stbname, &contLen);
pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, 0);
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
test.SendShowRetrieveReq();
EXPECT_EQ(test.GetShowRows(), 0);
}
{
pReq = BuildDropStbReq(stbname, &contLen);
pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_STB_NOT_EXIST);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册