“5b03ab17057198afa9f6068e2ef6e037dadd9dbc”上不存在“source/libs/catalog/git@gitcode.net:taosdata/tdengine.git”
提交 e99e03ee 编写于 作者: D dapan

Merge branch 'feature/qnode' of https://github.com/taosdata/TDengine into feature/qnode

...@@ -14,6 +14,25 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR}) ...@@ -14,6 +14,25 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH}) MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH}) MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git")
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
message(WARNING "git submodule update --init --recursive failed with ${GIT_SUBMOD_RESULT}, please checkout submodules")
endif()
endif()
endif()
if(NOT EXISTS "${PROJECT_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.")
endif()
if (NOT DEFINED TD_GRANT) if (NOT DEFINED TD_GRANT)
SET(TD_GRANT FALSE) SET(TD_GRANT FALSE)
endif() endif()
...@@ -46,7 +65,7 @@ ENDIF () ...@@ -46,7 +65,7 @@ ENDIF ()
IF (TD_WINDOWS) IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}") MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/W3 /D_WIN32 /vmg") SET(COMMON_FLAGS "/W3 /D_WIN32")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900)) # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18") # SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
......
...@@ -195,7 +195,7 @@ DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); ...@@ -195,7 +195,7 @@ DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
#endif #endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision, bool dataFormat); DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */ /* --------------------------TMQ INTERFACE------------------------------- */
......
...@@ -45,6 +45,12 @@ enum { ...@@ -45,6 +45,12 @@ enum {
STREAM_TRIGGER__BY_EVENT_TIME, STREAM_TRIGGER__BY_EVENT_TIME,
}; };
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_INVALID,
} EStreamType;
typedef struct { typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray* pGroupList; SArray* pGroupList;
...@@ -71,6 +77,7 @@ typedef struct SDataBlockInfo { ...@@ -71,6 +77,7 @@ typedef struct SDataBlockInfo {
int16_t numOfCols; int16_t numOfCols;
int16_t hasVarCol; int16_t hasVarCol;
int32_t capacity; int32_t capacity;
EStreamType type;
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
...@@ -115,8 +122,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks); ...@@ -115,8 +122,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData); void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
......
...@@ -328,11 +328,11 @@ typedef struct { ...@@ -328,11 +328,11 @@ typedef struct {
int8_t alterType; int8_t alterType;
int32_t numOfFields; int32_t numOfFields;
SArray* pFields; SArray* pFields;
} SMAltertbReq; } SMAlterStbReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
void tFreeSMAltertbReq(SMAltertbReq* pReq); void tFreeSMAltertbReq(SMAlterStbReq* pReq);
typedef struct SEpSet { typedef struct SEpSet {
int8_t inUse; int8_t inUse;
...@@ -340,8 +340,8 @@ typedef struct SEpSet { ...@@ -340,8 +340,8 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA]; SEp eps[TSDB_MAX_REPLICA];
} SEpSet; } SEpSet;
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp); int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
...@@ -613,8 +613,8 @@ typedef struct { ...@@ -613,8 +613,8 @@ typedef struct {
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp); int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp); int32_t tSerializeSUseDbRspImp(SEncoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRspImp(SDecoder* pDecoder, SUseDbRsp* pRsp);
void tFreeSUsedbRsp(SUseDbRsp* pRsp); void tFreeSUsedbRsp(SUseDbRsp* pRsp);
typedef struct { typedef struct {
...@@ -1529,8 +1529,8 @@ typedef struct { ...@@ -1529,8 +1529,8 @@ typedef struct {
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam; } SRSmaParam;
int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam); int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam); int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam);
// TDMT_VND_CREATE_STB ============== // TDMT_VND_CREATE_STB ==============
typedef struct SVCreateStbReq { typedef struct SVCreateStbReq {
...@@ -1542,8 +1542,8 @@ typedef struct SVCreateStbReq { ...@@ -1542,8 +1542,8 @@ typedef struct SVCreateStbReq {
SRSmaParam pRSmaParam; SRSmaParam pRSmaParam;
} SVCreateStbReq; } SVCreateStbReq;
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq); int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq); int tDecodeSVCreateStbReq(SDecoder* pCoder, SVCreateStbReq* pReq);
// TDMT_VND_DROP_STB ============== // TDMT_VND_DROP_STB ==============
typedef struct SVDropStbReq { typedef struct SVDropStbReq {
...@@ -1551,8 +1551,8 @@ typedef struct SVDropStbReq { ...@@ -1551,8 +1551,8 @@ typedef struct SVDropStbReq {
tb_uid_t suid; tb_uid_t suid;
} SVDropStbReq; } SVDropStbReq;
int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq); int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq); int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq);
#define TD_CREATE_IF_NOT_EXISTS 0x1 #define TD_CREATE_IF_NOT_EXISTS 0x1
typedef struct SVCreateTbReq { typedef struct SVCreateTbReq {
...@@ -1564,8 +1564,8 @@ typedef struct SVCreateTbReq { ...@@ -1564,8 +1564,8 @@ typedef struct SVCreateTbReq {
int8_t type; int8_t type;
union { union {
struct { struct {
tb_uid_t suid; tb_uid_t suid;
const void* pTag; const uint8_t* pTag;
} ctb; } ctb;
struct { struct {
SSchemaWrapper schema; SSchemaWrapper schema;
...@@ -1573,8 +1573,8 @@ typedef struct SVCreateTbReq { ...@@ -1573,8 +1573,8 @@ typedef struct SVCreateTbReq {
}; };
} SVCreateTbReq; } SVCreateTbReq;
int tEncodeSVCreateTbReq(SCoder* pCoder, const SVCreateTbReq* pReq); int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SCoder* pCoder, SVCreateTbReq* pReq); int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
typedef struct { typedef struct {
int32_t nReqs; int32_t nReqs;
...@@ -1584,15 +1584,15 @@ typedef struct { ...@@ -1584,15 +1584,15 @@ typedef struct {
}; };
} SVCreateTbBatchReq; } SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SCoder* pCoder, const SVCreateTbBatchReq* pReq); int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SCoder* pCoder, SVCreateTbBatchReq* pReq); int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct { typedef struct {
int32_t code; int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp; } SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SCoder* pCoder, const SVCreateTbRsp* pRsp); int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SCoder* pCoder, SVCreateTbRsp* pRsp); int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
...@@ -1605,8 +1605,8 @@ typedef struct { ...@@ -1605,8 +1605,8 @@ typedef struct {
}; };
} SVCreateTbBatchRsp; } SVCreateTbBatchRsp;
int tEncodeSVCreateTbBatchRsp(SCoder* pCoder, const SVCreateTbBatchRsp* pRsp); int tEncodeSVCreateTbBatchRsp(SEncoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp); int tDecodeSVCreateTbBatchRsp(SDecoder* pCoder, SVCreateTbBatchRsp* pRsp);
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
...@@ -1629,8 +1629,8 @@ typedef struct { ...@@ -1629,8 +1629,8 @@ typedef struct {
}; };
} SVDropTbBatchReq; } SVDropTbBatchReq;
int32_t tEncodeSVDropTbBatchReq(SCoder* pCoder, const SVDropTbBatchReq* pReq); int32_t tEncodeSVDropTbBatchReq(SEncoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SCoder* pCoder, SVDropTbBatchReq* pReq); int32_t tDecodeSVDropTbBatchReq(SDecoder* pCoder, SVDropTbBatchReq* pReq);
typedef struct { typedef struct {
int32_t nRsps; int32_t nRsps;
...@@ -1640,8 +1640,8 @@ typedef struct { ...@@ -1640,8 +1640,8 @@ typedef struct {
}; };
} SVDropTbBatchRsp; } SVDropTbBatchRsp;
int32_t tEncodeSVDropTbBatchRsp(SCoder* pCoder, const SVDropTbBatchRsp* pRsp); int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SCoder* pCoder, SVDropTbBatchRsp* pRsp); int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
...@@ -1823,14 +1823,14 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { ...@@ -1823,14 +1823,14 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp); int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp); int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) { static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1; if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1; if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1; if (tEncodeBinary(pEncoder, (uint8_t*)pKv->value, pKv->valueLen) < 0) return -1;
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { static FORCE_INLINE int32_t tDecodeSKv(SDecoder* pDecoder, SKv* pKv) {
if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1; if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1; if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
pKv->value = taosMemoryMalloc(pKv->valueLen + 1); pKv->value = taosMemoryMalloc(pKv->valueLen + 1);
...@@ -1839,13 +1839,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { ...@@ -1839,13 +1839,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
return 0; return 0;
} }
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { static FORCE_INLINE int32_t tEncodeSClientHbKey(SEncoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1; if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1; if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1;
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1; if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1; if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1;
return 0; return 0;
...@@ -2048,10 +2048,10 @@ typedef struct { ...@@ -2048,10 +2048,10 @@ typedef struct {
int32_t reserved; int32_t reserved;
} SMqCMCommitOffsetRsp; } SMqCMCommitOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); int32_t tEncodeSMqOffset(SEncoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq); int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
...@@ -2091,7 +2091,7 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) { ...@@ -2091,7 +2091,7 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
return (void*)buf; return (void*)buf;
} }
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) { static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1; if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
...@@ -2100,7 +2100,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch ...@@ -2100,7 +2100,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) { static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1; if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
...@@ -2133,7 +2133,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp ...@@ -2133,7 +2133,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
return (void*)buf; return (void*)buf;
} }
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) { for (int32_t i = 0; i < pSW->nCols; i++) {
...@@ -2143,7 +2143,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchem ...@@ -2143,7 +2143,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchem
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) { static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
...@@ -2595,12 +2595,12 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { ...@@ -2595,12 +2595,12 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
#define TD_AUTO_CREATE_TABLE 0x1 #define TD_AUTO_CREATE_TABLE 0x1
typedef struct { typedef struct {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
int32_t sver; int32_t sver;
uint64_t nData; uint32_t nData;
const void* pData; const uint8_t* pData;
SVCreateTbReq cTbReq; SVCreateTbReq cTbReq;
} SVSubmitBlk; } SVSubmitBlk;
typedef struct { typedef struct {
...@@ -2612,8 +2612,8 @@ typedef struct { ...@@ -2612,8 +2612,8 @@ typedef struct {
}; };
} SVSubmitReq; } SVSubmitReq;
int32_t tEncodeSVSubmitReq(SCoder* pCoder, const SVSubmitReq* pReq); int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SCoder* pCoder, SVSubmitReq* pReq); int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
#pragma pack(pop) #pragma pack(pop)
......
...@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/ */
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
/**
* release the query handle and decrease the reference count in cache
* @param pMgmt
* @param pQInfo
* @param freeHandle
* @return
*/
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
......
...@@ -168,6 +168,9 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin ...@@ -168,6 +168,9 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -44,7 +44,8 @@ enum { ...@@ -44,7 +44,8 @@ enum {
UDFC_CODE_PIPE_READ_ERR = -2, UDFC_CODE_PIPE_READ_ERR = -2,
UDFC_CODE_CONNECT_PIPE_ERR = -3, UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4, UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5 UDFC_CODE_INVALID_STATE = -5,
UDFC_CODE_NO_PIPE = -6,
}; };
typedef void *UdfcFuncHandle; typedef void *UdfcFuncHandle;
...@@ -140,6 +141,44 @@ typedef int32_t (*TUdfDestroyFunc)(); ...@@ -140,6 +141,44 @@ typedef int32_t (*TUdfDestroyFunc)();
#define UDF_MEMORY_EXP_GROWTH 1.5 #define UDF_MEMORY_EXP_GROWTH 1.5
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) { static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData; SUdfColumnData *data = &pColumn->colData;
...@@ -186,17 +225,22 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t ne ...@@ -186,17 +225,22 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t ne
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData; SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1); udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type); bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) { if (isNull) {
if (isVarCol) { udfColDataSetNull(pColumn, currentRow);
data->varLenCol.varOffsets[currentRow] = -1;
} else {
colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
}
} else { } else {
if (!isVarCol) { if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow); colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
......
...@@ -154,8 +154,8 @@ typedef struct { ...@@ -154,8 +154,8 @@ typedef struct {
} SStreamTask; } SStreamTask;
SStreamTask* tNewSStreamTask(int64_t streamId); SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask);
typedef struct { typedef struct {
......
...@@ -36,7 +36,7 @@ typedef struct SUpdateInfo { ...@@ -36,7 +36,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoDestroy(SUpdateInfo *pInfo);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -328,29 +328,32 @@ int32_t* taosGetErrno(); ...@@ -328,29 +328,32 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601) #define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601)
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602) #define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602)
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603) #define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603)
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604) #define TSDB_CODE_TDB_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0604)
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605) #define TSDB_CODE_TDB_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0605)
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606) #define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0606)
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607) #define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0607)
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608) #define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0608)
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609) #define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0609)
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A) #define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x060A)
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B) #define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x060B)
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C) #define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x060C)
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D) #define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060D)
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E) #define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060E)
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F) #define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060F)
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610) #define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x0600)
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611) #define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x0601)
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612) #define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x0602)
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) #define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0613)
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) #define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) #define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617) #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618) #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620) #define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x062D)
// query // query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......
此差异已折叠。
...@@ -1724,13 +1724,13 @@ cleanup: ...@@ -1724,13 +1724,13 @@ cleanup:
* *
*/ */
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision, bool dataFormat) { TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
if(!request){ if(!request){
return NULL; return NULL;
} }
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, dataFormat); SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, false);
if(!info){ if(!info){
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
......
...@@ -547,21 +547,21 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -547,21 +547,21 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.offsets = (SMqOffset*)offsets->container.pData; req.offsets = (SMqOffset*)offsets->container.pData;
} }
SCoder encoder; SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncoderInit(&encoder, NULL, 0);
tEncodeSMqCMCommitOffsetReq(&encoder, &req); tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
tCoderClear(&encoder); tEncoderClear(&encoder);
return -1; return -1;
} }
tCoderClear(&encoder); tEncoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req); tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tCoderClear(&encoder); tEncoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET); pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) { if (pRequest == NULL) {
......
此差异已折叠。
...@@ -115,7 +115,10 @@ typedef enum { ...@@ -115,7 +115,10 @@ typedef enum {
TRN_TYPE_STB_SCOPE_END, TRN_TYPE_STB_SCOPE_END,
} ETrnType; } ETrnType;
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum {
TRN_POLICY_ROLLBACK = 0,
TRN_POLICY_RETRY = 1,
} ETrnPolicy;
typedef enum { typedef enum {
DND_REASON_ONLINE = 0, DND_REASON_ONLINE = 0,
...@@ -131,6 +134,15 @@ typedef enum { ...@@ -131,6 +134,15 @@ typedef enum {
DND_REASON_OTHERS DND_REASON_OTHERS
} EDndReason; } EDndReason;
typedef enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY,
} ECsmUpdateType;
typedef struct { typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
...@@ -386,7 +398,6 @@ typedef struct { ...@@ -386,7 +398,6 @@ typedef struct {
int32_t codeSize; int32_t codeSize;
char* pComment; char* pComment;
char* pCode; char* pCode;
char pData[];
} SFuncObj; } SFuncObj;
typedef struct { typedef struct {
...@@ -425,18 +436,8 @@ typedef struct { ...@@ -425,18 +436,8 @@ typedef struct {
int64_t offset; int64_t offset;
} SMqOffsetObj; } SMqOffsetObj;
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) { int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
int32_t tlen = 0; void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
tlen += taosEncodeString(buf, pOffset->key);
tlen += taosEncodeFixedI64(buf, pOffset->offset);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
...@@ -459,26 +460,15 @@ typedef struct { ...@@ -459,26 +460,15 @@ typedef struct {
SSchemaWrapper schema; SSchemaWrapper schema;
} SMqTopicObj; } SMqTopicObj;
enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY,
};
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN]; char appId[TSDB_CGROUP_LEN];
int8_t updateType; // used only for update int8_t updateType; // used only for update
int32_t epoch; int32_t epoch;
int32_t status; int32_t status;
// hbStatus is not applicable to serialization int32_t hbStatus; // hbStatus is not applicable to serialization
int32_t hbStatus; SRWLatch lock; // lock is used for topics update
// lock is used for topics update
SRWLatch lock;
SArray* currentTopics; // SArray<char*> SArray* currentTopics; // SArray<char*>
SArray* rebNewTopics; // SArray<char*> SArray* rebNewTopics; // SArray<char*>
SArray* rebRemovedTopics; // SArray<char*> SArray* rebRemovedTopics; // SArray<char*>
...@@ -492,7 +482,6 @@ typedef struct { ...@@ -492,7 +482,6 @@ typedef struct {
int64_t upTime; int64_t upTime;
int64_t subscribeTime; int64_t subscribeTime;
int64_t rebalanceTime; int64_t rebalanceTime;
} SMqConsumerObj; } SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
...@@ -581,19 +570,18 @@ typedef struct { ...@@ -581,19 +570,18 @@ typedef struct {
} SMqRebOutputObj; } SMqRebOutputObj;
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN]; char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN]; char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; int64_t uid;
int64_t dbUid; int64_t dbUid;
int32_t version; int32_t version;
int32_t vgNum; int32_t vgNum;
SRWLatch lock; SRWLatch lock;
int8_t status; int8_t status;
// int32_t sqlLen;
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused int64_t smaId; // 0 for unused
...@@ -606,8 +594,8 @@ typedef struct { ...@@ -606,8 +594,8 @@ typedef struct {
SSchemaWrapper outputSchema; SSchemaWrapper outputSchema;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -29,7 +29,8 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser); ...@@ -29,7 +29,8 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
// for trans test // for trans test
SSdbRaw *mndUserActionEncode(SUserObj *pUser); SSdbRaw *mndUserActionEncode(SUserObj *pUser);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen); int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
#include "mndCluster.h" #include "mndCluster.h"
#include "mndShow.h" #include "mndShow.h"
#define TSDB_CLUSTER_VER_NUMBE 1 #define CLUSTER_VER_NUMBE 1
#define TSDB_CLUSTER_RESERVE_SIZE 64 #define CLUSTER_RESERVE_SIZE 64
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster); static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw); static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
...@@ -30,14 +30,16 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock ...@@ -30,14 +30,16 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter); static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
int32_t mndInitCluster(SMnode *pMnode) { int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER, SSdbTable table = {
.keyType = SDB_KEY_INT64, .sdbType = SDB_CLUSTER,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster, .keyType = SDB_KEY_INT64,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode, .deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode, .encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.insertFp = (SdbInsertFp)mndClusterActionInsert, .decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate, .insertFp = (SdbInsertFp)mndClusterActionInsert,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete}; .updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete,
};
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
...@@ -79,19 +81,19 @@ int64_t mndGetClusterId(SMnode *pMnode) { ...@@ -79,19 +81,19 @@ int64_t mndGetClusterId(SMnode *pMnode) {
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE); SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) goto CLUSTER_ENCODE_OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT64(pRaw, dataPos, pCluster->id, CLUSTER_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pCluster->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, CLUSTER_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, CLUSTER_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
CLUSTER_ENCODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr()); mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -106,29 +108,29 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { ...@@ -106,29 +108,29 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CLUSTER_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_CLUSTER_VER_NUMBE) { if (sver != CLUSTER_VER_NUMBE) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto CLUSTER_DECODE_OVER; goto _OVER;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
if (pRow == NULL) goto CLUSTER_DECODE_OVER; if (pRow == NULL) goto _OVER;
SClusterObj *pCluster = sdbGetRowObj(pRow); SClusterObj *pCluster = sdbGetRowObj(pRow);
if (pCluster == NULL) goto CLUSTER_DECODE_OVER; if (pCluster == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, CLUSTER_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pCluster->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, CLUSTER_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, CLUSTER_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_DECODE_OVER) SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
CLUSTER_DECODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr()); mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
...@@ -161,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { ...@@ -161,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN); int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
if (code != 0) { if (code != 0) {
strcpy(clusterObj.name, "tdengine2.0"); strcpy(clusterObj.name, "tdengine3.0");
mError("failed to get name from system, set to default val %s", clusterObj.name); mError("failed to get name from system, set to default val %s", clusterObj.name);
} }
...@@ -190,8 +192,8 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock ...@@ -190,8 +192,8 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
cols = 0; cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->id, false); colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->id, false);
char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0}; char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes);
...@@ -200,7 +202,7 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock ...@@ -200,7 +202,7 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
colDataAppend(pColInfo, numOfRows, buf, false); colDataAppend(pColInfo, numOfRows, buf, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->createdTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
sdbRelease(pSdb, pCluster); sdbRelease(pSdb, pCluster);
numOfRows++; numOfRows++;
......
...@@ -13,12 +13,14 @@ ...@@ -13,12 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "mndDef.h" #include "mndDef.h"
#include "mndConsumer.h" #include "mndConsumer.h"
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) { SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) { if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -409,7 +411,7 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) { ...@@ -409,7 +411,7 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
return (void *)buf; return (void *)buf;
} }
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0; int32_t sz = 0;
/*int32_t outputNameSz = 0;*/ /*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
...@@ -460,7 +462,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -460,7 +462,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
...@@ -515,3 +517,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -515,3 +517,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
#endif #endif
return 0; return 0;
} }
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pOffset->key);
tlen += taosEncodeFixedI64(buf, pOffset->offset);
return tlen;
}
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
\ No newline at end of file
...@@ -157,8 +157,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { ...@@ -157,8 +157,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode; SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont; char *msgStr = pMsg->rpcMsg.pCont;
SMqCMCommitOffsetReq commitOffsetReq; SMqCMCommitOffsetReq commitOffsetReq;
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER); tDecoderInit(&decoder, msgStr, pMsg->rpcMsg.contLen);
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
......
...@@ -83,12 +83,12 @@ END: ...@@ -83,12 +83,12 @@ END:
} }
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder; SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
int32_t size = encoder.pos; int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size; int32_t tlen = sizeof(SMsgHead) + size;
tCoderClear(&encoder); tEncoderClear(&encoder);
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -96,9 +96,9 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet ...@@ -96,9 +96,9 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
} }
((SMsgHead*)buf)->vgId = htonl(nodeId); ((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER); tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder); tEncoderClear(&encoder);
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet)); memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
......
...@@ -28,8 +28,8 @@ ...@@ -28,8 +28,8 @@
#include "mndVgroup.h" #include "mndVgroup.h"
#include "tname.h" #include "tname.h"
#define TSDB_STB_VER_NUMBER 1 #define STB_VER_NUMBER 1
#define TSDB_STB_RESERVE_SIZE 64 #define STB_RESERVE_SIZE 64
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
...@@ -46,13 +46,15 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -46,13 +46,15 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
int32_t mndInitStb(SMnode *pMnode) { int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_STB, SSdbTable table = {
.keyType = SDB_KEY_BINARY, .sdbType = SDB_STB,
.encodeFp = (SdbEncodeFp)mndStbActionEncode, .keyType = SDB_KEY_BINARY,
.decodeFp = (SdbDecodeFp)mndStbActionDecode, .encodeFp = (SdbEncodeFp)mndStbActionEncode,
.insertFp = (SdbInsertFp)mndStbActionInsert, .decodeFp = (SdbDecodeFp)mndStbActionDecode,
.updateFp = (SdbUpdateFp)mndStbActionUpdate, .insertFp = (SdbInsertFp)mndStbActionInsert,
.deleteFp = (SdbDeleteFp)mndStbActionDelete}; .updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
...@@ -74,8 +76,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -74,8 +76,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen + int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen +
pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE; pStb->ast1Len + pStb->ast2Len + STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
...@@ -99,6 +101,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -99,6 +101,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i]; SSchema *pSchema = &pStb->pColumns[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER) SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER) SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER) SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
...@@ -107,6 +110,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -107,6 +110,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
for (int32_t i = 0; i < pStb->numOfTags; ++i) { for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i]; SSchema *pSchema = &pStb->pTags[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER) SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER) SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER) SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
...@@ -121,7 +125,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -121,7 +125,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
if (pStb->ast2Len > 0) { if (pStb->ast2Len > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER) SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
} }
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0; terrno = 0;
...@@ -143,7 +147,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -143,7 +147,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_STB_VER_NUMBER) { if (sver != STB_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER; goto _OVER;
} }
...@@ -183,6 +187,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -183,6 +187,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i]; SSchema *pSchema = &pStb->pColumns[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER) SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER) SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER) SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
...@@ -191,6 +196,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -191,6 +196,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStb->numOfTags; ++i) { for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i]; SSchema *pSchema = &pStb->pTags[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER) SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER) SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER) SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
...@@ -211,7 +217,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { ...@@ -211,7 +217,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
if (pStb->pAst2 == NULL) goto _OVER; if (pStb->pAst2 == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER) SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
} }
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
...@@ -363,7 +369,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch ...@@ -363,7 +369,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch
} }
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SCoder coder = {0}; SEncoder encoder = {0};
int32_t contLen; int32_t contLen;
SName name = {0}; SName name = {0};
SVCreateStbReq req = {0}; SVCreateStbReq req = {0};
...@@ -416,11 +422,11 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt ...@@ -416,11 +422,11 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER); tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVCreateStbReq(&coder, &req) < 0) { if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
return NULL; return NULL;
} }
tCoderClear(&coder); tEncoderClear(&encoder);
*pContLen = contLen; *pContLen = contLen;
taosMemoryFreeClear(req.pRSmaParam.qmsg1); taosMemoryFreeClear(req.pRSmaParam.qmsg1);
...@@ -434,7 +440,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, ...@@ -434,7 +440,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
int32_t contLen = 0; int32_t contLen = 0;
int32_t ret = 0; int32_t ret = 0;
SMsgHead *pHead = NULL; SMsgHead *pHead = NULL;
SCoder coder = {0}; SEncoder encoder = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...@@ -456,9 +462,9 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, ...@@ -456,9 +462,9 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER); tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
tEncodeSVDropStbReq(&coder, &req); tEncodeSVDropStbReq(&encoder, &req);
tCoderClear(&coder); tEncoderClear(&encoder);
*pContLen = contLen; *pContLen = contLen;
return pHead; return pHead;
...@@ -488,7 +494,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { ...@@ -488,7 +494,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) { for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
SField *pField1 = taosArrayGet(pCreate->pColumns, i); SField *pField1 = taosArrayGet(pCreate->pColumns, i);
if (pField->type < 0) { if (pField1->type < 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
...@@ -574,6 +580,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -574,6 +580,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_STB; action.msgType = TDMT_VND_CREATE_STB;
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
...@@ -613,6 +620,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -613,6 +620,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_DROP_STB; action.msgType = TDMT_VND_DROP_STB;
action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
if (mndTransAppendUndoAction(pTrans, &action) != 0) { if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
...@@ -733,6 +741,7 @@ _OVER: ...@@ -733,6 +741,7 @@ _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbInfo(pTrans, pDb); mndTransSetDbInfo(pTrans, pDb);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
...@@ -792,7 +801,10 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) { ...@@ -792,7 +801,10 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
} }
int32_t numOfStbs = -1; int32_t numOfStbs = -1;
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs); if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
goto _OVER;
}
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
goto _OVER; goto _OVER;
...@@ -819,7 +831,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) { ...@@ -819,7 +831,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) {
return 0; return 0;
} }
static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) { static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) { if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
...@@ -1170,7 +1182,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -1170,7 +1182,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0; return 0;
} }
static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
SStbObj stbObj = {0}; SStbObj stbObj = {0};
taosRLockLatch(&pOld->lock); taosRLockLatch(&pOld->lock);
memcpy(&stbObj, pOld, sizeof(SStbObj)); memcpy(&stbObj, pOld, sizeof(SStbObj));
...@@ -1234,12 +1246,12 @@ _OVER: ...@@ -1234,12 +1246,12 @@ _OVER:
} }
static int32_t mndProcessMAlterStbReq(SNodeMsg *pReq) { static int32_t mndProcessMAlterStbReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
int32_t code = -1; int32_t code = -1;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
SMAltertbReq alterReq = {0}; SMAlterStbReq alterReq = {0};
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) { if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
......
...@@ -70,14 +70,14 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { ...@@ -70,14 +70,14 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL; void *buf = NULL;
SCoder encoder; SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncoderInit(&encoder, NULL, 0);
if (tEncodeSStreamObj(&encoder, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tCoderClear(&encoder); tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE; int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
...@@ -86,12 +86,12 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { ...@@ -86,12 +86,12 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
buf = taosMemoryMalloc(tlen); buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto STREAM_ENCODE_OVER; if (buf == NULL) goto STREAM_ENCODE_OVER;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); tEncoderInit(&encoder, buf, tlen);
if (tEncodeSStreamObj(&encoder, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
tCoderClear(&encoder); tEncoderClear(&encoder);
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
...@@ -138,8 +138,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { ...@@ -138,8 +138,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
if (buf == NULL) goto STREAM_DECODE_OVER; if (buf == NULL) goto STREAM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER); tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream) < 0) { if (tDecodeSStreamObj(&decoder, pStream) < 0) {
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
...@@ -345,7 +345,10 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre ...@@ -345,7 +345,10 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
} }
int32_t numOfStbs = -1; int32_t numOfStbs = -1;
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs); if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
goto _OVER;
}
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB; terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
goto _OVER; goto _OVER;
......
...@@ -137,7 +137,7 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { ...@@ -137,7 +137,7 @@ void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
} }
void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) { void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
SMAltertbReq req = {0}; SMAlterStbReq req = {0};
strcpy(req.name, stbname); strcpy(req.name, stbname);
req.numOfFields = 1; req.numOfFields = 1;
req.pFields = taosArrayInit(1, sizeof(SField)); req.pFields = taosArrayInit(1, sizeof(SField));
...@@ -158,7 +158,7 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam ...@@ -158,7 +158,7 @@ void* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagnam
} }
void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen) { void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
SMAltertbReq req = {0}; SMAlterStbReq req = {0};
strcpy(req.name, stbname); strcpy(req.name, stbname);
req.numOfFields = 1; req.numOfFields = 1;
req.pFields = taosArrayInit(1, sizeof(SField)); req.pFields = taosArrayInit(1, sizeof(SField));
...@@ -180,7 +180,7 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna ...@@ -180,7 +180,7 @@ void* MndTestStb::BuildAlterStbDropTagReq(const char* stbname, const char* tagna
void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname, void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname,
int32_t* pContLen) { int32_t* pContLen) {
SMAltertbReq req = {0}; SMAlterStbReq req = {0};
strcpy(req.name, stbname); strcpy(req.name, stbname);
req.numOfFields = 2; req.numOfFields = 2;
req.pFields = taosArrayInit(2, sizeof(SField)); req.pFields = taosArrayInit(2, sizeof(SField));
...@@ -208,7 +208,7 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char* ...@@ -208,7 +208,7 @@ void* MndTestStb::BuildAlterStbUpdateTagNameReq(const char* stbname, const char*
void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes,
int32_t* pContLen) { int32_t* pContLen) {
SMAltertbReq req = {0}; SMAlterStbReq req = {0};
strcpy(req.name, stbname); strcpy(req.name, stbname);
req.numOfFields = 1; req.numOfFields = 1;
req.pFields = taosArrayInit(1, sizeof(SField)); req.pFields = taosArrayInit(1, sizeof(SField));
...@@ -229,7 +229,7 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char ...@@ -229,7 +229,7 @@ void* MndTestStb::BuildAlterStbUpdateTagBytesReq(const char* stbname, const char
} }
void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen) { void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
SMAltertbReq req = {0}; SMAlterStbReq req = {0};
strcpy(req.name, stbname); strcpy(req.name, stbname);
req.numOfFields = 1; req.numOfFields = 1;
req.pFields = taosArrayInit(1, sizeof(SField)); req.pFields = taosArrayInit(1, sizeof(SField));
...@@ -250,7 +250,7 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col ...@@ -250,7 +250,7 @@ void* MndTestStb::BuildAlterStbAddColumnReq(const char* stbname, const char* col
} }
void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen) { void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen) {
SMAltertbReq req = {0}; SMAlterStbReq req = {0};
strcpy(req.name, stbname); strcpy(req.name, stbname);
req.numOfFields = 1; req.numOfFields = 1;
req.pFields = taosArrayInit(1, sizeof(SField)); req.pFields = taosArrayInit(1, sizeof(SField));
...@@ -272,7 +272,7 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co ...@@ -272,7 +272,7 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
int32_t* pContLen) { int32_t* pContLen) {
SMAltertbReq req = {0}; SMAlterStbReq req = {0};
strcpy(req.name, stbname); strcpy(req.name, stbname);
req.numOfFields = 1; req.numOfFields = 1;
req.pFields = taosArrayInit(1, sizeof(SField)); req.pFields = taosArrayInit(1, sizeof(SField));
......
...@@ -99,10 +99,10 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -99,10 +99,10 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
ASSERT(0); ASSERT(0);
return; return;
} }
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER); tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead));
tDecodeSStreamTask(&decoder, pTask); tDecodeSStreamTask(&decoder, pTask);
tCoderClear(&decoder); tDecoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask); sndMetaDeployTask(pSnode->pMeta, pTask);
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) { } else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
......
...@@ -90,7 +90,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur); ...@@ -90,7 +90,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur);
// tsdb // tsdb
// typedef struct STsdb STsdb; // typedef struct STsdb STsdb;
typedef void *tsdbReaderT; typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
...@@ -108,12 +108,12 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con ...@@ -108,12 +108,12 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool* allHave); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(SVnode *pVnode, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
// tq // tq
...@@ -126,8 +126,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList ...@@ -126,8 +126,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t* pUid, int32_t *pNumOfRows, int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int16_t *pNumOfCols); int32_t *pNumOfRows, int16_t *pNumOfCols);
// need to reposition // need to reposition
...@@ -189,10 +189,10 @@ struct SMetaEntry { ...@@ -189,10 +189,10 @@ struct SMetaEntry {
SSchemaWrapper schemaTag; SSchemaWrapper schemaTag;
} stbEntry; } stbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
tb_uid_t suid; tb_uid_t suid;
const void *pTags; const uint8_t *pTags;
} ctbEntry; } ctbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
...@@ -205,7 +205,7 @@ struct SMetaEntry { ...@@ -205,7 +205,7 @@ struct SMetaEntry {
struct SMetaReader { struct SMetaReader {
int32_t flags; int32_t flags;
SMeta *pMeta; SMeta *pMeta;
SCoder coder; SDecoder coder;
SMetaEntry me; SMetaEntry me;
void *pBuf; void *pBuf;
int szBuf; int szBuf;
......
...@@ -39,8 +39,8 @@ typedef struct SMSmaCursor SMSmaCursor; ...@@ -39,8 +39,8 @@ typedef struct SMSmaCursor SMSmaCursor;
// metaOpen ================== // metaOpen ==================
// metaEntry ================== // metaEntry ==================
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME); int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME); int metaDecodeEntry(SDecoder* pCoder, SMetaEntry* pME);
// metaTable ================== // metaTable ==================
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "meta.h" #include "meta.h"
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) { int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pME->version) < 0) return -1; if (tEncodeI64(pCoder, pME->version) < 0) return -1;
...@@ -43,8 +43,8 @@ int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) { ...@@ -43,8 +43,8 @@ int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
return 0; return 0;
} }
int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) { int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
uint64_t len; uint32_t len;
if (tStartDecode(pCoder) < 0) return -1; if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pME->version) < 0) return -1; if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
......
...@@ -22,7 +22,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { ...@@ -22,7 +22,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
} }
void metaReaderClear(SMetaReader *pReader) { void metaReaderClear(SMetaReader *pReader) {
tCoderClear(&pReader->coder); tDecoderClear(&pReader->coder);
tdbFree(pReader->pBuf); tdbFree(pReader->pBuf);
} }
...@@ -37,7 +37,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u ...@@ -37,7 +37,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u
} }
// decode the entry // decode the entry
tCoderInit(&pReader->coder, TD_LITTLE_ENDIAN, pReader->pBuf, pReader->szBuf, TD_DECODER); tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) { if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
goto _err; goto _err;
...@@ -147,7 +147,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -147,7 +147,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
SSchemaWrapper *pSW = NULL; SSchemaWrapper *pSW = NULL;
SSchema *pSchema = NULL; SSchema *pSchema = NULL;
void *pBuf; void *pBuf;
SCoder coder = {0}; SDecoder coder = {0};
// fetch // fetch
skmDbKey.uid = uid; skmDbKey.uid = uid;
...@@ -163,11 +163,11 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -163,11 +163,11 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
pBuf = pVal; pBuf = pVal;
pSW = taosMemoryMalloc(sizeof(SSchemaWrapper)); pSW = taosMemoryMalloc(sizeof(SSchemaWrapper));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER); tDecoderInit(&coder, pVal, vLen);
tDecodeSSchemaWrapper(&coder, pSW); tDecodeSSchemaWrapper(&coder, pSW);
pSchema = taosMemoryMalloc(sizeof(SSchema) * pSW->nCols); pSchema = taosMemoryMalloc(sizeof(SSchema) * pSW->nCols);
memcpy(pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols); memcpy(pSchema, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
tCoderClear(&coder); tDecoderClear(&coder);
pSW->pSchema = pSchema; pSW->pSchema = pSchema;
......
此差异已折叠。
...@@ -33,7 +33,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { ...@@ -33,7 +33,6 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
void *pBuf = NULL; void *pBuf = NULL;
int32_t szBuf = 0; int32_t szBuf = 0;
void *p = NULL; void *p = NULL;
SCoder coder = {0};
SMetaReader mr = {0}; SMetaReader mr = {0};
// validate req // validate req
...@@ -192,7 +191,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -192,7 +191,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
tb_uid_t uid; tb_uid_t uid;
int64_t tver; int64_t tver;
SMetaEntry me = {0}; SMetaEntry me = {0};
SCoder coder = {0}; SDecoder coder = {0};
int8_t type; int8_t type;
int64_t ctime; int64_t ctime;
tb_uid_t suid; tb_uid_t suid;
...@@ -253,7 +252,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -253,7 +252,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
// decode entry // decode entry
void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo) void *pDataCopy = taosMemoryMalloc(nData); // remove the copy (todo)
memcpy(pDataCopy, pData, nData); memcpy(pDataCopy, pData, nData);
tCoderInit(&coder, TD_LITTLE_ENDIAN, pDataCopy, nData, TD_DECODER); tDecoderInit(&coder, pDataCopy, nData);
ret = metaDecodeEntry(&coder, &me); ret = metaDecodeEntry(&coder, &me);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
...@@ -272,7 +271,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -272,7 +271,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
} }
taosMemoryFree(pDataCopy); taosMemoryFree(pDataCopy);
tCoderClear(&coder); tDecoderClear(&coder);
tdbDbcClose(pTbDbc); tdbDbcClose(pTbDbc);
if (type == TSDB_CHILD_TABLE) { if (type == TSDB_CHILD_TABLE) {
...@@ -309,7 +308,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -309,7 +308,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
void *pVal = NULL; void *pVal = NULL;
int kLen = 0; int kLen = 0;
int vLen = 0; int vLen = 0;
SCoder coder = {0}; SEncoder coder = {0};
// set key and value // set key and value
tbDbKey.version = pME->version; tbDbKey.version = pME->version;
...@@ -330,13 +329,13 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -330,13 +329,13 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
goto _err; goto _err;
} }
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER); tEncoderInit(&coder, pVal, vLen);
if (metaEncodeEntry(&coder, pME) < 0) { if (metaEncodeEntry(&coder, pME) < 0) {
goto _err; goto _err;
} }
tCoderClear(&coder); tEncoderClear(&coder);
// write to table.db // write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
...@@ -393,7 +392,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -393,7 +392,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) {
} }
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SCoder coder = {0}; SEncoder coder = {0};
void *pVal = NULL; void *pVal = NULL;
int vLen = 0; int vLen = 0;
int rcode = 0; int rcode = 0;
...@@ -422,7 +421,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -422,7 +421,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
goto _exit; goto _exit;
} }
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER); tEncoderInit(&coder, pVal, vLen);
tEncodeSSchemaWrapper(&coder, pSW); tEncodeSSchemaWrapper(&coder, pSW);
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) { if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
...@@ -432,7 +431,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -432,7 +431,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
_exit: _exit:
taosMemoryFree(pVal); taosMemoryFree(pVal);
tCoderClear(&coder); tEncoderClear(&coder);
return rcode; return rcode;
} }
......
...@@ -910,12 +910,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -910,12 +910,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
SCoder decoder; SDecoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) { if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0); ASSERT(0);
} }
tCoderClear(&decoder); tDecoderClear(&decoder);
// exec // exec
if (tqExpandTask(pTq, pTask, 4) < 0) { if (tqExpandTask(pTq, pTask, 4) < 0) {
......
...@@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
lastKey = rowKey; lastKey = rowKey;
++pCols->numOfRows; if (pCols) {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false); ++pCols->numOfRows;
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
}
} else { } else {
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true); tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
} }
...@@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
#endif #endif
} }
if (lastKey != TSKEY_INITIAL_VAL) { if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
++pCols->numOfRows; ++pCols->numOfRows;
} }
......
...@@ -63,14 +63,18 @@ struct SMemSkipListCurosr { ...@@ -63,14 +63,18 @@ struct SMemSkipListCurosr {
SMemSkipListNode *pNodeC; SMemSkipListNode *pNodeC;
}; };
#define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET))
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)) #define SL_NODE_HALF_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) #define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_HEAD_NODE(sl) ((sl)->pHead) #define SL_HEAD_NODE(sl) ((sl)->pHead)
#define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel)) #define SL_TAIL_NODE(sl) ((SMemSkipListNode *)&SL_NODE_FORWARD(SL_HEAD_NODE(sl), (sl)->maxLevel))
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
// SMemTable // SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
...@@ -111,23 +115,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) { ...@@ -111,23 +115,18 @@ int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) {
} }
int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) { int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *pSubmitBlk) {
SMemData *pMemData; SMemData *pMemData;
STsdb *pTsdb = pMemTb->pTsdb; STsdb *pTsdb = pMemTb->pTsdb;
SVnode *pVnode = pTsdb->pVnode; SVnode *pVnode = pTsdb->pVnode;
SVBufPool *pPool = pVnode->inUse; SVBufPool *pPool = pVnode->inUse;
int32_t hash; tb_uid_t suid = pSubmitBlk->suid;
int32_t tlen; tb_uid_t uid = pSubmitBlk->uid;
uint8_t buf[16]; int32_t iBucket;
int32_t rlen;
const uint8_t *p; // search SMemData by hash
SMemSkipListNode *pSlNode; iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
const STSRow *pTSRow; for (pMemData = pMemTb->pBuckets[iBucket]; pMemData; pMemData = pMemData->pHashNext) {
SMemSkipListCurosr slc = {0}; if (pMemData->suid == suid && pMemData->uid == uid) break;
// search hash
hash = (pSubmitBlk->suid + pSubmitBlk->uid) % pMemTb->nBucket;
for (pMemData = pMemTb->pBuckets[hash]; pMemData; pMemData = pMemData->pHashNext) {
if (pMemData->suid == pSubmitBlk->suid && pMemData->uid == pSubmitBlk->uid) break;
} }
// create pMemData if need // create pMemData if need
...@@ -143,8 +142,8 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -143,8 +142,8 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
} }
pMemData->pHashNext = NULL; pMemData->pHashNext = NULL;
pMemData->suid = pSubmitBlk->suid; pMemData->suid = suid;
pMemData->uid = pSubmitBlk->uid; pMemData->uid = uid;
pMemData->minKey = TSKEY_MAX; pMemData->minKey = TSKEY_MAX;
pMemData->maxKey = TSKEY_MIN; pMemData->maxKey = TSKEY_MIN;
pMemData->minVer = -1; pMemData->minVer = -1;
...@@ -159,55 +158,67 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -159,55 +158,67 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
pHead->level = maxLevel; pHead->level = maxLevel;
pTail->level = maxLevel; pTail->level = maxLevel;
for (int iLevel = 0; iLevel < maxLevel; iLevel++) { for (int iLevel = 0; iLevel < maxLevel; iLevel++) {
SL_NODE_FORWARD(pHead, iLevel) = pTail; SL_HEAD_NODE_FORWARD(pHead, iLevel) = pTail;
SL_NODE_FORWARD(pTail, iLevel) = pHead; SL_TAIL_NODE_BACKWARD(pTail, iLevel) = pHead;
} }
// add to MemTable // add to hash
hash = (pMemData->suid + pMemData->uid) % pMemTb->nBucket; if (pMemTb->nHash >= pMemTb->nBucket) {
pMemData->pHashNext = pMemTb->pBuckets[hash]; // rehash (todo)
pMemTb->pBuckets[hash] = pMemData; }
iBucket = HASH_BUCKET(suid, uid, pMemTb->nBucket);
pMemData->pHashNext = pMemTb->pBuckets[iBucket];
pMemTb->pBuckets[iBucket] = pMemData;
pMemTb->nHash++; pMemTb->nHash++;
// sort organize (todo)
} }
// loop to insert data to skiplist // do insert data to SMemData
#if 0 SMemSkipListCurosr slc = {0};
tsdbMemSkipListCursorOpen(&slc, &pMemData->sl); const uint8_t *p = pSubmitBlk->pData;
p = pSubmitBlk->pData; const uint8_t *pt;
for (;;) { const STSRow *pRow;
if (p - (uint8_t *)pSubmitBlk->pData >= pSubmitBlk->nData) break; uint64_t szRow;
SDecoder decoder = {0};
const uint8_t *pt = p; // tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER);
p = tGetBinary(p, &pTSRow, &rlen); for (;;) {
// if (tDecodeIsEnd(&coder)) break;
// if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
// check the row (todo) // check the row (todo)
// move the cursor to position to write (todo) // // move the cursor to position to write (todo)
int32_t c; // int32_t c;
tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); // tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
ASSERT(c); // ASSERT(c);
// encode row // // encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); // int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); // int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
pSlNode = vnodeBufPoolMalloc(pPool, tsize); // pSlNode = vnodeBufPoolMalloc(pPool, tsize);
pSlNode->level = level; // pSlNode->level = level;
uint8_t *pData = SL_NODE_DATA(pSlNode); // uint8_t *pData = SL_NODE_DATA(pSlNode);
*(int64_t *)pData = version; // *(int64_t *)pData = version;
pData += sizeof(version); // pData += sizeof(version);
memcpy(pData, pt, p - pt); // memcpy(pData, pt, p - pt);
// insert row // // insert row
tsdbMemSkipListCursorPut(&slc, pSlNode); // tsdbMemSkipListCursorPut(&slc, pSlNode);
// update status // update status
if (pTSRow->ts < pMemData->minKey) pMemData->minKey = pTSRow->ts; if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
if (pTSRow->ts > pMemData->maxKey) pMemData->maxKey = pTSRow->ts; if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
} }
tsdbMemSkipListCursorClose(&slc); // tCoderClear(&coder);
#endif // tsdbMemSkipListCursorClose(&slc);
// update status
if (pMemData->minVer == -1) pMemData->minVer = version; if (pMemData->minVer == -1) pMemData->minVer = version;
if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version; if (pMemData->maxVer == -1 || pMemData->maxVer < version) pMemData->maxVer = version;
...@@ -217,8 +228,4 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p ...@@ -217,8 +228,4 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version; if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
return 0; return 0;
} }
\ No newline at end of file
// SMemData
// SMemSkipList
\ No newline at end of file
...@@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { ...@@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
return -1; return -1;
} }
tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb), tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid); vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
...@@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, ...@@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
int8_t level) { int8_t level) {
SArray *pResult = NULL; SArray *pResult = NULL;
if (!taskInfo) {
tsdbDebug("vgId:%d no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, REPO_ID(pTsdb), level, suid);
return TSDB_CODE_SUCCESS;
}
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
suid); suid);
...@@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType ...@@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid); tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pRSmaInfo->taskInfo[0]) {
tsdbDebug("vgId:%d no rsma qTaskInfo for suid:%" PRIu64, REPO_ID(pTsdb), suid);
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
// TODO: use the proper schema instead of 0, and cache STSchema in cache // TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0); STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
return TSDB_CODE_FAILED;
}
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1); tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2); tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
taosMemoryFree(pTSchema); taosMemoryFree(pTSchema);
......
...@@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = { ...@@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = {
.isHeap = false, .isHeap = false,
.isWeak = 0, .isWeak = 0,
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI, .tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
.update = 0, .update = 1,
.compression = 2, .compression = 2,
.slLevel = 5, .slLevel = 5,
.days = 10, .days = 10,
......
...@@ -17,6 +17,6 @@ target_include_directories( ...@@ -17,6 +17,6 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
#if(${BUILD_TEST}) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -38,6 +38,7 @@ extern "C" { ...@@ -38,6 +38,7 @@ extern "C" {
#include "tlockfree.h" #include "tlockfree.h"
#include "tmsg.h" #include "tmsg.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tstreamUpdate.h"
#include "vnode.h" #include "vnode.h"
#include "executorInt.h" #include "executorInt.h"
...@@ -325,10 +326,15 @@ typedef struct SExchangeInfo { ...@@ -325,10 +326,15 @@ typedef struct SExchangeInfo {
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
} SExchangeInfo; } SExchangeInfo;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef struct SColMatchInfo { typedef struct SColMatchInfo {
int32_t srcSlotId; // source slot id
int32_t colId; int32_t colId;
int32_t targetSlotId; int32_t targetSlotId;
bool output; bool output;
int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo; } SColMatchInfo;
typedef struct SScanInfo { typedef struct SScanInfo {
...@@ -380,6 +386,9 @@ typedef struct SStreamBlockScanInfo { ...@@ -380,6 +386,9 @@ typedef struct SStreamBlockScanInfo {
void* readerHandle; // stream block reader handle void* readerHandle; // stream block reader handle
SArray* pColMatchInfo; // SArray* pColMatchInfo; //
SNode* pCondition; SNode* pCondition;
SArray* tsArray;
SUpdateInfo* pUpdateInfo;
int32_t primaryTsIndex; // primary time stamp slot id
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
...@@ -440,6 +449,7 @@ typedef struct SIntervalAggOperatorInfo { ...@@ -440,6 +449,7 @@ typedef struct SIntervalAggOperatorInfo {
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info struct SFillInfo* pFillInfo; // fill info
bool invertible;
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
......
...@@ -36,21 +36,6 @@ typedef struct STaskMgmt { ...@@ -36,21 +36,6 @@ typedef struct STaskMgmt {
bool closed; bool closed;
} STaskMgmt; } STaskMgmt;
static void taskMgmtKillTaskFn(void* handle, void* param1) {
void** fp = (void**)handle;
qKillTask(*fp);
}
static void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillTask(*handle);
qDestroyTask(*handle);
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL); assert(readHandle != NULL && pSubplan != NULL);
......
...@@ -129,6 +129,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData ...@@ -129,6 +129,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData
char* val = colDataGetData(pColInfoData, rowIndex); char* val = colDataGetData(pColInfoData, rowIndex);
if (IS_VAR_DATA_TYPE(pkey->type)) { if (IS_VAR_DATA_TYPE(pkey->type)) {
memcpy(pkey->pData, val, varDataTLen(val)); memcpy(pkey->pData, val, varDataTLen(val));
ASSERT(varDataTLen(val) <= pkey->bytes);
} else { } else {
memcpy(pkey->pData, val, pkey->bytes); memcpy(pkey->pData, val, pkey->bytes);
} }
......
...@@ -36,6 +36,7 @@ typedef struct SBuiltinFuncDefinition { ...@@ -36,6 +36,7 @@ typedef struct SBuiltinFuncDefinition {
FExecProcess processFunc; FExecProcess processFunc;
FScalarExecProcess sprocessFunc; FScalarExecProcess sprocessFunc;
FExecFinalize finalizeFunc; FExecFinalize finalizeFunc;
FExecProcess invertFunc;
} SBuiltinFuncDefinition; } SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[]; extern const SBuiltinFuncDefinition funcMgtBuiltins[];
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册