提交 c1db8a9e 编写于 作者: H Hongze Cheng

refact code

上级 55105797
......@@ -70,9 +70,6 @@ int32_t tBufferInit(SBuffer *pBuffer, int64_t size);
int32_t tBufferPut(SBuffer *pBuffer, const void *pData, int64_t nData);
int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
// STSchema ================================
void tDestroyTSchema(STSchema *pTSchema);
// SColVal ================================
#define CV_FLAG_VALUE ((int8_t)0x0)
#define CV_FLAG_NONE ((int8_t)0x1)
......@@ -225,23 +222,9 @@ struct STag {
memcpy(varDataVal(x), (str), (_size)); \
} while (0);
// ----------------- SCHEMA BUILDER DEFINITION
typedef struct {
int32_t tCols;
int32_t nCols;
schema_ver_t version;
uint16_t flen;
int32_t tlen;
STColumn *columns;
} STSchemaBuilder;
int32_t tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version);
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes);
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
// STSchema ================================
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version);
void tDestroyTSchema(STSchema *pTSchema);
#endif
......
......@@ -470,8 +470,6 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
return 0;
}
STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
......@@ -1620,14 +1618,14 @@ typedef struct SSubQueryMsg {
int8_t explain;
int8_t needFetch;
uint32_t sqlLen;
char *sql;
char* sql;
uint32_t msgLen;
char *msg;
char* msg;
} SSubQueryMsg;
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq);
void tFreeSSubQueryMsg(SSubQueryMsg *pReq);
int32_t tSerializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq);
int32_t tDeserializeSSubQueryMsg(void* buf, int32_t bufLen, SSubQueryMsg* pReq);
void tFreeSSubQueryMsg(SSubQueryMsg* pReq);
typedef struct {
SMsgHead header;
......@@ -1666,9 +1664,8 @@ typedef struct {
int32_t execId;
} SResFetchReq;
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq);
int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
int32_t tDeserializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
typedef struct {
SMsgHead header;
......@@ -1741,12 +1738,11 @@ typedef struct {
int32_t execId;
} STaskDropReq;
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
int32_t tDeserializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq);
int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp);
int32_t tSerializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
int32_t tDeserializeSTaskDropReq(void* buf, int32_t bufLen, STaskDropReq* pReq);
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
typedef struct {
int32_t code;
......@@ -2951,9 +2947,8 @@ typedef struct {
STqOffsetVal reqOffset;
} SMqPollReq;
int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq);
int32_t tSerializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
int32_t tDeserializeSMqPollReq(void* buf, int32_t bufLen, SMqPollReq* pReq);
typedef struct {
int32_t vgId;
......@@ -3189,7 +3184,7 @@ typedef struct {
typedef struct {
SMsgHead header;
SArray* pMsgs; //SArray<SBatchMsg>
SArray* pMsgs; // SArray<SBatchMsg>
} SBatchReq;
typedef struct {
......@@ -3201,11 +3196,11 @@ typedef struct {
} SBatchRspMsg;
typedef struct {
SArray* pRsps; //SArray<SBatchRspMsg>
SArray* pRsps; // SArray<SBatchRspMsg>
} SBatchRsp;
int32_t tSerializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
int32_t tDeserializeSBatchReq(void *buf, int32_t bufLen, SBatchReq *pReq);
int32_t tSerializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq);
int32_t tDeserializeSBatchReq(void* buf, int32_t bufLen, SBatchReq* pReq);
static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) {
if (NULL == msg) {
return;
......@@ -3214,8 +3209,8 @@ static FORCE_INLINE void tFreeSBatchReqMsg(void* msg) {
taosMemoryFree(pMsg->msg);
}
int32_t tSerializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp);
int32_t tSerializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp);
int32_t tDeserializeSBatchRsp(void* buf, int32_t bufLen, SBatchRsp* pRsp);
static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
if (NULL == p) {
......@@ -3226,11 +3221,10 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) {
taosMemoryFree(pRsp->msg);
}
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq);
int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq);
int32_t tSerializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq);
int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
#pragma pack(pop)
......
......@@ -1086,91 +1086,6 @@ void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) {
tPutI16v(p + offset, cid);
}
#if 1 // ===================================================================================================================
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
if (pBuilder == NULL) return -1;
pBuilder->tCols = 256;
pBuilder->columns = (STColumn *)taosMemoryMalloc(sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
tdResetTSchemaBuilder(pBuilder, version);
return 0;
}
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder) {
taosMemoryFreeClear(pBuilder->columns);
}
}
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) {
pBuilder->nCols = 0;
pBuilder->tlen = 0;
pBuilder->flen = 0;
pBuilder->version = version;
}
int32_t tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int8_t flags, col_id_t colId, col_bytes_t bytes) {
if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
STColumn *columns = (STColumn *)taosMemoryRealloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
if (columns == NULL) return -1;
pBuilder->columns = columns;
}
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
pCol->type = type;
pCol->colId = colId;
pCol->flags = flags;
if (pBuilder->nCols == 0) {
pCol->offset = -1;
} else {
pCol->offset = pBuilder->flen;
pBuilder->flen += TYPE_BYTES[type];
}
if (IS_VAR_DATA_TYPE(type)) {
pCol->bytes = bytes;
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
} else {
pCol->bytes = TYPE_BYTES[type];
pBuilder->tlen += TYPE_BYTES[type];
}
pBuilder->nCols++;
ASSERT(pCol->offset < pBuilder->flen);
return 0;
}
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder->nCols <= 0) return NULL;
int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;
STSchema *pSchema = (STSchema *)taosMemoryMalloc(tlen);
if (pSchema == NULL) return NULL;
pSchema->version = pBuilder->version;
pSchema->numOfCols = pBuilder->nCols;
pSchema->tlen = pBuilder->tlen;
pSchema->flen = pBuilder->flen;
#ifdef TD_SUPPORT_BITMAP
pSchema->tlen += (int)TD_BITMAP_BYTES(pSchema->numOfCols);
#endif
memcpy(&pSchema->columns[0], pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
return pSchema;
}
#endif
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
if (pTSchema == NULL) return NULL;
......@@ -1184,7 +1099,7 @@ STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
pTSchema->columns[0].colId = aSchema[0].colId;
pTSchema->columns[0].type = aSchema[0].type;
pTSchema->columns[0].flags = aSchema[0].flags;
pTSchema->columns[0].bytes = aSchema[0].bytes;
pTSchema->columns[0].bytes = TYPE_BYTES[aSchema[0].type];
pTSchema->columns[0].offset = -1;
// other columns
......@@ -1195,12 +1110,23 @@ STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
pTColumn->colId = pSchema->colId;
pTColumn->type = pSchema->type;
pTColumn->flags = pSchema->flags;
pTColumn->bytes = pSchema->bytes;
pTColumn->offset = pTSchema->flen;
if (IS_VAR_DATA_TYPE(pSchema->type)) {
pTColumn->bytes = pSchema->bytes;
pTSchema->tlen += (TYPE_BYTES[pSchema->type] + pSchema->bytes); // todo: remove
} else {
pTColumn->bytes = TYPE_BYTES[pSchema->type];
pTSchema->tlen += TYPE_BYTES[pSchema->type]; // todo: remove
}
pTSchema->flen += TYPE_BYTES[pTColumn->type];
}
#if 1 // todo : remove this
pTSchema->tlen += (int32_t)TD_BITMAP_BYTES(numOfCols);
#endif
return pTSchema;
}
......
......@@ -4578,7 +4578,6 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) {
return 0;
}
int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -4666,7 +4665,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
if (tEncodeU32(&encoder, pReq->sqlLen) < 0) return -1;
if (tEncodeCStrWithLen(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1;
if (tEncodeU32(&encoder, pReq->msgLen) < 0) return -1;
if (tEncodeBinary(&encoder, (uint8_t*)pReq->msg, pReq->msgLen) < 0) return -1;
if (tEncodeBinary(&encoder, (uint8_t *)pReq->msg, pReq->msgLen) < 0) return -1;
tEndEncode(&encoder);
......@@ -4706,7 +4705,7 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq)
if (tDecodeU32(&decoder, &pReq->sqlLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1;
if (tDecodeU32(&decoder, &pReq->msgLen) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, (void**)&pReq->msg, NULL) < 0) return -1;
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->msg, NULL) < 0) return -1;
tEndDecode(&decoder);
......@@ -4723,7 +4722,6 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) {
taosMemoryFreeClear(pReq->msg);
}
int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......@@ -4777,7 +4775,6 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq)
return 0;
}
int32_t tSerializeSTqOffsetVal(SEncoder *pEncoder, STqOffsetVal *pOffset) {
if (tEncodeI8(pEncoder, pOffset->type) < 0) return -1;
if (tEncodeI64(pEncoder, pOffset->uid) < 0) return -1;
......@@ -4855,7 +4852,6 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
return 0;
}
int32_t tSerializeSTaskDropReq(void *buf, int32_t bufLen, STaskDropReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......@@ -4948,7 +4944,6 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR
return 0;
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......@@ -5591,30 +5586,6 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
return 0;
}
STSchema *tdGetSTSChemaFromSSChema(SSchema *pSchema, int32_t nCols, int32_t sver) {
STSchemaBuilder schemaBuilder = {0};
if (tdInitTSchemaBuilder(&schemaBuilder, sver) < 0) {
return NULL;
}
for (int i = 0; i < nCols; i++) {
SSchema *schema = pSchema + i;
if (tdAddColToSchema(&schemaBuilder, schema->type, schema->flags, schema->colId, schema->bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
}
}
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
if (pNSchema == NULL) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
}
tdDestroyTSchemaBuilder(&schemaBuilder);
return pNSchema;
}
int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
......
......@@ -117,7 +117,7 @@ STSchema *genSTSchema(int16_t nCols) {
}
STSchema *pResult = NULL;
pResult = tdGetSTSChemaFromSSChema(pSchema, nCols, 1);
pResult = tBuildTSchema(pSchema, nCols, 1);
taosMemoryFree(pSchema);
return pResult;
......
......@@ -583,23 +583,14 @@ tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
}
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
// SMetaReader mr = {0};
STSchema *pTSchema = NULL;
SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0};
SSchema *pSchema;
SSchema *pSchema = NULL;
pSW = metaGetTableSchema(pMeta, uid, sver, lock);
if (!pSW) return NULL;
tdInitTSchemaBuilder(&sb, pSW->version);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
tdDestroyTSchemaBuilder(&sb);
pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version);
taosMemoryFree(pSW->pSchema);
taosMemoryFree(pSW);
......@@ -680,21 +671,11 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
tdbFree(pData);
// convert
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
for (int i = 0; i < pSchemaWrapper->nCols; i++) {
SSchema *pSchema = pSchemaWrapper->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
STSchema *pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
if (pTSchema == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
tdDestroyTSchemaBuilder(&sb);
*ppTSchema = pTSchema;
taosMemoryFree(pSchemaWrapper->pSchema);
......
......@@ -951,7 +951,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
pTask->tbSink.pTSchema =
tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
ASSERT(pTask->tbSink.pTSchema);
}
......
......@@ -1227,7 +1227,7 @@ static int parseOneRow(SInsertParseContext* pCxt, const char** pSql, STableDataB
*pGotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1);
STSchema* pSTSchema = tBuildTSchema(schema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
#endif
......
......@@ -208,7 +208,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
}
tdSRowEnd(pBuilder);
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
STSchema* pSTSchema = tBuildTSchema(pSchema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
#endif
......@@ -287,7 +287,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
}
#ifdef TD_DEBUG_PRINT_ROW
if (rowEnd) {
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
STSchema* pSTSchema = tBuildTSchema(pSchema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
}
......
......@@ -458,7 +458,7 @@ static int32_t tdBlockRowMerge(STableMeta* pTableMeta, SBlockKeyTuple* pEndKeyTp
if (!(*pBlkRowMerger)->pSchema) {
(*pBlkRowMerger)->pSchema =
tdGetSTSChemaFromSSChema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
tBuildTSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
if (!(*pBlkRowMerger)->pSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册