未验证 提交 8912a8bc 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #12241 from taosdata/3.0

3.0
......@@ -14,6 +14,25 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git")
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
message(WARNING "git submodule update --init --recursive failed with ${GIT_SUBMOD_RESULT}, please checkout submodules")
endif()
endif()
endif()
if(NOT EXISTS "${PROJECT_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.")
endif()
if (NOT DEFINED TD_GRANT)
SET(TD_GRANT FALSE)
endif()
......
......@@ -45,6 +45,12 @@ enum {
STREAM_TRIGGER__BY_EVENT_TIME,
};
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_INVALID,
} EStreamType;
typedef struct {
uint32_t numOfTables;
SArray* pGroupList;
......@@ -71,6 +77,7 @@ typedef struct SDataBlockInfo {
int16_t numOfCols;
int16_t hasVarCol;
int32_t capacity;
EStreamType type;
} SDataBlockInfo;
typedef struct SSDataBlock {
......@@ -115,8 +122,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
......
......@@ -56,11 +56,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
......@@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
}
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource,
uint32_t numOfRow2);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
......@@ -230,9 +230,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
......
......@@ -135,6 +135,8 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8
#define TSDB_ALTER_USER_ADD_ALL_DB 0x9
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0xA
#define TSDB_ALTER_USER_PRIVILEGES 0x2
......@@ -188,6 +190,8 @@ typedef struct SRetention {
int8_t keepUnit;
} SRetention;
#define RETENTION_VALID(r) (((r)->freq > 0) && ((r)->keep > 0))
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
......@@ -326,11 +330,11 @@ typedef struct {
int8_t alterType;
int32_t numOfFields;
SArray* pFields;
} SMAltertbReq;
} SMAlterStbReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void tFreeSMAltertbReq(SMAltertbReq* pReq);
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
void tFreeSMAltertbReq(SMAlterStbReq* pReq);
typedef struct SEpSet {
int8_t inUse;
......@@ -338,8 +342,8 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
......@@ -611,8 +615,8 @@ typedef struct {
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SEncoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SDecoder* pDecoder, SUseDbRsp* pRsp);
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
typedef struct {
......@@ -1527,8 +1531,8 @@ typedef struct {
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam;
int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam);
// TDMT_VND_CREATE_STB ==============
typedef struct SVCreateStbReq {
......@@ -1540,8 +1544,8 @@ typedef struct SVCreateStbReq {
SRSmaParam pRSmaParam;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SDecoder* pCoder, SVCreateStbReq* pReq);
// TDMT_VND_DROP_STB ==============
typedef struct SVDropStbReq {
......@@ -1549,8 +1553,8 @@ typedef struct SVDropStbReq {
tb_uid_t suid;
} SVDropStbReq;
int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq);
int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq);
#define TD_CREATE_IF_NOT_EXISTS 0x1
typedef struct SVCreateTbReq {
......@@ -1562,8 +1566,8 @@ typedef struct SVCreateTbReq {
int8_t type;
union {
struct {
tb_uid_t suid;
const void* pTag;
tb_uid_t suid;
const uint8_t* pTag;
} ctb;
struct {
SSchemaWrapper schema;
......@@ -1571,8 +1575,8 @@ typedef struct SVCreateTbReq {
};
} SVCreateTbReq;
int tEncodeSVCreateTbReq(SCoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SCoder* pCoder, SVCreateTbReq* pReq);
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
typedef struct {
int32_t nReqs;
......@@ -1582,15 +1586,15 @@ typedef struct {
};
} SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SCoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SCoder* pCoder, SVCreateTbBatchReq* pReq);
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct {
int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SCoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SCoder* pCoder, SVCreateTbRsp* pRsp);
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
......@@ -1603,8 +1607,8 @@ typedef struct {
};
} SVCreateTbBatchRsp;
int tEncodeSVCreateTbBatchRsp(SCoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp);
int tEncodeSVCreateTbBatchRsp(SEncoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SDecoder* pCoder, SVCreateTbBatchRsp* pRsp);
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
......@@ -1627,8 +1631,8 @@ typedef struct {
};
} SVDropTbBatchReq;
int32_t tEncodeSVDropTbBatchReq(SCoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SCoder* pCoder, SVDropTbBatchReq* pReq);
int32_t tEncodeSVDropTbBatchReq(SEncoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SDecoder* pCoder, SVDropTbBatchReq* pReq);
typedef struct {
int32_t nRsps;
......@@ -1638,8 +1642,8 @@ typedef struct {
};
} SVDropTbBatchRsp;
int32_t tEncodeSVDropTbBatchRsp(SCoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SCoder* pCoder, SVDropTbBatchRsp* pRsp);
int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
typedef struct {
SMsgHead head;
......@@ -1821,14 +1825,14 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (uint8_t*)pKv->value, pKv->valueLen) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
static FORCE_INLINE int32_t tDecodeSKv(SDecoder* pDecoder, SKv* pKv) {
if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
pKv->value = taosMemoryMalloc(pKv->valueLen + 1);
......@@ -1837,13 +1841,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
return 0;
}
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
static FORCE_INLINE int32_t tEncodeSClientHbKey(SEncoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1;
return 0;
......@@ -2046,10 +2050,10 @@ typedef struct {
int32_t reserved;
} SMqCMCommitOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
int32_t tEncodeSMqOffset(SEncoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
......@@ -2089,7 +2093,7 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
......@@ -2098,7 +2102,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
......@@ -2131,7 +2135,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
......@@ -2141,7 +2145,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchem
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) {
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
......@@ -2593,12 +2597,12 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
#define TD_AUTO_CREATE_TABLE 0x1
typedef struct {
int64_t suid;
int64_t uid;
int32_t sver;
uint64_t nData;
const void* pData;
SVCreateTbReq cTbReq;
int64_t suid;
int64_t uid;
int32_t sver;
uint32_t nData;
const uint8_t* pData;
SVCreateTbReq cTbReq;
} SVSubmitBlk;
typedef struct {
......@@ -2610,8 +2614,8 @@ typedef struct {
};
} SVSubmitReq;
int32_t tEncodeSVSubmitReq(SCoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SCoder* pCoder, SVSubmitReq* pReq);
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
#pragma pack(pop)
......
......@@ -51,201 +51,208 @@
#define TK_USER 33
#define TK_PRIVILEGE 34
#define TK_DROP 35
#define TK_DNODE 36
#define TK_PORT 37
#define TK_NK_INTEGER 38
#define TK_DNODES 39
#define TK_NK_IPTOKEN 40
#define TK_LOCAL 41
#define TK_QNODE 42
#define TK_ON 43
#define TK_BNODE 44
#define TK_SNODE 45
#define TK_MNODE 46
#define TK_DATABASE 47
#define TK_USE 48
#define TK_IF 49
#define TK_NOT 50
#define TK_EXISTS 51
#define TK_BUFFER 52
#define TK_CACHELAST 53
#define TK_COMP 54
#define TK_DAYS 55
#define TK_NK_VARIABLE 56
#define TK_FSYNC 57
#define TK_MAXROWS 58
#define TK_MINROWS 59
#define TK_KEEP 60
#define TK_PAGES 61
#define TK_PAGESIZE 62
#define TK_PRECISION 63
#define TK_REPLICA 64
#define TK_STRICT 65
#define TK_WAL 66
#define TK_VGROUPS 67
#define TK_SINGLE_STABLE 68
#define TK_RETENTIONS 69
#define TK_NK_COMMA 70
#define TK_NK_COLON 71
#define TK_TABLE 72
#define TK_NK_LP 73
#define TK_NK_RP 74
#define TK_STABLE 75
#define TK_ADD 76
#define TK_COLUMN 77
#define TK_MODIFY 78
#define TK_RENAME 79
#define TK_TAG 80
#define TK_SET 81
#define TK_NK_EQ 82
#define TK_USING 83
#define TK_TAGS 84
#define TK_NK_DOT 85
#define TK_COMMENT 86
#define TK_BOOL 87
#define TK_TINYINT 88
#define TK_SMALLINT 89
#define TK_INT 90
#define TK_INTEGER 91
#define TK_BIGINT 92
#define TK_FLOAT 93
#define TK_DOUBLE 94
#define TK_BINARY 95
#define TK_TIMESTAMP 96
#define TK_NCHAR 97
#define TK_UNSIGNED 98
#define TK_JSON 99
#define TK_VARCHAR 100
#define TK_MEDIUMBLOB 101
#define TK_BLOB 102
#define TK_VARBINARY 103
#define TK_DECIMAL 104
#define TK_DELAY 105
#define TK_FILE_FACTOR 106
#define TK_NK_FLOAT 107
#define TK_ROLLUP 108
#define TK_TTL 109
#define TK_SMA 110
#define TK_SHOW 111
#define TK_DATABASES 112
#define TK_TABLES 113
#define TK_STABLES 114
#define TK_MNODES 115
#define TK_MODULES 116
#define TK_QNODES 117
#define TK_FUNCTIONS 118
#define TK_INDEXES 119
#define TK_FROM 120
#define TK_ACCOUNTS 121
#define TK_APPS 122
#define TK_CONNECTIONS 123
#define TK_LICENCE 124
#define TK_GRANTS 125
#define TK_QUERIES 126
#define TK_SCORES 127
#define TK_TOPICS 128
#define TK_VARIABLES 129
#define TK_BNODES 130
#define TK_SNODES 131
#define TK_CLUSTER 132
#define TK_LIKE 133
#define TK_INDEX 134
#define TK_FULLTEXT 135
#define TK_FUNCTION 136
#define TK_INTERVAL 137
#define TK_TOPIC 138
#define TK_AS 139
#define TK_WITH 140
#define TK_SCHEMA 141
#define TK_DESC 142
#define TK_DESCRIBE 143
#define TK_RESET 144
#define TK_QUERY 145
#define TK_CACHE 146
#define TK_EXPLAIN 147
#define TK_ANALYZE 148
#define TK_VERBOSE 149
#define TK_NK_BOOL 150
#define TK_RATIO 151
#define TK_COMPACT 152
#define TK_VNODES 153
#define TK_IN 154
#define TK_OUTPUTTYPE 155
#define TK_AGGREGATE 156
#define TK_BUFSIZE 157
#define TK_STREAM 158
#define TK_INTO 159
#define TK_TRIGGER 160
#define TK_AT_ONCE 161
#define TK_WINDOW_CLOSE 162
#define TK_WATERMARK 163
#define TK_KILL 164
#define TK_CONNECTION 165
#define TK_MERGE 166
#define TK_VGROUP 167
#define TK_REDISTRIBUTE 168
#define TK_SPLIT 169
#define TK_SYNCDB 170
#define TK_NULL 171
#define TK_NK_QUESTION 172
#define TK_NK_ARROW 173
#define TK_ROWTS 174
#define TK_TBNAME 175
#define TK_QSTARTTS 176
#define TK_QENDTS 177
#define TK_WSTARTTS 178
#define TK_WENDTS 179
#define TK_WDURATION 180
#define TK_CAST 181
#define TK_NOW 182
#define TK_TODAY 183
#define TK_TIMEZONE 184
#define TK_COUNT 185
#define TK_FIRST 186
#define TK_LAST 187
#define TK_LAST_ROW 188
#define TK_BETWEEN 189
#define TK_IS 190
#define TK_NK_LT 191
#define TK_NK_GT 192
#define TK_NK_LE 193
#define TK_NK_GE 194
#define TK_NK_NE 195
#define TK_MATCH 196
#define TK_NMATCH 197
#define TK_CONTAINS 198
#define TK_JOIN 199
#define TK_INNER 200
#define TK_SELECT 201
#define TK_DISTINCT 202
#define TK_WHERE 203
#define TK_PARTITION 204
#define TK_BY 205
#define TK_SESSION 206
#define TK_STATE_WINDOW 207
#define TK_SLIDING 208
#define TK_FILL 209
#define TK_VALUE 210
#define TK_NONE 211
#define TK_PREV 212
#define TK_LINEAR 213
#define TK_NEXT 214
#define TK_GROUP 215
#define TK_HAVING 216
#define TK_ORDER 217
#define TK_SLIMIT 218
#define TK_SOFFSET 219
#define TK_LIMIT 220
#define TK_OFFSET 221
#define TK_ASC 222
#define TK_NULLS 223
#define TK_ID 224
#define TK_NK_BITNOT 225
#define TK_INSERT 226
#define TK_VALUES 227
#define TK_IMPORT 228
#define TK_NK_SEMI 229
#define TK_FILE 230
#define TK_GRANT 36
#define TK_ON 37
#define TK_TO 38
#define TK_REVOKE 39
#define TK_FROM 40
#define TK_NK_COMMA 41
#define TK_READ 42
#define TK_WRITE 43
#define TK_NK_DOT 44
#define TK_DNODE 45
#define TK_PORT 46
#define TK_NK_INTEGER 47
#define TK_DNODES 48
#define TK_NK_IPTOKEN 49
#define TK_LOCAL 50
#define TK_QNODE 51
#define TK_BNODE 52
#define TK_SNODE 53
#define TK_MNODE 54
#define TK_DATABASE 55
#define TK_USE 56
#define TK_IF 57
#define TK_NOT 58
#define TK_EXISTS 59
#define TK_BUFFER 60
#define TK_CACHELAST 61
#define TK_COMP 62
#define TK_DAYS 63
#define TK_NK_VARIABLE 64
#define TK_FSYNC 65
#define TK_MAXROWS 66
#define TK_MINROWS 67
#define TK_KEEP 68
#define TK_PAGES 69
#define TK_PAGESIZE 70
#define TK_PRECISION 71
#define TK_REPLICA 72
#define TK_STRICT 73
#define TK_WAL 74
#define TK_VGROUPS 75
#define TK_SINGLE_STABLE 76
#define TK_RETENTIONS 77
#define TK_NK_COLON 78
#define TK_TABLE 79
#define TK_NK_LP 80
#define TK_NK_RP 81
#define TK_STABLE 82
#define TK_ADD 83
#define TK_COLUMN 84
#define TK_MODIFY 85
#define TK_RENAME 86
#define TK_TAG 87
#define TK_SET 88
#define TK_NK_EQ 89
#define TK_USING 90
#define TK_TAGS 91
#define TK_COMMENT 92
#define TK_BOOL 93
#define TK_TINYINT 94
#define TK_SMALLINT 95
#define TK_INT 96
#define TK_INTEGER 97
#define TK_BIGINT 98
#define TK_FLOAT 99
#define TK_DOUBLE 100
#define TK_BINARY 101
#define TK_TIMESTAMP 102
#define TK_NCHAR 103
#define TK_UNSIGNED 104
#define TK_JSON 105
#define TK_VARCHAR 106
#define TK_MEDIUMBLOB 107
#define TK_BLOB 108
#define TK_VARBINARY 109
#define TK_DECIMAL 110
#define TK_DELAY 111
#define TK_FILE_FACTOR 112
#define TK_NK_FLOAT 113
#define TK_ROLLUP 114
#define TK_TTL 115
#define TK_SMA 116
#define TK_SHOW 117
#define TK_DATABASES 118
#define TK_TABLES 119
#define TK_STABLES 120
#define TK_MNODES 121
#define TK_MODULES 122
#define TK_QNODES 123
#define TK_FUNCTIONS 124
#define TK_INDEXES 125
#define TK_ACCOUNTS 126
#define TK_APPS 127
#define TK_CONNECTIONS 128
#define TK_LICENCE 129
#define TK_GRANTS 130
#define TK_QUERIES 131
#define TK_SCORES 132
#define TK_TOPICS 133
#define TK_VARIABLES 134
#define TK_BNODES 135
#define TK_SNODES 136
#define TK_CLUSTER 137
#define TK_TRANSACTIONS 138
#define TK_LIKE 139
#define TK_INDEX 140
#define TK_FULLTEXT 141
#define TK_FUNCTION 142
#define TK_INTERVAL 143
#define TK_TOPIC 144
#define TK_AS 145
#define TK_WITH 146
#define TK_SCHEMA 147
#define TK_DESC 148
#define TK_DESCRIBE 149
#define TK_RESET 150
#define TK_QUERY 151
#define TK_CACHE 152
#define TK_EXPLAIN 153
#define TK_ANALYZE 154
#define TK_VERBOSE 155
#define TK_NK_BOOL 156
#define TK_RATIO 157
#define TK_COMPACT 158
#define TK_VNODES 159
#define TK_IN 160
#define TK_OUTPUTTYPE 161
#define TK_AGGREGATE 162
#define TK_BUFSIZE 163
#define TK_STREAM 164
#define TK_INTO 165
#define TK_TRIGGER 166
#define TK_AT_ONCE 167
#define TK_WINDOW_CLOSE 168
#define TK_WATERMARK 169
#define TK_KILL 170
#define TK_CONNECTION 171
#define TK_TRANSACTION 172
#define TK_MERGE 173
#define TK_VGROUP 174
#define TK_REDISTRIBUTE 175
#define TK_SPLIT 176
#define TK_SYNCDB 177
#define TK_NULL 178
#define TK_NK_QUESTION 179
#define TK_NK_ARROW 180
#define TK_ROWTS 181
#define TK_TBNAME 182
#define TK_QSTARTTS 183
#define TK_QENDTS 184
#define TK_WSTARTTS 185
#define TK_WENDTS 186
#define TK_WDURATION 187
#define TK_CAST 188
#define TK_NOW 189
#define TK_TODAY 190
#define TK_TIMEZONE 191
#define TK_COUNT 192
#define TK_FIRST 193
#define TK_LAST 194
#define TK_LAST_ROW 195
#define TK_BETWEEN 196
#define TK_IS 197
#define TK_NK_LT 198
#define TK_NK_GT 199
#define TK_NK_LE 200
#define TK_NK_GE 201
#define TK_NK_NE 202
#define TK_MATCH 203
#define TK_NMATCH 204
#define TK_CONTAINS 205
#define TK_JOIN 206
#define TK_INNER 207
#define TK_SELECT 208
#define TK_DISTINCT 209
#define TK_WHERE 210
#define TK_PARTITION 211
#define TK_BY 212
#define TK_SESSION 213
#define TK_STATE_WINDOW 214
#define TK_SLIDING 215
#define TK_FILL 216
#define TK_VALUE 217
#define TK_NONE 218
#define TK_PREV 219
#define TK_LINEAR 220
#define TK_NEXT 221
#define TK_GROUP 222
#define TK_HAVING 223
#define TK_ORDER 224
#define TK_SLIMIT 225
#define TK_SOFFSET 226
#define TK_LIMIT 227
#define TK_OFFSET 228
#define TK_ASC 229
#define TK_NULLS 230
#define TK_ID 231
#define TK_NK_BITNOT 232
#define TK_INSERT 233
#define TK_VALUES 234
#define TK_IMPORT 235
#define TK_NK_SEMI 236
#define TK_FILE 237
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
/**
* release the query handle and decrease the reference count in cache
* @param pMgmt
* @param pQInfo
* @param freeHandle
* @return
*/
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
......
......@@ -168,6 +168,9 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
#ifdef __cplusplus
}
......
......@@ -44,7 +44,8 @@ enum {
UDFC_CODE_PIPE_READ_ERR = -2,
UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5
UDFC_CODE_INVALID_STATE = -5,
UDFC_CODE_NO_PIPE = -6,
};
typedef void *UdfcFuncHandle;
......@@ -140,6 +141,44 @@ typedef int32_t (*TUdfDestroyFunc)();
#define UDF_MEMORY_EXP_GROWTH 1.5
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
......@@ -186,17 +225,22 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t ne
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
if (isVarCol) {
data->varLenCol.varOffsets[currentRow] = -1;
} else {
colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
}
udfColDataSetNull(pColumn, currentRow);
} else {
if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
......
......@@ -310,6 +310,29 @@ typedef struct SCreateFunctionStmt {
int32_t bufSize;
} SCreateFunctionStmt;
typedef struct SDropFunctionStmt {
ENodeType type;
char funcName[TSDB_FUNC_NAME_LEN];
bool ignoreNotExists;
} SDropFunctionStmt;
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef struct SGrantStmt {
ENodeType type;
char userName[TSDB_USER_LEN];
char dbName[TSDB_DB_NAME_LEN];
int64_t privileges;
} SGrantStmt;
typedef SGrantStmt SRevokeStmt;
#ifdef __cplusplus
}
#endif
......
......@@ -143,6 +143,8 @@ typedef enum ENodeType {
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT,
QUERY_NODE_SPLIT_VGROUP_STMT,
QUERY_NODE_SYNCDB_STMT,
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT,
QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_MODULES_STMT,
......@@ -174,8 +176,10 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TRANSACTIONS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
......
......@@ -25,20 +25,18 @@ extern "C" {
typedef struct SPlanContext {
uint64_t queryId;
int32_t acctId;
SEpSet mgmtEpSet;
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
bool rSmaQuery;
bool showRewrite;
int8_t triggerType;
int64_t watermark;
int32_t placeholderNum;
void* pTransporter;
struct SCatalog* pCatalog;
char* pMsg;
int32_t msgLen;
int32_t acctId;
SEpSet mgmtEpSet;
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
bool rSmaQuery;
bool showRewrite;
int8_t triggerType;
int64_t watermark;
int32_t placeholderNum;
char* pMsg;
int32_t msgLen;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......@@ -47,7 +45,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @pSubplan subplan to be schedule
// @groupId id of a group of datasource subplans of this @pSubplan
// @pSource one execution location of this group of datasource subplans
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId);
......@@ -56,7 +54,7 @@ int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colI
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
SQueryPlan* qStringToQueryPlan(const char* pStr);
void qDestroyQueryPlan(SQueryPlan* pPlan);
......
......@@ -25,6 +25,8 @@ extern "C" {
#ifndef _TSTREAM_H_
#define _TSTREAM_H_
typedef struct SStreamTask SStreamTask;
enum {
STREAM_TASK_STATUS__RUNNING = 1,
STREAM_TASK_STATUS__STOP,
......@@ -69,20 +71,24 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct {
int8_t reserved;
int64_t stbUid;
SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder
void* vnode;
FTbSink* tbSinkFunc;
STSchema* pTSchema;
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef struct {
int64_t smaId;
// following are not applicable to encoder and decoder
FSmaHandle* smaHandle;
FSmaSink* smaSink;
} STaskSinkSma;
typedef struct {
......@@ -115,7 +121,7 @@ enum {
TASK_SINK__FETCH,
};
typedef struct {
struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t status;
......@@ -150,12 +156,11 @@ typedef struct {
// application storage
void* ahandle;
} SStreamTask;
};
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
typedef struct {
......
......@@ -36,7 +36,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo);
#ifdef __cplusplus
......
......@@ -327,29 +327,32 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601)
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602)
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603)
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604)
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605)
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606)
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607)
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608)
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609)
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A)
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B)
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C)
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D)
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E)
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F)
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610)
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611)
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612)
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613)
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620)
#define TSDB_CODE_TDB_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0604)
#define TSDB_CODE_TDB_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0605)
#define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0606)
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0607)
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0608)
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0609)
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x060A)
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x060B)
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x060C)
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060D)
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060E)
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060F)
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x0600)
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x0601)
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x0602)
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0613)
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x062D)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......@@ -629,6 +632,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_FIRST_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2641)
#define TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN TAOS_DEF_ERROR_CODE(0, 0x2642)
#define TSDB_CODE_PAR_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x2643)
#define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
此差异已折叠。
......@@ -162,18 +162,17 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = topicQuery,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
};
SParseContext cxt = {.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = topicQuery,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
.pUser = pTscObj->user};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
......@@ -232,11 +231,15 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.placeholderNum = pQuery->placeholderNum};
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList);
}
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
......
......@@ -547,21 +547,21 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.offsets = (SMqOffset*)offsets->container.pData;
}
SCoder encoder;
SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncoderInit(&encoder, NULL, 0);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos;
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tCoderClear(&encoder);
tEncoderClear(&encoder);
return -1;
}
tCoderClear(&encoder);
tEncoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tCoderClear(&encoder);
tEncoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) {
......
......@@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
int32_t mapIndex = i;
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
......@@ -1596,7 +1596,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
return TSDB_CODE_SUCCESS;
}
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
int32_t vgId) {
SSubmitReq* ret = NULL;
// cal size
......@@ -1608,13 +1609,37 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen;
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = htobe64(suid);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
}
// assign data
ret = taosMemoryCalloc(1, cap);
ret = taosMemoryCalloc(1, cap + 46);
ret = POINTER_SHIFT(ret, 46);
ret->header.vgId = vgId;
ret->version = htonl(1);
ret->length = htonl(cap - sizeof(SSubmitReq));
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
......@@ -1623,19 +1648,47 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitBlk* blkHead = submitBlk;
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->schemaLen = 0;
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = 0;
blkHead->uid = htobe64(pDataBlock->info.uid);
blkHead->suid = htobe64(suid);
// uid is assigned by vnode
blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows;
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
/*blkHead->dataLen = htonl(rows * maxLen);*/
blkHead->dataLen = 0;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
STSRow* rowData = blockData;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
tEncoderClear(&encoder);
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
......@@ -1653,10 +1706,14 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
rowData = POINTER_SHIFT(rowData, rowLen);
blkHead->dataLen += rowLen;
}
int32_t len = blkHead->dataLen;
blkHead->dataLen = htonl(len);
blkHead = POINTER_SHIFT(blkHead, len);
int32_t dataLen = blkHead->dataLen;
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen);
/*submitBlk = blkHead;*/
}
ret->length = htonl(ret->length);
return ret;
}
此差异已折叠。
......@@ -101,6 +101,7 @@ void qmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QNODE_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE);
......
......@@ -106,6 +106,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
......
......@@ -51,6 +51,7 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) {
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs;
pMgmt->state = pInfo->vstat;
tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs);
taosArrayDestroy(vloads.pVloads);
}
......@@ -177,6 +178,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
......
......@@ -135,6 +135,7 @@ static void *vmOpenVnodeFunc(void *param) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
......@@ -147,7 +148,7 @@ static void *vmOpenVnodeFunc(void *param) {
pThread->failed++;
} else {
vmOpenVnode(pMgmt, pCfg, pImpl);
//vnodeStart(pImpl);
// vnodeStart(pImpl);
dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++;
}
......
......@@ -357,6 +357,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
pMsg->rpcMsg = *pRpc;
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, will be put into vnode-write queue", pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
break;
case QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg);
......@@ -387,6 +391,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
return code;
}
int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE);
}
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
}
......
......@@ -115,7 +115,10 @@ typedef enum {
TRN_TYPE_STB_SCOPE_END,
} ETrnType;
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
typedef enum {
TRN_POLICY_ROLLBACK = 0,
TRN_POLICY_RETRY = 1,
} ETrnPolicy;
typedef enum {
DND_REASON_ONLINE = 0,
......@@ -131,6 +134,15 @@ typedef enum {
DND_REASON_OTHERS
} EDndReason;
typedef enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY,
} ECsmUpdateType;
typedef struct {
int32_t id;
ETrnStage stage;
......@@ -386,7 +398,6 @@ typedef struct {
int32_t codeSize;
char* pComment;
char* pCode;
char pData[];
} SFuncObj;
typedef struct {
......@@ -425,18 +436,8 @@ typedef struct {
int64_t offset;
} SMqOffsetObj;
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pOffset->key);
tlen += taosEncodeFixedI64(buf, pOffset->offset);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
......@@ -459,26 +460,15 @@ typedef struct {
SSchemaWrapper schema;
} SMqTopicObj;
enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY,
};
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
// hbStatus is not applicable to serialization
int32_t hbStatus;
// lock is used for topics update
SRWLatch lock;
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
int32_t hbStatus; // hbStatus is not applicable to serialization
SRWLatch lock; // lock is used for topics update
SArray* currentTopics; // SArray<char*>
SArray* rebNewTopics; // SArray<char*>
SArray* rebRemovedTopics; // SArray<char*>
......@@ -492,7 +482,6 @@ typedef struct {
int64_t upTime;
int64_t subscribeTime;
int64_t rebalanceTime;
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
......@@ -581,19 +570,19 @@ typedef struct {
} SMqRebOutputObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int32_t vgNum;
SRWLatch lock;
int8_t status;
// int32_t sqlLen;
char name[TSDB_TOPIC_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int32_t vgNum;
SRWLatch lock;
int8_t status;
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused
......@@ -606,8 +595,8 @@ typedef struct {
SSchemaWrapper outputSchema;
} SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj);
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
#ifdef __cplusplus
}
......
......@@ -29,7 +29,8 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
// for trans test
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
#ifdef __cplusplus
}
......
......@@ -17,8 +17,8 @@
#include "mndCluster.h"
#include "mndShow.h"
#define TSDB_CLUSTER_VER_NUMBE 1
#define TSDB_CLUSTER_RESERVE_SIZE 64
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_RESERVE_SIZE 64
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
......@@ -30,14 +30,16 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT64,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
SSdbTable table = {
.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT64,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete,
};
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
......@@ -79,19 +81,19 @@ int64_t mndGetClusterId(SMnode *pMnode) {
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) goto CLUSTER_ENCODE_OVER;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_SET_INT64(pRaw, dataPos, pCluster->id, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, CLUSTER_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
CLUSTER_ENCODE_OVER:
_OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
......@@ -106,29 +108,29 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CLUSTER_DECODE_OVER;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_CLUSTER_VER_NUMBE) {
if (sver != CLUSTER_VER_NUMBE) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto CLUSTER_DECODE_OVER;
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
if (pRow == NULL) goto CLUSTER_DECODE_OVER;
if (pRow == NULL) goto _OVER;
SClusterObj *pCluster = sdbGetRowObj(pRow);
if (pCluster == NULL) goto CLUSTER_DECODE_OVER;
if (pCluster == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, CLUSTER_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
CLUSTER_DECODE_OVER:
_OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
taosMemoryFreeClear(pRow);
......@@ -161,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
if (code != 0) {
strcpy(clusterObj.name, "tdengine2.0");
strcpy(clusterObj.name, "tdengine3.0");
mError("failed to get name from system, set to default val %s", clusterObj.name);
}
......@@ -190,8 +192,8 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->id, false);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->id, false);
char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes);
......@@ -200,7 +202,7 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
colDataAppend(pColInfo, numOfRows, buf, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->createdTime, false);
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
sdbRelease(pSdb, pCluster);
numOfRows++;
......
......@@ -13,12 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndDef.h"
#include "mndConsumer.h"
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -409,11 +411,14 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
return (void *)buf;
}
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
......@@ -460,9 +465,12 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
return pEncoder->pos;
}
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
......@@ -515,3 +523,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
#endif
return 0;
}
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pOffset->key);
tlen += taosEncodeFixedI64(buf, pOffset->offset);
return tlen;
}
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
......@@ -157,8 +157,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
SMqCMCommitOffsetReq commitOffsetReq;
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, msgStr, pMsg->rpcMsg.contLen);
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
......
......@@ -83,12 +83,12 @@ END:
}
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tCoderClear(&encoder);
tEncoderClear(&encoder);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -96,9 +96,9 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
}
((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
tEncoderClear(&encoder);
STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
......@@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
......@@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
//
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
......@@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
#endif
......
......@@ -412,7 +412,7 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
goto _OVER;
}
if (alterReq.pass[0] == 0) {
if (TSDB_ALTER_USER_PASSWD == alterReq.alterType && alterReq.pass[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT;
goto _OVER;
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -60,4 +60,5 @@ void dsScheduleProcess(void* ahandle, void* pItem) {
void dsDestroyDataSinker(DataSinkHandle handle) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
pHandleImpl->fDestroy(pHandleImpl);
taosMemoryFree(pHandleImpl);
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册