提交 c28e4bca 编写于 作者: H Hongze Cheng

refact meta 8

上级 12319320
......@@ -1521,45 +1521,6 @@ typedef struct SVCreateStbRsp {
} SVCreateStbRsp;
typedef struct SVCreateTbReq {
char* name;
uint32_t ttl;
uint32_t keep;
union {
uint8_t info;
struct {
uint8_t rollup : 1; // 1 means rollup sma
uint8_t type : 7;
union {
struct {
tb_uid_t suid;
col_id_t nCols;
col_id_t nBSmaCols;
SSchema* pSchema;
col_id_t nTagCols;
SSchema* pTagSchema;
SRSmaParam* pRSmaParam;
} stbCfg;
struct {
tb_uid_t suid;
SKVRow pTag;
} ctbCfg;
struct {
int16_t nCols;
SSchema* pSchema;
} ntbCfg;
} SVCreateTbReq, SVUpdateTbReq;
typedef struct {
int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp;
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
typedef struct SVCreateTbReq2 {
tb_uid_t uid;
int64_t ctime;
const char* name;
......@@ -1576,18 +1537,28 @@ typedef struct SVCreateTbReq2 {
SSchema* pSchema;
} ntb;
} SVCreateTbReq2;
} SVCreateTbReq;
int tEncodeSVCreateTbReq2(SCoder* pCoder, const SVCreateTbReq2* pReq);
int tDecodeSVCreateTbReq2(SCoder* pCoder, SVCreateTbReq2* pReq);
int tEncodeSVCreateTbReq(SCoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SCoder* pCoder, SVCreateTbReq* pReq);
typedef struct {
int64_t ver; // use a general definition
int32_t nReqs;
union {
SVCreateTbReq* pReqs;
SArray* pArray;
} SVCreateTbBatchReq;
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
int tEncodeSVCreateTbBatchReq(SCoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SCoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct {
int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp;
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
typedef struct {
SArray* rspList; // SArray<SVCreateTbRsp>
......@@ -398,179 +398,6 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
return 0;
int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedU32(buf, pReq->ttl);
tlen += taosEncodeFixedU32(buf, pReq->keep);
tlen += taosEncodeFixedU8(buf, pReq->type);
// tlen += taosEncodeFixedU8(buf, pReq->info);
switch (pReq->type) {
tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nCols);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols);
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pSchema[i].name);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
if (pReq->rollup && pReq->stbCfg.pRSmaParam) {
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
tlen += taosEncodeBinary(buf, (const void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
tlen += taosEncodeFixedI32(buf, param->delay);
tlen += taosEncodeFixedI8(buf, param->nFuncIds);
for (int8_t i = 0; i < param->nFuncIds; ++i) {
tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]);
tlen += taosEncodeFixedI32(buf, param->qmsg1Len);
if (param->qmsg1Len > 0) {
tlen += taosEncodeString(buf, param->qmsg1);
tlen += taosEncodeFixedI32(buf, param->qmsg2Len);
if (param->qmsg2Len > 0) {
tlen += taosEncodeString(buf, param->qmsg2);
tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid);
tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag);
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nCols);
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->ntbCfg.pSchema[i].flags);
tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name);
return tlen;
void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeFixedU32(buf, &(pReq->ttl));
buf = taosDecodeFixedU32(buf, &(pReq->keep));
// buf = taosDecodeFixedU8(buf, &pReq->type);
buf = taosDecodeFixedU8(buf, &(pReq->info));
switch (pReq->type) {
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nCols));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols));
pReq->stbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pSchema[i].flags));
buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.pSchema[i].colId));
buf = taosDecodeFixedI32(buf, &(pReq->stbCfg.pSchema[i].bytes));
buf = taosDecodeStringTo(buf, pReq->stbCfg.pSchema[i].name);
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.nTagCols);
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].flags));
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
if (pReq->rollup) {
pReq->stbCfg.pRSmaParam = (SRSmaParam *)taosMemoryCalloc(1, sizeof(SRSmaParam));
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
buf = taosDecodeBinaryTo(buf, (void *)&param->xFilesFactor, sizeof(param->xFilesFactor));
buf = taosDecodeFixedI32(buf, &param->delay);
buf = taosDecodeFixedI8(buf, &param->nFuncIds);
if (param->nFuncIds > 0) {
param->pFuncIds = (func_id_t *)taosMemoryCalloc(param->nFuncIds, sizeof(func_id_t));
for (int8_t i = 0; i < param->nFuncIds; ++i) {
buf = taosDecodeFixedI32(buf, param->pFuncIds + i);
buf = taosDecodeFixedI32(buf, &param->qmsg1Len);
if (param->qmsg1Len > 0) {
buf = taosDecodeString(buf, &param->qmsg1);
buf = taosDecodeFixedI32(buf, &param->qmsg2Len);
if (param->qmsg2Len > 0) {
buf = taosDecodeString(buf, &param->qmsg2);
} else {
pReq->stbCfg.pRSmaParam = NULL;
buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid);
buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag);
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.nCols);
pReq->ntbCfg.pSchema = (SSchema *)taosMemoryMalloc(pReq->ntbCfg.nCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->ntbCfg.nCols; ++i) {
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].type);
buf = taosDecodeFixedI8(buf, &pReq->ntbCfg.pSchema[i].flags);
buf = taosDecodeFixedI16(buf, &pReq->ntbCfg.pSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name);
return buf;
int32_t tSerializeSVCreateTbBatchReq(void **buf, SVCreateTbBatchReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray));
for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i);
tlen += tSerializeSVCreateTbReq(buf, pCreateTbReq);
return tlen;
void *tDeserializeSVCreateTbBatchReq(void *buf, SVCreateTbBatchReq *pReq) {
uint32_t nsize = 0;
buf = taosDecodeFixedI64(buf, &pReq->ver);
buf = taosDecodeFixedU32(buf, &nsize);
pReq->pArray = taosArrayInit(nsize, sizeof(SVCreateTbReq));
for (size_t i = 0; i < nsize; i++) {
SVCreateTbReq req = {0};
buf = tDeserializeSVCreateTbReq(buf, &req);
taosArrayPush(pReq->pArray, &req);
return buf;
int32_t tSerializeSVDropTbReq(void **buf, SVDropTbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->ver);
......@@ -3545,48 +3372,49 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR
int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (pRsp->rspList) {
int32_t num = taosArrayGetSize(pRsp->rspList);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SVCreateTbRsp *rsp = taosArrayGet(pRsp->rspList, i);
if (tEncodeI32(&encoder, rsp->code) < 0) return -1;
} else {
if (tEncodeI32(&encoder, 0) < 0) return -1;
// SCoder encoder = {0};
// tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
// if (tStartEncode(&encoder) < 0) return -1;
// if (pRsp->rspList) {
// int32_t num = taosArrayGetSize(pRsp->rspList);
// if (tEncodeI32(&encoder, num) < 0) return -1;
// for (int32_t i = 0; i < num; ++i) {
// SVCreateTbRsp *rsp = taosArrayGet(pRsp->rspList, i);
// if (tEncodeI32(&encoder, rsp->code) < 0) return -1;
// }
// } else {
// if (tEncodeI32(&encoder, 0) < 0) return -1;
// }
// tEndEncode(&encoder);
int32_t tlen = encoder.pos;
return tlen;
// int32_t tlen = encoder.pos;
// tCoderClear(&encoder);
// reture tlen;
return 0;
int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) {
SCoder decoder = {0};
int32_t num = 0;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp));
if (NULL == pRsp->rspList) return -1;
for (int32_t i = 0; i < num; ++i) {
SVCreateTbRsp rsp = {0};
if (tDecodeI32(&decoder, &rsp.code) < 0) return -1;
if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1;
} else {
pRsp->rspList = NULL;
// SCoder decoder = {0};
// int32_t num = 0;
// tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
// if (tStartDecode(&decoder) < 0) return -1;
// if (tDecodeI32(&decoder, &num) < 0) return -1;
// if (num > 0) {
// pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp));
// if (NULL == pRsp->rspList) return -1;
// for (int32_t i = 0; i < num; ++i) {
// SVCreateTbRsp rsp = {0};
// if (tDecodeI32(&decoder, &rsp.code) < 0) return -1;
// if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1;
// }
// } else {
// pRsp->rspList = NULL;
// }
// tEndDecode(&decoder);
// tCoderClear(&decoder);
return 0;
......@@ -3765,7 +3593,7 @@ STSchema *tdGetSTSChemaFromSSChema(SSchema **pSchema, int32_t nCols) {
return pNSchema;
int tEncodeSVCreateTbReq2(SCoder *pCoder, const SVCreateTbReq2 *pReq) {
int tEncodeSVCreateTbReq(SCoder *pCoder, const SVCreateTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->uid) < 0) return -1;
......@@ -3793,7 +3621,7 @@ int tEncodeSVCreateTbReq2(SCoder *pCoder, const SVCreateTbReq2 *pReq) {
return 0;
int tDecodeSVCreateTbReq2(SCoder *pCoder, SVCreateTbReq2 *pReq) {
int tDecodeSVCreateTbReq(SCoder *pCoder, SVCreateTbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1;
......@@ -3821,3 +3649,29 @@ int tDecodeSVCreateTbReq2(SCoder *pCoder, SVCreateTbReq2 *pReq) {
return 0;
int tEncodeSVCreateTbBatchReq(SCoder *pCoder, const SVCreateTbBatchReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, taosArrayGetSize(pReq->pArray)) < 0) return -1;
for (int iReq = 0; iReq < pReq->nReqs; iReq++) {
if (tEncodeSVCreateTbReq(pCoder, (SVCreateTbReq *)taosArrayGet(pReq->pArray, iReq)) < 0) return -1;
return 0;
int tDecodeSVCreateTbBatchReq(SCoder *pCoder, SVCreateTbBatchReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->nReqs) < 0) return -1;
pReq->pReqs = (SVCreateTbReq *)TCODER_MALLOC(pCoder, sizeof(SVCreateTbReq) * pReq->nReqs);
if (pReq->pReqs == NULL) return -1;
for (int iReq = 0; iReq < pReq->nReqs; iReq++) {
if (tDecodeSVCreateTbReq(pCoder, pReq->pReqs + iReq) < 0) return -1;
return 0;
\ No newline at end of file
......@@ -93,7 +93,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
quid = pTbCfg->ctb.suid;
} else {
quid = pHandle->pBlock->uid;
......@@ -245,6 +245,7 @@ _err:
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) {
#if 0
SVCreateTbBatchReq vCreateTbBatchReq = {0};
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
......@@ -302,6 +303,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
pRsp->contLen = contLen;
return 0;
......@@ -22,6 +22,7 @@
#include "ttime.h"
#include "ttypes.h"
// clang-format off
#define NEXT_TOKEN(pSql, sToken) \
do { \
int32_t index = 0; \
......@@ -769,8 +770,8 @@ static int32_t buildCreateTbReq(SVCreateTbReq *pTbReq, const SName* pName, SKVRo
tNameGetFullDbName(pName, dbFName);
pTbReq->type = TD_CHILD_TABLE;
pTbReq->name = strdup(pName->tname);
pTbReq->ctbCfg.suid = suid;
pTbReq->ctbCfg.pTag = row;
pTbReq->ctb.suid = suid;
pTbReq->ctb.pTag = row;
......@@ -1008,7 +1009,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
void destroyCreateSubTbReq(SVCreateTbReq* pReq) {
static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
......@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
// clang-format off
#include "parInsertData.h"
#include "catalog.h"
......@@ -157,7 +157,11 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq) {
int32_t len = tSerializeSVCreateTbReq(NULL, pCreateTbReq);
SCoder coder = {0};
char* pBuf;
int32_t len;
tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len);
if (pBlocks->nAllocSize - pBlocks->size < len) {
pBlocks->nAllocSize += len + pBlocks->rowSize;
char* pTmp = taosMemoryRealloc(pBlocks->pData, pBlocks->nAllocSize);
......@@ -169,8 +173,13 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
char* pBuf = pBlocks->pData + pBlocks->size;
tSerializeSVCreateTbReq((void**)&pBuf, pCreateTbReq);
pBuf= pBlocks->pData + pBlocks->size;
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, len, TD_ENCODER);
tEncodeSVCreateTbReq(&coder, pCreateTbReq);
pBlocks->size += len;
pBlocks->createTbReqLen = len;
......@@ -190,7 +199,7 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
return ret;
if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctbCfg.pTag) {
if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
......@@ -562,8 +562,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
} else if (nodesIsComparisonOp(pOp)) {
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type ||
TSDB_DATA_TYPE_BLOB == rdt.type) {
if (TSDB_DATA_TYPE_BLOB == ldt.type || TSDB_DATA_TYPE_JSON == rdt.type || TSDB_DATA_TYPE_BLOB == rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
......@@ -571,7 +570,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
} else if (nodesIsJsonOp(pOp)){
} else if (nodesIsJsonOp(pOp)) {
if (TSDB_DATA_TYPE_JSON != ldt.type || TSDB_DATA_TYPE_BINARY != rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
......@@ -590,7 +589,9 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) {
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
......@@ -1908,7 +1909,7 @@ static int32_t checkTableTags(STranslateContext* pCxt, SCreateTableStmt* pStmt)
SNode* pNode;
FOREACH(pNode, pStmt->pTags) {
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
if(pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1){
if (pCol->dataType.type == TSDB_DATA_TYPE_JSON && LIST_LENGTH(pStmt->pTags) > 1) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_ONE_JSON_TAG);
......@@ -1924,7 +1925,9 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION);
SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0);
SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
.pRpc = pCxt->pParseCxt->pTransporter,
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet};
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(&param, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName);
......@@ -2053,7 +2056,7 @@ static SNode* makeIntervalVal(SRetention* pRetension, int8_t precision) {
int64_t timeVal = convertTimeFromPrecisionToUnit(pRetension->freq, precision, pRetension->freqUnit);
char buf[20] = {0};
int32_t len = snprintf(buf, sizeof(buf), "%"PRId64"%c", timeVal, pRetension->freqUnit);
int32_t len = snprintf(buf, sizeof(buf), "%" PRId64 "%c", timeVal, pRetension->freqUnit);
pVal->literal = strndup(buf, len);
if (NULL == pVal->literal) {
......@@ -2138,8 +2141,8 @@ static STableMeta* createRollupTableMeta(SCreateTableStmt* pStmt, int8_t precisi
return pMeta;
static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt,
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, SSampleAstInfo* pInfo) {
static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension,
int8_t precision, SSampleAstInfo* pInfo) {
pInfo->pDbName = pStmt->dbName;
pInfo->pTableName = pStmt->tableName;
pInfo->pFuncs = createRollupFuncs(pStmt);
......@@ -2151,8 +2154,8 @@ static int32_t buildSampleAstInfoByTable(STranslateContext* pCxt,
static int32_t getRollupAst(STranslateContext* pCxt,
SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision, char** pAst, int32_t* pLen) {
static int32_t getRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SRetention* pRetension, int8_t precision,
char** pAst, int32_t* pLen) {
SSampleAstInfo info = {0};
int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info);
if (TSDB_CODE_SUCCESS == code) {
......@@ -2170,11 +2173,11 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
return code;
for (int32_t i = 1; i < num; ++i) {
SRetention *pRetension = taosArrayGet(dbCfg.pRetensions, i);
SRetention* pRetension = taosArrayGet(dbCfg.pRetensions, i);
STranslateContext cxt = {0};
initTranslateContext(pCxt->pParseCxt, &cxt);
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision,
1 == i ? &pReq->pAst1 : &pReq->pAst2, 1 == i ? &pReq->ast1Len : &pReq->ast2Len);
code = getRollupAst(&cxt, pStmt, pRetension, dbCfg.precision, 1 == i ? &pReq->pAst1 : &pReq->pAst2,
1 == i ? &pReq->ast1Len : &pReq->ast2Len);
if (TSDB_CODE_SUCCESS != code) {
......@@ -2712,7 +2715,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
static int32_t readFromFile(char* pName, int32_t *len, char **buf) {
static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
int64_t filesize = 0;
if (taosStatFile(pName, &filesize, NULL) < 0) {
return TAOS_SYSTEM_ERROR(errno);
......@@ -3155,7 +3158,7 @@ typedef struct SVgroupTablesBatch {
static void destroyCreateTbReq(SVCreateTbReq* pReq) {
static int32_t buildSmaParam(STableOptions* pOptions, SVCreateTbReq* pReq) {
......@@ -3193,16 +3196,17 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
SVCreateTbReq req = {0};
req.type = TD_NORMAL_TABLE;
req.name = strdup(pStmt->tableName);
req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchema));
if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
req.ntb.nCols = LIST_LENGTH(pStmt->pCols);
req.ntb.sver = 0;
req.ntb.pSchema = taosMemoryCalloc(req.ntb.nCols, sizeof(SSchema));
if (NULL == req.name || NULL == req.ntb.pSchema) {
SNode* pCol;
col_id_t index = 0;
FOREACH(pCol, pStmt->pCols) {
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
toSchema((SColumnDefNode*)pCol, index + 1, req.ntb.pSchema + index);
if (TSDB_CODE_SUCCESS != buildSmaParam(pStmt->pOptions, &req)) {
......@@ -3223,7 +3227,11 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
int tlen;
SCoder coder = {0};
tEncodeSize(tEncodeSVCreateTbBatchReq, &pTbBatch->req, tlen);
tlen += sizeof(SMsgHead); //+ tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) {
......@@ -3231,7 +3239,10 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray*
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, tlen - sizeof(SMsgHead), TD_ENCODER);
tEncodeSVCreateTbBatchReq(&coder, &pTbBatch->req);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
......@@ -3253,9 +3264,9 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
if (pTableReq->type == TSDB_NORMAL_TABLE) {
} else if (pTableReq->type == TSDB_CHILD_TABLE) {
......@@ -3336,10 +3347,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE;
req.name = strdup(pTableName);
req.ctbCfg.suid = suid;
req.ctbCfg.pTag = row;
req.ctb.suid = suid;
req.ctb.pTag = row;
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
#if 0
if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0};
tBatch.info = *pVgInfo;
......@@ -3352,12 +3364,13 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, c
} else { // add to the correct vgroup
taosArrayPush(pTableBatch->req.pArray, &req);
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema,
SKVRowBuilder* pBuilder) {
if(pSchema->type == TSDB_DATA_TYPE_JSON){
if(pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
if (pSchema->type == TSDB_DATA_TYPE_JSON) {
if (pVal->literal && strlen(pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pVal->literal);
......@@ -3368,10 +3381,11 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS
return pCxt->errCode;
if(pVal->node.resType.type == TSDB_DATA_TYPE_NULL){
if (pVal->node.resType.type == TSDB_DATA_TYPE_NULL) {
// todo
tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p), IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
} else {
tdAddColToKVRow(pBuilder, pSchema->colId, &(pVal->datum.p),
IS_VAR_DATA_TYPE(pSchema->type) ? varDataTLen(pVal->datum.p) : TYPE_BYTES[pSchema->type]);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册