未验证 提交 7c535c40 编写于 作者: D dapan1121 提交者: GitHub

Merge branch '3.0' into feature/qnode

......@@ -14,6 +14,25 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE(STATUS "Project executable files output path: " ${EXECUTABLE_OUTPUT_PATH})
MESSAGE(STATUS "Project library files output path: " ${LIBRARY_OUTPUT_PATH})
find_package(Git QUIET)
if(GIT_FOUND AND EXISTS "${PROJECT_SOURCE_DIR}/.git")
# Update submodules as needed
option(GIT_SUBMODULE "Check submodules during build" ON)
if(GIT_SUBMODULE)
message(STATUS "Submodule update")
execute_process(COMMAND ${GIT_EXECUTABLE} submodule update --init --recursive
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
RESULT_VARIABLE GIT_SUBMOD_RESULT)
if(NOT GIT_SUBMOD_RESULT EQUAL "0")
message(WARNING "git submodule update --init --recursive failed with ${GIT_SUBMOD_RESULT}, please checkout submodules")
endif()
endif()
endif()
if(NOT EXISTS "${PROJECT_SOURCE_DIR}/tools/taos-tools/CMakeLists.txt")
message(WARNING "The submodules were not downloaded! GIT_SUBMODULE was turned off or failed. Please update submodules manually if you need build them.")
endif()
if (NOT DEFINED TD_GRANT)
SET(TD_GRANT FALSE)
endif()
......@@ -46,7 +65,7 @@ ENDIF ()
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/W3 /D_WIN32 /vmg")
SET(COMMON_FLAGS "/W3 /D_WIN32")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
......
......@@ -195,7 +195,7 @@ DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
#endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision, bool dataFormat);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
......
......@@ -45,6 +45,12 @@ enum {
STREAM_TRIGGER__BY_EVENT_TIME,
};
typedef enum EStreamType {
STREAM_NORMAL = 1,
STREAM_INVERT,
STREAM_INVALID,
} EStreamType;
typedef struct {
uint32_t numOfTables;
SArray* pGroupList;
......@@ -71,6 +77,7 @@ typedef struct SDataBlockInfo {
int16_t numOfCols;
int16_t hasVarCol;
int32_t capacity;
EStreamType type;
} SDataBlockInfo;
typedef struct SSDataBlock {
......@@ -115,8 +122,6 @@ void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
......
......@@ -135,6 +135,8 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_USER_ADD_WRITE_DB 0x6
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
#define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8
#define TSDB_ALTER_USER_ADD_ALL_DB 0x9
#define TSDB_ALTER_USER_REMOVE_ALL_DB 0xA
#define TSDB_ALTER_USER_PRIVILEGES 0x2
......@@ -326,11 +328,11 @@ typedef struct {
int8_t alterType;
int32_t numOfFields;
SArray* pFields;
} SMAltertbReq;
} SMAlterStbReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void tFreeSMAltertbReq(SMAltertbReq* pReq);
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
void tFreeSMAltertbReq(SMAlterStbReq* pReq);
typedef struct SEpSet {
int8_t inUse;
......@@ -338,8 +340,8 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t tEncodeSEpSet(SEncoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SDecoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
......@@ -611,8 +613,8 @@ typedef struct {
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
int32_t tSerializeSUseDbRspImp(SEncoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SDecoder* pDecoder, SUseDbRsp* pRsp);
void tFreeSUsedbRsp(SUseDbRsp* pRsp);
typedef struct {
......@@ -1527,8 +1529,8 @@ typedef struct {
char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2
} SRSmaParam;
int32_t tEncodeSRSmaParam(SCoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SCoder* pCoder, SRSmaParam* pRSmaParam);
int32_t tEncodeSRSmaParam(SEncoder* pCoder, const SRSmaParam* pRSmaParam);
int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam);
// TDMT_VND_CREATE_STB ==============
typedef struct SVCreateStbReq {
......@@ -1540,8 +1542,8 @@ typedef struct SVCreateStbReq {
SRSmaParam pRSmaParam;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SCoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
int tDecodeSVCreateStbReq(SDecoder* pCoder, SVCreateStbReq* pReq);
// TDMT_VND_DROP_STB ==============
typedef struct SVDropStbReq {
......@@ -1549,8 +1551,8 @@ typedef struct SVDropStbReq {
tb_uid_t suid;
} SVDropStbReq;
int32_t tEncodeSVDropStbReq(SCoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SCoder* pCoder, SVDropStbReq* pReq);
int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq);
int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq);
#define TD_CREATE_IF_NOT_EXISTS 0x1
typedef struct SVCreateTbReq {
......@@ -1562,8 +1564,8 @@ typedef struct SVCreateTbReq {
int8_t type;
union {
struct {
tb_uid_t suid;
const void* pTag;
tb_uid_t suid;
const uint8_t* pTag;
} ctb;
struct {
SSchemaWrapper schema;
......@@ -1571,8 +1573,8 @@ typedef struct SVCreateTbReq {
};
} SVCreateTbReq;
int tEncodeSVCreateTbReq(SCoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SCoder* pCoder, SVCreateTbReq* pReq);
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
typedef struct {
int32_t nReqs;
......@@ -1582,15 +1584,15 @@ typedef struct {
};
} SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SCoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SCoder* pCoder, SVCreateTbBatchReq* pReq);
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
int tDecodeSVCreateTbBatchReq(SDecoder* pCoder, SVCreateTbBatchReq* pReq);
typedef struct {
int32_t code;
} SVCreateTbRsp, SVUpdateTbRsp;
int tEncodeSVCreateTbRsp(SCoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SCoder* pCoder, SVCreateTbRsp* pRsp);
int tEncodeSVCreateTbRsp(SEncoder* pCoder, const SVCreateTbRsp* pRsp);
int tDecodeSVCreateTbRsp(SDecoder* pCoder, SVCreateTbRsp* pRsp);
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
......@@ -1603,8 +1605,8 @@ typedef struct {
};
} SVCreateTbBatchRsp;
int tEncodeSVCreateTbBatchRsp(SCoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SCoder* pCoder, SVCreateTbBatchRsp* pRsp);
int tEncodeSVCreateTbBatchRsp(SEncoder* pCoder, const SVCreateTbBatchRsp* pRsp);
int tDecodeSVCreateTbBatchRsp(SDecoder* pCoder, SVCreateTbBatchRsp* pRsp);
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
......@@ -1627,8 +1629,8 @@ typedef struct {
};
} SVDropTbBatchReq;
int32_t tEncodeSVDropTbBatchReq(SCoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SCoder* pCoder, SVDropTbBatchReq* pReq);
int32_t tEncodeSVDropTbBatchReq(SEncoder* pCoder, const SVDropTbBatchReq* pReq);
int32_t tDecodeSVDropTbBatchReq(SDecoder* pCoder, SVDropTbBatchReq* pReq);
typedef struct {
int32_t nRsps;
......@@ -1638,8 +1640,8 @@ typedef struct {
};
} SVDropTbBatchRsp;
int32_t tEncodeSVDropTbBatchRsp(SCoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SCoder* pCoder, SVDropTbBatchRsp* pRsp);
int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp);
int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
typedef struct {
SMsgHead head;
......@@ -1821,14 +1823,14 @@ static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (uint8_t*)pKv->value, pKv->valueLen) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
static FORCE_INLINE int32_t tDecodeSKv(SDecoder* pDecoder, SKv* pKv) {
if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
pKv->value = taosMemoryMalloc(pKv->valueLen + 1);
......@@ -1837,13 +1839,13 @@ static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
return 0;
}
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
static FORCE_INLINE int32_t tEncodeSClientHbKey(SEncoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1;
return 0;
......@@ -2046,10 +2048,10 @@ typedef struct {
int32_t reserved;
} SMqCMCommitOffsetRsp;
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
int32_t tEncodeSMqOffset(SEncoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
......@@ -2089,7 +2091,7 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
......@@ -2098,7 +2100,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSch
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
......@@ -2131,7 +2133,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
for (int32_t i = 0; i < pSW->nCols; i++) {
......@@ -2141,7 +2143,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchem
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) {
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
......@@ -2593,12 +2595,12 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
#define TD_AUTO_CREATE_TABLE 0x1
typedef struct {
int64_t suid;
int64_t uid;
int32_t sver;
uint64_t nData;
const void* pData;
SVCreateTbReq cTbReq;
int64_t suid;
int64_t uid;
int32_t sver;
uint32_t nData;
const uint8_t* pData;
SVCreateTbReq cTbReq;
} SVSubmitBlk;
typedef struct {
......@@ -2610,8 +2612,8 @@ typedef struct {
};
} SVSubmitReq;
int32_t tEncodeSVSubmitReq(SCoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SCoder* pCoder, SVSubmitReq* pReq);
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
#pragma pack(pop)
......
......@@ -51,201 +51,208 @@
#define TK_USER 33
#define TK_PRIVILEGE 34
#define TK_DROP 35
#define TK_DNODE 36
#define TK_PORT 37
#define TK_NK_INTEGER 38
#define TK_DNODES 39
#define TK_NK_IPTOKEN 40
#define TK_LOCAL 41
#define TK_QNODE 42
#define TK_ON 43
#define TK_BNODE 44
#define TK_SNODE 45
#define TK_MNODE 46
#define TK_DATABASE 47
#define TK_USE 48
#define TK_IF 49
#define TK_NOT 50
#define TK_EXISTS 51
#define TK_BUFFER 52
#define TK_CACHELAST 53
#define TK_COMP 54
#define TK_DAYS 55
#define TK_NK_VARIABLE 56
#define TK_FSYNC 57
#define TK_MAXROWS 58
#define TK_MINROWS 59
#define TK_KEEP 60
#define TK_PAGES 61
#define TK_PAGESIZE 62
#define TK_PRECISION 63
#define TK_REPLICA 64
#define TK_STRICT 65
#define TK_WAL 66
#define TK_VGROUPS 67
#define TK_SINGLE_STABLE 68
#define TK_RETENTIONS 69
#define TK_NK_COMMA 70
#define TK_NK_COLON 71
#define TK_TABLE 72
#define TK_NK_LP 73
#define TK_NK_RP 74
#define TK_STABLE 75
#define TK_ADD 76
#define TK_COLUMN 77
#define TK_MODIFY 78
#define TK_RENAME 79
#define TK_TAG 80
#define TK_SET 81
#define TK_NK_EQ 82
#define TK_USING 83
#define TK_TAGS 84
#define TK_NK_DOT 85
#define TK_COMMENT 86
#define TK_BOOL 87
#define TK_TINYINT 88
#define TK_SMALLINT 89
#define TK_INT 90
#define TK_INTEGER 91
#define TK_BIGINT 92
#define TK_FLOAT 93
#define TK_DOUBLE 94
#define TK_BINARY 95
#define TK_TIMESTAMP 96
#define TK_NCHAR 97
#define TK_UNSIGNED 98
#define TK_JSON 99
#define TK_VARCHAR 100
#define TK_MEDIUMBLOB 101
#define TK_BLOB 102
#define TK_VARBINARY 103
#define TK_DECIMAL 104
#define TK_DELAY 105
#define TK_FILE_FACTOR 106
#define TK_NK_FLOAT 107
#define TK_ROLLUP 108
#define TK_TTL 109
#define TK_SMA 110
#define TK_SHOW 111
#define TK_DATABASES 112
#define TK_TABLES 113
#define TK_STABLES 114
#define TK_MNODES 115
#define TK_MODULES 116
#define TK_QNODES 117
#define TK_FUNCTIONS 118
#define TK_INDEXES 119
#define TK_FROM 120
#define TK_ACCOUNTS 121
#define TK_APPS 122
#define TK_CONNECTIONS 123
#define TK_LICENCE 124
#define TK_GRANTS 125
#define TK_QUERIES 126
#define TK_SCORES 127
#define TK_TOPICS 128
#define TK_VARIABLES 129
#define TK_BNODES 130
#define TK_SNODES 131
#define TK_CLUSTER 132
#define TK_LIKE 133
#define TK_INDEX 134
#define TK_FULLTEXT 135
#define TK_FUNCTION 136
#define TK_INTERVAL 137
#define TK_TOPIC 138
#define TK_AS 139
#define TK_WITH 140
#define TK_SCHEMA 141
#define TK_DESC 142
#define TK_DESCRIBE 143
#define TK_RESET 144
#define TK_QUERY 145
#define TK_CACHE 146
#define TK_EXPLAIN 147
#define TK_ANALYZE 148
#define TK_VERBOSE 149
#define TK_NK_BOOL 150
#define TK_RATIO 151
#define TK_COMPACT 152
#define TK_VNODES 153
#define TK_IN 154
#define TK_OUTPUTTYPE 155
#define TK_AGGREGATE 156
#define TK_BUFSIZE 157
#define TK_STREAM 158
#define TK_INTO 159
#define TK_TRIGGER 160
#define TK_AT_ONCE 161
#define TK_WINDOW_CLOSE 162
#define TK_WATERMARK 163
#define TK_KILL 164
#define TK_CONNECTION 165
#define TK_MERGE 166
#define TK_VGROUP 167
#define TK_REDISTRIBUTE 168
#define TK_SPLIT 169
#define TK_SYNCDB 170
#define TK_NULL 171
#define TK_NK_QUESTION 172
#define TK_NK_ARROW 173
#define TK_ROWTS 174
#define TK_TBNAME 175
#define TK_QSTARTTS 176
#define TK_QENDTS 177
#define TK_WSTARTTS 178
#define TK_WENDTS 179
#define TK_WDURATION 180
#define TK_CAST 181
#define TK_NOW 182
#define TK_TODAY 183
#define TK_TIMEZONE 184
#define TK_COUNT 185
#define TK_FIRST 186
#define TK_LAST 187
#define TK_LAST_ROW 188
#define TK_BETWEEN 189
#define TK_IS 190
#define TK_NK_LT 191
#define TK_NK_GT 192
#define TK_NK_LE 193
#define TK_NK_GE 194
#define TK_NK_NE 195
#define TK_MATCH 196
#define TK_NMATCH 197
#define TK_CONTAINS 198
#define TK_JOIN 199
#define TK_INNER 200
#define TK_SELECT 201
#define TK_DISTINCT 202
#define TK_WHERE 203
#define TK_PARTITION 204
#define TK_BY 205
#define TK_SESSION 206
#define TK_STATE_WINDOW 207
#define TK_SLIDING 208
#define TK_FILL 209
#define TK_VALUE 210
#define TK_NONE 211
#define TK_PREV 212
#define TK_LINEAR 213
#define TK_NEXT 214
#define TK_GROUP 215
#define TK_HAVING 216
#define TK_ORDER 217
#define TK_SLIMIT 218
#define TK_SOFFSET 219
#define TK_LIMIT 220
#define TK_OFFSET 221
#define TK_ASC 222
#define TK_NULLS 223
#define TK_ID 224
#define TK_NK_BITNOT 225
#define TK_INSERT 226
#define TK_VALUES 227
#define TK_IMPORT 228
#define TK_NK_SEMI 229
#define TK_FILE 230
#define TK_GRANT 36
#define TK_ON 37
#define TK_TO 38
#define TK_REVOKE 39
#define TK_FROM 40
#define TK_NK_COMMA 41
#define TK_READ 42
#define TK_WRITE 43
#define TK_NK_DOT 44
#define TK_DNODE 45
#define TK_PORT 46
#define TK_NK_INTEGER 47
#define TK_DNODES 48
#define TK_NK_IPTOKEN 49
#define TK_LOCAL 50
#define TK_QNODE 51
#define TK_BNODE 52
#define TK_SNODE 53
#define TK_MNODE 54
#define TK_DATABASE 55
#define TK_USE 56
#define TK_IF 57
#define TK_NOT 58
#define TK_EXISTS 59
#define TK_BUFFER 60
#define TK_CACHELAST 61
#define TK_COMP 62
#define TK_DAYS 63
#define TK_NK_VARIABLE 64
#define TK_FSYNC 65
#define TK_MAXROWS 66
#define TK_MINROWS 67
#define TK_KEEP 68
#define TK_PAGES 69
#define TK_PAGESIZE 70
#define TK_PRECISION 71
#define TK_REPLICA 72
#define TK_STRICT 73
#define TK_WAL 74
#define TK_VGROUPS 75
#define TK_SINGLE_STABLE 76
#define TK_RETENTIONS 77
#define TK_NK_COLON 78
#define TK_TABLE 79
#define TK_NK_LP 80
#define TK_NK_RP 81
#define TK_STABLE 82
#define TK_ADD 83
#define TK_COLUMN 84
#define TK_MODIFY 85
#define TK_RENAME 86
#define TK_TAG 87
#define TK_SET 88
#define TK_NK_EQ 89
#define TK_USING 90
#define TK_TAGS 91
#define TK_COMMENT 92
#define TK_BOOL 93
#define TK_TINYINT 94
#define TK_SMALLINT 95
#define TK_INT 96
#define TK_INTEGER 97
#define TK_BIGINT 98
#define TK_FLOAT 99
#define TK_DOUBLE 100
#define TK_BINARY 101
#define TK_TIMESTAMP 102
#define TK_NCHAR 103
#define TK_UNSIGNED 104
#define TK_JSON 105
#define TK_VARCHAR 106
#define TK_MEDIUMBLOB 107
#define TK_BLOB 108
#define TK_VARBINARY 109
#define TK_DECIMAL 110
#define TK_DELAY 111
#define TK_FILE_FACTOR 112
#define TK_NK_FLOAT 113
#define TK_ROLLUP 114
#define TK_TTL 115
#define TK_SMA 116
#define TK_SHOW 117
#define TK_DATABASES 118
#define TK_TABLES 119
#define TK_STABLES 120
#define TK_MNODES 121
#define TK_MODULES 122
#define TK_QNODES 123
#define TK_FUNCTIONS 124
#define TK_INDEXES 125
#define TK_ACCOUNTS 126
#define TK_APPS 127
#define TK_CONNECTIONS 128
#define TK_LICENCE 129
#define TK_GRANTS 130
#define TK_QUERIES 131
#define TK_SCORES 132
#define TK_TOPICS 133
#define TK_VARIABLES 134
#define TK_BNODES 135
#define TK_SNODES 136
#define TK_CLUSTER 137
#define TK_TRANSACTIONS 138
#define TK_LIKE 139
#define TK_INDEX 140
#define TK_FULLTEXT 141
#define TK_FUNCTION 142
#define TK_INTERVAL 143
#define TK_TOPIC 144
#define TK_AS 145
#define TK_WITH 146
#define TK_SCHEMA 147
#define TK_DESC 148
#define TK_DESCRIBE 149
#define TK_RESET 150
#define TK_QUERY 151
#define TK_CACHE 152
#define TK_EXPLAIN 153
#define TK_ANALYZE 154
#define TK_VERBOSE 155
#define TK_NK_BOOL 156
#define TK_RATIO 157
#define TK_COMPACT 158
#define TK_VNODES 159
#define TK_IN 160
#define TK_OUTPUTTYPE 161
#define TK_AGGREGATE 162
#define TK_BUFSIZE 163
#define TK_STREAM 164
#define TK_INTO 165
#define TK_TRIGGER 166
#define TK_AT_ONCE 167
#define TK_WINDOW_CLOSE 168
#define TK_WATERMARK 169
#define TK_KILL 170
#define TK_CONNECTION 171
#define TK_TRANSACTION 172
#define TK_MERGE 173
#define TK_VGROUP 174
#define TK_REDISTRIBUTE 175
#define TK_SPLIT 176
#define TK_SYNCDB 177
#define TK_NULL 178
#define TK_NK_QUESTION 179
#define TK_NK_ARROW 180
#define TK_ROWTS 181
#define TK_TBNAME 182
#define TK_QSTARTTS 183
#define TK_QENDTS 184
#define TK_WSTARTTS 185
#define TK_WENDTS 186
#define TK_WDURATION 187
#define TK_CAST 188
#define TK_NOW 189
#define TK_TODAY 190
#define TK_TIMEZONE 191
#define TK_COUNT 192
#define TK_FIRST 193
#define TK_LAST 194
#define TK_LAST_ROW 195
#define TK_BETWEEN 196
#define TK_IS 197
#define TK_NK_LT 198
#define TK_NK_GT 199
#define TK_NK_LE 200
#define TK_NK_GE 201
#define TK_NK_NE 202
#define TK_MATCH 203
#define TK_NMATCH 204
#define TK_CONTAINS 205
#define TK_JOIN 206
#define TK_INNER 207
#define TK_SELECT 208
#define TK_DISTINCT 209
#define TK_WHERE 210
#define TK_PARTITION 211
#define TK_BY 212
#define TK_SESSION 213
#define TK_STATE_WINDOW 214
#define TK_SLIDING 215
#define TK_FILL 216
#define TK_VALUE 217
#define TK_NONE 218
#define TK_PREV 219
#define TK_LINEAR 220
#define TK_NEXT 221
#define TK_GROUP 222
#define TK_HAVING 223
#define TK_ORDER 224
#define TK_SLIMIT 225
#define TK_SOFFSET 226
#define TK_LIMIT 227
#define TK_OFFSET 228
#define TK_ASC 229
#define TK_NULLS 230
#define TK_ID 231
#define TK_NK_BITNOT 232
#define TK_INSERT 233
#define TK_VALUES 234
#define TK_IMPORT 235
#define TK_NK_SEMI 236
#define TK_FILE 237
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -165,15 +165,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
/**
* release the query handle and decrease the reference count in cache
* @param pMgmt
* @param pQInfo
* @param freeHandle
* @return
*/
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
......
......@@ -168,6 +168,9 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId);
#ifdef __cplusplus
}
......
......@@ -44,7 +44,8 @@ enum {
UDFC_CODE_PIPE_READ_ERR = -2,
UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5
UDFC_CODE_INVALID_STATE = -5,
UDFC_CODE_NO_PIPE = -6,
};
typedef void *UdfcFuncHandle;
......@@ -140,6 +141,44 @@ typedef int32_t (*TUdfDestroyFunc)();
#define UDF_MEMORY_EXP_GROWTH 1.5
#define udfColDataIsNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] == -1)
#define udfColDataIsNull_f(pColumn, row) ((BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) & (1u << (7u - BitPos(row)))) == (1u << (7u - BitPos(row))))
#define udfColDataSetNull_f(pColumn, row) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, row) |= (1u << (7u - BitPos(row))); \
} while (0)
#define udfColDataSetNotNull_f(pColumn, r_) \
do { \
BMCharPos(pColumn->colData.fixLenCol.nullBitmap, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1)
static FORCE_INLINE char* udfColDataGetData(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
return pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row];
} else {
return pColumn->colData.fixLenCol.data + pColumn->colMeta.bytes * row;
}
}
static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn* pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) {
if (udfColDataIsNull_var(pColumn, row)) {
return true;
}
char* data = udfColDataGetData(pColumn, row);
return (*data == TSDB_DATA_TYPE_NULL);
} else {
return udfColDataIsNull_var(pColumn, row);
}
} else {
return udfColDataIsNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
......@@ -186,17 +225,22 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t ne
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row+1);
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row);
} else {
udfColDataSetNull_f(pColumn, row);
}
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow+1);
bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) {
if (isVarCol) {
data->varLenCol.varOffsets[currentRow] = -1;
} else {
colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow);
}
udfColDataSetNull(pColumn, currentRow);
} else {
if (!isVarCol) {
colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow);
......
......@@ -310,6 +310,29 @@ typedef struct SCreateFunctionStmt {
int32_t bufSize;
} SCreateFunctionStmt;
typedef struct SDropFunctionStmt {
ENodeType type;
char funcName[TSDB_FUNC_NAME_LEN];
bool ignoreNotExists;
} SDropFunctionStmt;
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef struct SGrantStmt {
ENodeType type;
char userName[TSDB_USER_LEN];
char dbName[TSDB_DB_NAME_LEN];
int64_t privileges;
} SGrantStmt;
typedef SGrantStmt SRevokeStmt;
#ifdef __cplusplus
}
#endif
......
......@@ -143,6 +143,8 @@ typedef enum ENodeType {
QUERY_NODE_REDISTRIBUTE_VGROUP_STMT,
QUERY_NODE_SPLIT_VGROUP_STMT,
QUERY_NODE_SYNCDB_STMT,
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_SHOW_DNODES_STMT,
QUERY_NODE_SHOW_MNODES_STMT,
QUERY_NODE_SHOW_MODULES_STMT,
......@@ -174,8 +176,10 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
QUERY_NODE_SHOW_CREATE_STABLE_STMT,
QUERY_NODE_SHOW_TRANSACTIONS_STMT,
QUERY_NODE_KILL_CONNECTION_STMT,
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
......
......@@ -25,20 +25,18 @@ extern "C" {
typedef struct SPlanContext {
uint64_t queryId;
int32_t acctId;
SEpSet mgmtEpSet;
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
bool rSmaQuery;
bool showRewrite;
int8_t triggerType;
int64_t watermark;
int32_t placeholderNum;
void* pTransporter;
struct SCatalog* pCatalog;
char* pMsg;
int32_t msgLen;
int32_t acctId;
SEpSet mgmtEpSet;
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
bool rSmaQuery;
bool showRewrite;
int8_t triggerType;
int64_t watermark;
int32_t placeholderNum;
char* pMsg;
int32_t msgLen;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......@@ -47,7 +45,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @pSubplan subplan to be schedule
// @groupId id of a group of datasource subplans of this @pSubplan
// @pSource one execution location of this group of datasource subplans
// @pSource one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource);
int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colIdx, uint64_t queryId);
......@@ -56,7 +54,7 @@ int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_MULTI_BIND* pParams, int32_t colI
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
SQueryPlan* qStringToQueryPlan(const char* pStr);
void qDestroyQueryPlan(SQueryPlan* pPlan);
......
......@@ -154,8 +154,8 @@ typedef struct {
} SStreamTask;
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
typedef struct {
......
......@@ -36,7 +36,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo);
#ifdef __cplusplus
......
......@@ -328,29 +328,32 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0601)
#define TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0602)
#define TSDB_CODE_TDB_TABLE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0603)
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0604)
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0605)
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0606)
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0607)
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0608)
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0609)
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060A)
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060B)
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060C)
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x060D)
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x060E)
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x060F)
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0610)
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0611)
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0612)
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613)
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620)
#define TSDB_CODE_TDB_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0604)
#define TSDB_CODE_TDB_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0605)
#define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0606)
#define TSDB_CODE_TDB_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0607)
#define TSDB_CODE_TDB_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0608)
#define TSDB_CODE_TDB_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0609)
#define TSDB_CODE_TDB_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x060A)
#define TSDB_CODE_TDB_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x060B)
#define TSDB_CODE_TDB_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x060C)
#define TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE TAOS_DEF_ERROR_CODE(0, 0x060D)
#define TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x060E)
#define TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x060F)
#define TSDB_CODE_TDB_INVALID_ACTION TAOS_DEF_ERROR_CODE(0, 0x0600)
#define TSDB_CODE_TDB_INVALID_CREATE_TB_MSG TAOS_DEF_ERROR_CODE(0, 0x0601)
#define TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM TAOS_DEF_ERROR_CODE(0, 0x0602)
#define TSDB_CODE_TDB_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0613)
#define TSDB_CODE_TDB_TABLE_RECONFIGURE TAOS_DEF_ERROR_CODE(0, 0x0614)
#define TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO TAOS_DEF_ERROR_CODE(0, 0x0615)
#define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0616)
#define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0617)
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x062D)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
......@@ -630,6 +633,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_FIRST_COLUMN TAOS_DEF_ERROR_CODE(0, 0x2641)
#define TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN TAOS_DEF_ERROR_CODE(0, 0x2642)
#define TSDB_CODE_PAR_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x2643)
#define TSDB_CODE_PAR_PERMISSION_DENIED TAOS_DEF_ERROR_CODE(0, 0x2644)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
此差异已折叠。
......@@ -162,18 +162,17 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj*
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = {
.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = topicQuery,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
};
SParseContext cxt = {.requestId = pRequest->requestId,
.acctId = pTscObj->acctId,
.db = pRequest->pDb,
.topicQuery = topicQuery,
.pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter,
.pStmtCb = pStmtCb,
.pUser = pTscObj->user};
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
......@@ -232,11 +231,15 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.placeholderNum = pQuery->placeholderNum};
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList);
}
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
......
......@@ -1724,13 +1724,13 @@ cleanup:
*
*/
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision, bool dataFormat) {
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT);
if(!request){
return NULL;
}
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, dataFormat);
SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, false);
if(!info){
return (TAOS_RES*)request;
}
......
......@@ -547,21 +547,21 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
req.offsets = (SMqOffset*)offsets->container.pData;
}
SCoder encoder;
SEncoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncoderInit(&encoder, NULL, 0);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
int32_t tlen = encoder.pos;
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tCoderClear(&encoder);
tEncoderClear(&encoder);
return -1;
}
tCoderClear(&encoder);
tEncoderClear(&encoder);
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tCoderClear(&encoder);
tEncoderClear(&encoder);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
if (pRequest == NULL) {
......
此差异已折叠。
......@@ -115,7 +115,10 @@ typedef enum {
TRN_TYPE_STB_SCOPE_END,
} ETrnType;
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
typedef enum {
TRN_POLICY_ROLLBACK = 0,
TRN_POLICY_RETRY = 1,
} ETrnPolicy;
typedef enum {
DND_REASON_ONLINE = 0,
......@@ -131,6 +134,15 @@ typedef enum {
DND_REASON_OTHERS
} EDndReason;
typedef enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY,
} ECsmUpdateType;
typedef struct {
int32_t id;
ETrnStage stage;
......@@ -386,7 +398,6 @@ typedef struct {
int32_t codeSize;
char* pComment;
char* pCode;
char pData[];
} SFuncObj;
typedef struct {
......@@ -425,18 +436,8 @@ typedef struct {
int64_t offset;
} SMqOffsetObj;
static FORCE_INLINE int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pOffset->key);
tlen += taosEncodeFixedI64(buf, pOffset->offset);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
......@@ -459,26 +460,15 @@ typedef struct {
SSchemaWrapper schema;
} SMqTopicObj;
enum {
CONSUMER_UPDATE__TOUCH = 1,
CONSUMER_UPDATE__ADD,
CONSUMER_UPDATE__REMOVE,
CONSUMER_UPDATE__LOST,
CONSUMER_UPDATE__RECOVER,
CONSUMER_UPDATE__MODIFY,
};
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
// hbStatus is not applicable to serialization
int32_t hbStatus;
// lock is used for topics update
SRWLatch lock;
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
int32_t hbStatus; // hbStatus is not applicable to serialization
SRWLatch lock; // lock is used for topics update
SArray* currentTopics; // SArray<char*>
SArray* rebNewTopics; // SArray<char*>
SArray* rebRemovedTopics; // SArray<char*>
......@@ -492,7 +482,6 @@ typedef struct {
int64_t upTime;
int64_t subscribeTime;
int64_t rebalanceTime;
} SMqConsumerObj;
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
......@@ -581,19 +570,18 @@ typedef struct {
} SMqRebOutputObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int32_t vgNum;
SRWLatch lock;
int8_t status;
// int32_t sqlLen;
char name[TSDB_TOPIC_FNAME_LEN];
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int32_t version;
int32_t vgNum;
SRWLatch lock;
int8_t status;
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused
......@@ -606,8 +594,8 @@ typedef struct {
SSchemaWrapper outputSchema;
} SStreamObj;
int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj);
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
#ifdef __cplusplus
}
......
......@@ -29,7 +29,8 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
// for trans test
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t *pRspLen);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
#ifdef __cplusplus
}
......
......@@ -17,8 +17,8 @@
#include "mndCluster.h"
#include "mndShow.h"
#define TSDB_CLUSTER_VER_NUMBE 1
#define TSDB_CLUSTER_RESERVE_SIZE 64
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_RESERVE_SIZE 64
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw);
......@@ -30,14 +30,16 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT64,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
SSdbTable table = {
.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT64,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete,
};
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndRetrieveClusters);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CLUSTER, mndCancelGetNextCluster);
......@@ -79,19 +81,19 @@ int64_t mndGetClusterId(SMnode *pMnode) {
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, TSDB_CLUSTER_VER_NUMBE, sizeof(SClusterObj) + TSDB_CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) goto CLUSTER_ENCODE_OVER;
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, CLUSTER_VER_NUMBE, sizeof(SClusterObj) + CLUSTER_RESERVE_SIZE);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_SET_INT64(pRaw, dataPos, pCluster->id, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, CLUSTER_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
CLUSTER_ENCODE_OVER:
_OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to encode to raw:%p since %s", pCluster->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
......@@ -106,29 +108,29 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CLUSTER_DECODE_OVER;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_CLUSTER_VER_NUMBE) {
if (sver != CLUSTER_VER_NUMBE) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto CLUSTER_DECODE_OVER;
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
if (pRow == NULL) goto CLUSTER_DECODE_OVER;
if (pRow == NULL) goto _OVER;
SClusterObj *pCluster = sdbGetRowObj(pRow);
if (pCluster == NULL) goto CLUSTER_DECODE_OVER;
if (pCluster == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, CLUSTER_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, CLUSTER_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_CLUSTER_RESERVE_SIZE, CLUSTER_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
CLUSTER_DECODE_OVER:
_OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
taosMemoryFreeClear(pRow);
......@@ -161,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
if (code != 0) {
strcpy(clusterObj.name, "tdengine2.0");
strcpy(clusterObj.name, "tdengine3.0");
mError("failed to get name from system, set to default val %s", clusterObj.name);
}
......@@ -190,8 +192,8 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
if (pShow->pIter == NULL) break;
cols = 0;
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->id, false);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->id, false);
char buf[tListLen(pCluster->name) + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(buf, pCluster->name, pShow->pMeta->pSchemas[cols].bytes);
......@@ -200,7 +202,7 @@ static int32_t mndRetrieveClusters(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock
colDataAppend(pColInfo, numOfRows, buf, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char*) &pCluster->createdTime, false);
colDataAppend(pColInfo, numOfRows, (const char *)&pCluster->createdTime, false);
sdbRelease(pSdb, pCluster);
numOfRows++;
......
......@@ -13,12 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndDef.h"
#include "mndConsumer.h"
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
......@@ -409,7 +411,7 @@ void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
return (void *)buf;
}
int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t sz = 0;
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
......@@ -460,7 +462,7 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
return pEncoder->pos;
}
int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
......@@ -515,3 +517,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
#endif
return 0;
}
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pOffset->key);
tlen += taosEncodeFixedI64(buf, pOffset->offset);
return tlen;
}
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
\ No newline at end of file
......@@ -157,8 +157,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
SMqCMCommitOffsetReq commitOffsetReq;
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msgStr, pMsg->rpcMsg.contLen, TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, msgStr, pMsg->rpcMsg.contLen);
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
......
......@@ -83,12 +83,12 @@ END:
}
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tCoderClear(&encoder);
tEncoderClear(&encoder);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -96,9 +96,9 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
}
((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
tEncoderClear(&encoder);
STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
......
......@@ -28,8 +28,8 @@
#include "mndVgroup.h"
#include "tname.h"
#define TSDB_STB_VER_NUMBER 1
#define TSDB_STB_RESERVE_SIZE 64
#define STB_VER_NUMBER 1
#define STB_RESERVE_SIZE 64
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
......@@ -46,13 +46,15 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_STB,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStbActionEncode,
.decodeFp = (SdbDecodeFp)mndStbActionDecode,
.insertFp = (SdbInsertFp)mndStbActionInsert,
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
SSdbTable table = {
.sdbType = SDB_STB,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStbActionEncode,
.decodeFp = (SdbDecodeFp)mndStbActionDecode,
.insertFp = (SdbInsertFp)mndStbActionInsert,
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
......@@ -74,8 +76,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + +pStb->commentLen +
pStb->ast1Len + pStb->ast2Len + TSDB_STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, TSDB_STB_VER_NUMBER, size);
pStb->ast1Len + pStb->ast2Len + STB_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STB, STB_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -99,6 +101,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
......@@ -107,6 +110,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, _OVER)
SDB_SET_INT8(pRaw, dataPos, pSchema->flags, _OVER)
SDB_SET_INT16(pRaw, dataPos, pSchema->colId, _OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
......@@ -121,7 +125,7 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
if (pStb->ast2Len > 0) {
SDB_SET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0;
......@@ -143,7 +147,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_STB_VER_NUMBER) {
if (sver != STB_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
......@@ -183,6 +187,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
......@@ -191,6 +196,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pSchema->flags, _OVER)
SDB_GET_INT16(pRaw, dataPos, &pSchema->colId, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, _OVER)
......@@ -211,7 +217,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
if (pStb->pAst2 == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, pStb->pAst2, pStb->ast2Len, _OVER)
}
SDB_GET_RESERVE(pRaw, dataPos, TSDB_STB_RESERVE_SIZE, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, STB_RESERVE_SIZE, _OVER)
terrno = 0;
......@@ -363,7 +369,7 @@ static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSch
}
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SCoder coder = {0};
SEncoder encoder = {0};
int32_t contLen;
SName name = {0};
SVCreateStbReq req = {0};
......@@ -416,11 +422,11 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
pHead->vgId = htonl(pVgroup->vgId);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER);
if (tEncodeSVCreateStbReq(&coder, &req) < 0) {
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
if (tEncodeSVCreateStbReq(&encoder, &req) < 0) {
return NULL;
}
tCoderClear(&coder);
tEncoderClear(&encoder);
*pContLen = contLen;
taosMemoryFreeClear(req.pRSmaParam.qmsg1);
......@@ -434,7 +440,7 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
int32_t contLen = 0;
int32_t ret = 0;
SMsgHead *pHead = NULL;
SCoder coder = {0};
SEncoder encoder = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
......@@ -456,9 +462,9 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBuf, contLen - sizeof(SMsgHead), TD_ENCODER);
tEncodeSVDropStbReq(&coder, &req);
tCoderClear(&coder);
tEncoderInit(&encoder, pBuf, contLen - sizeof(SMsgHead));
tEncodeSVDropStbReq(&encoder, &req);
tEncoderClear(&encoder);
*pContLen = contLen;
return pHead;
......@@ -488,7 +494,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
SField *pField1 = taosArrayGet(pCreate->pColumns, i);
if (pField->type < 0) {
if (pField1->type < 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
......@@ -574,6 +580,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_STB;
action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
......@@ -613,6 +620,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_DROP_STB;
action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
......@@ -733,6 +741,7 @@ _OVER:
mndTransDrop(pTrans);
return code;
}
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbInfo(pTrans, pDb);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
......@@ -792,7 +801,10 @@ static int32_t mndProcessMCreateStbReq(SNodeMsg *pReq) {
}
int32_t numOfStbs = -1;
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
goto _OVER;
}
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
goto _OVER;
......@@ -819,7 +831,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) {
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
......@@ -1170,7 +1182,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
SStbObj stbObj = {0};
taosRLockLatch(&pOld->lock);
memcpy(&stbObj, pOld, sizeof(SStbObj));
......@@ -1234,12 +1246,12 @@ _OVER:
}
static int32_t mndProcessMAlterStbReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
int32_t code = -1;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SUserObj *pUser = NULL;
SMAltertbReq alterReq = {0};
SMnode *pMnode = pReq->pNode;
int32_t code = -1;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SUserObj *pUser = NULL;
SMAlterStbReq alterReq = {0};
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......
......@@ -70,14 +70,14 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder);
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
......@@ -86,12 +86,12 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto STREAM_ENCODE_OVER;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
tEncoderInit(&encoder, buf, tlen);
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder);
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
tCoderClear(&encoder);
tEncoderClear(&encoder);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
......@@ -138,8 +138,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
if (buf == NULL) goto STREAM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, buf, tlen + 1);
if (tDecodeSStreamObj(&decoder, pStream) < 0) {
goto STREAM_DECODE_OVER;
}
......@@ -345,7 +345,10 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
}
int32_t numOfStbs = -1;
mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs);
if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) {
goto _OVER;
}
if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) {
terrno = TSDB_CODE_MND_SINGLE_STB_MODE_DB;
goto _OVER;
......
......@@ -412,7 +412,7 @@ static int32_t mndProcessAlterUserReq(SNodeMsg *pReq) {
goto _OVER;
}
if (alterReq.pass[0] == 0) {
if (TSDB_ALTER_USER_PASSWD == alterReq.alterType && alterReq.pass[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT;
goto _OVER;
}
......
......@@ -99,10 +99,10 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
ASSERT(0);
return;
}
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER);
SDecoder decoder;
tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead));
tDecodeSStreamTask(&decoder, pTask);
tCoderClear(&decoder);
tDecoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask);
} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {
......
此差异已折叠。
......@@ -39,8 +39,8 @@ typedef struct SMSmaCursor SMSmaCursor;
// metaOpen ==================
// metaEntry ==================
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME);
int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
int metaDecodeEntry(SDecoder* pCoder, SMetaEntry* pME);
// metaTable ==================
......
......@@ -15,7 +15,7 @@
#include "meta.h"
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pME->version) < 0) return -1;
......@@ -43,8 +43,8 @@ int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
return 0;
}
int metaDecodeEntry(SCoder *pCoder, SMetaEntry *pME) {
uint64_t len;
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
uint32_t len;
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册