提交 8e1a9220 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

......@@ -35,7 +35,7 @@ endif(${BUILD_TEST})
add_subdirectory(source)
add_subdirectory(tools)
add_subdirectory(tests)
add_subdirectory(example)
add_subdirectory(examples/c)
# docs
add_subdirectory(docs)
......
此差异已折叠。
......@@ -93,10 +93,13 @@ title: TDengine 参数限制与保留关键字
`TBNAME` 可以视为超级表中一个特殊的标签,代表子表的表名。
获取一个超级表所有的子表名及相关的标签信息:
```mysql
SELECT TBNAME, location FROM meters;
```
统计超级表下辖子表数量:
```mysql
SELECT COUNT(TBNAME) FROM meters;
```
......
此差异已折叠。
......@@ -56,6 +56,7 @@ There are about 200 keywords reserved by TDengine, they can't be used as the nam
Get the table name and tag values of all subtables in a STable.
```mysql
SELECT TBNAME, location FROM meters;
```
Count the number of subtables in a STable.
```mysql
......
......@@ -165,7 +165,6 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -191,20 +190,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
return;
}
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
if (cnt >= 2) break;
/*printf("get data\n");*/
/*msg_process(tmqmessage);*/
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err = tmq_consumer_close(tmq);
if (err)
......@@ -253,39 +250,6 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf(stderr, "%% Consumer closed\n");
}
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t batchCnt = 0;
int32_t skipLogNum = 0;
clock_t startTime = clock();
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/
taos_free_result(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
(double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
if (argc > 1) {
printf("env init\n");
......@@ -296,7 +260,6 @@ int main(int argc, char* argv[]) {
}
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop(tmq, topic_list);
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -3,20 +3,20 @@ PROJECT(TDengine)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo apitest.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
ADD_EXECUTABLE(sml schemaless.c)
TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
ADD_EXECUTABLE(subscribe subscribe.c)
TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
# ADD_EXECUTABLE(demo apitest.c)
#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
#ADD_EXECUTABLE(sml schemaless.c)
#TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
#ADD_EXECUTABLE(subscribe subscribe.c)
#TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
#ADD_EXECUTABLE(epoll epoll.c)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
ENDIF ()
IF (TD_DARWIN)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo demo.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread lua)
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
#ADD_EXECUTABLE(demo demo.c)
#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread lua)
#ADD_EXECUTABLE(epoll epoll.c)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
ENDIF ()
......@@ -85,6 +85,14 @@ typedef struct taosField {
int32_t bytes;
} TAOS_FIELD;
typedef struct TAOS_FIELD_E {
char name[65];
int8_t type;
uint8_t precision;
uint8_t scale;
int32_t bytes;
} TAOS_FIELD_E;
#ifdef WINDOWS
#define DLL_EXPORT __declspec(dllexport)
#else
......@@ -134,7 +142,10 @@ DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......@@ -230,7 +241,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
......
......@@ -62,14 +62,11 @@ int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, u
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
#endif
// STagVal
static FORCE_INLINE void tTagValPush(SArray *pTagArray, void *key, int8_t type, uint8_t *pData, uint32_t nData,
bool isJson);
// STag
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag);
bool tTagGet(const STag *pTag, STagVal *pTagVal);
char* tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
......@@ -148,6 +145,7 @@ struct SColVal {
SValue value;
};
#pragma pack(push, 1)
struct STagVal {
union {
int16_t cid;
......@@ -155,16 +153,7 @@ struct STagVal {
};
int8_t type;
union {
int8_t i8;
uint8_t u8;
int16_t i16;
uint16_t u16;
int32_t i32;
uint32_t u32;
int64_t i64;
uint64_t u64;
float f;
double d;
struct {
uint32_t nData;
uint8_t *pData;
......@@ -172,24 +161,8 @@ struct STagVal {
};
};
static FORCE_INLINE void tTagValPush(SArray *pTagArray, void *key, int8_t type, uint8_t *pData, uint32_t nData,
bool isJson) {
STagVal tagVal = {0};
if (isJson) {
tagVal.pKey = (char *)key;
} else {
tagVal.cid = *(int16_t *)key;
}
tagVal.type = type;
tagVal.pData = pData;
tagVal.nData = nData;
taosArrayPush(pTagArray, &tagVal);
}
#pragma pack(push, 1)
#define TD_TAG_JSON ((int8_t)0x1)
#define TD_TAG_LARGE ((int8_t)0x2)
#define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit
#define TD_TAG_LARGE ((int8_t)0x20)
struct STag {
int8_t flags;
int16_t len;
......@@ -449,14 +422,3 @@ int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToM
#endif /*_TD_COMMON_DATA_FORMAT_H_*/
// SKVRowBuilder;
// int32_t tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
// void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
// void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
// SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
// static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, const void *value, int32_t tlen)
// #ifdef JSON_TAG_REFACTOR
// TODO: JSON_TAG_TODO
......@@ -287,7 +287,7 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0)
#define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
......@@ -791,19 +791,24 @@ typedef struct {
int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
SEpSet epSet;
} SQueryNodeAddr;
typedef struct {
SArray* addrsList; // SArray<SQueryNodeAddr>
SQueryNodeAddr addr;
uint64_t load;
} SQueryNodeLoad;
typedef struct {
SArray* qnodeList; // SArray<SQueryNodeLoad>
} SQnodeListRsp;
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void tFreeSQnodeListRsp(SQnodeListRsp* pRsp);
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
SEpSet epSet;
} SQueryNodeAddr;
typedef struct {
SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp;
......@@ -926,6 +931,21 @@ typedef struct {
int32_t syncState;
} SMnodeLoad;
typedef struct {
int32_t dnodeId;
int64_t numOfProcessedQuery;
int64_t numOfProcessedCQuery;
int64_t numOfProcessedFetch;
int64_t numOfProcessedDrop;
int64_t numOfProcessedHb;
int64_t cacheDataSize;
int64_t numOfQueryInQueue;
int64_t numOfFetchInQueue;
int64_t timeInQueryQueue;
int64_t timeInFetchQueue;
} SQnodeLoad;
typedef struct {
int32_t sver; // software version
int64_t dnodeVer; // dnode table version in sdb
......@@ -937,6 +957,7 @@ typedef struct {
int32_t numOfSupportVnodes;
char dnodeEp[TSDB_EP_LEN];
SMnodeLoad mload;
SQnodeLoad qload;
SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad
} SStatusReq;
......@@ -1012,6 +1033,7 @@ typedef struct {
// for tsma
int8_t isTsma;
void* pTsma;
} SCreateVnodeReq;
......@@ -1955,6 +1977,7 @@ typedef struct {
int8_t killConnection;
int8_t align[3];
SEpSet epSet;
SArray *pQnodeList;
} SQueryHbRspBasic;
typedef struct {
......@@ -2034,7 +2057,10 @@ static FORCE_INLINE void tFreeClientKv(void* pKv) {
static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
taosMemoryFreeClear(rsp->query);
if (rsp->query) {
taosArrayDestroy(rsp->query->pQnodeList);
taosMemoryFreeClear(rsp->query);
}
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
}
......@@ -2279,6 +2305,7 @@ typedef struct {
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
int32_t dstVgId;
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
......@@ -2421,7 +2448,7 @@ typedef struct {
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t waitTime;
int64_t timeout;
int64_t currentOffset;
} SMqPollReq;
......
......@@ -144,7 +144,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_CGROUP, "mnode-drop-cgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mnode-mq-ask-ep", SMqAskEpReq, SMqAskEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, NULL)
......@@ -255,6 +254,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MON_BM_INFO, "monitor-binfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
......
......@@ -25,20 +25,6 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SQnode SQnode;
typedef struct {
int64_t numOfProcessedQuery;
int64_t numOfProcessedCQuery;
int64_t numOfProcessedFetch;
int64_t numOfProcessedDrop;
int64_t memSizeInCache;
int64_t dataSizeSend;
int64_t dataSizeRecv;
int64_t numOfQueryInQueue;
int64_t numOfFetchInQueue;
int64_t waitTimeInQueryQUeue;
int64_t waitTimeInFetchQUeue;
} SQnodeLoad;
typedef struct {
SMsgCb msgCb;
} SQnodeOpt;
......
......@@ -32,6 +32,10 @@ extern "C" {
struct SDataSink;
struct SSDataBlock;
typedef struct SDataSinkStat {
uint64_t cachedSize;
} SDataSinkStat;
typedef struct SDataSinkMgtCfg {
uint32_t maxDataBlockNum; // todo: this should be numOfRows?
uint32_t maxDataBlockNumPerQuery;
......@@ -62,6 +66,8 @@ typedef struct SOutputData {
*/
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat);
/**
* Put the result set returned by the executor into datasinker.
* @param handle
......@@ -88,6 +94,8 @@ void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd);
*/
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput);
int32_t dsGetCacheSize(DataSinkHandle handle, uint64_t *pSize);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
* @param ahandle
......
......@@ -194,6 +194,7 @@ void indexInit();
/* index filter */
typedef struct SIndexMetaArg {
void* metaHandle;
void* metaEx;
uint64_t suid;
} SIndexMetaArg;
......
......@@ -171,6 +171,7 @@ void tFreeSMonVmInfo(SMonVmInfo *pInfo);
typedef struct {
SMonSysInfo sys;
SMonLogs log;
SQnodeLoad load;
} SMonQmInfo;
int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo);
......@@ -210,6 +211,10 @@ typedef struct {
int32_t tSerializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo);
int32_t tDeserializeSMonMloadInfo(void *buf, int32_t bufLen, SMonMloadInfo *pInfo);
int32_t tSerializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo);
int32_t tDeserializeSQnodeLoad(void *buf, int32_t bufLen, SQnodeLoad *pInfo);
typedef struct {
const char *server;
uint16_t port;
......
......@@ -77,8 +77,8 @@ int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t rowNum);
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
char* msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
......
......@@ -22,7 +22,7 @@ extern "C" {
#include "tmsgcb.h"
#include "trpc.h"
#include "executor.h"
enum {
NODE_TYPE_VNODE = 1,
......@@ -40,13 +40,19 @@ typedef struct SQWorkerCfg {
} SQWorkerCfg;
typedef struct {
uint64_t numOfStartTask;
uint64_t numOfStopTask;
uint64_t numOfRecvedFetch;
uint64_t numOfSentHb;
uint64_t numOfSentFetch;
uint64_t numOfTaskInQueue;
uint64_t cacheDataSize;
uint64_t queryProcessed;
uint64_t cqueryProcessed;
uint64_t fetchProcessed;
uint64_t dropProcessed;
uint64_t hbProcessed;
uint64_t numOfQueryInQueue;
uint64_t numOfFetchInQueue;
uint64_t timeInQueryQueue;
uint64_t timeInFetchQueue;
uint64_t numOfErrors;
} SQWorkerStat;
......@@ -68,7 +74,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
void qWorkerDestroy(void **qWorkerMgmt);
int64_t qWorkerGetWaitTimeInQueue(void *qWorkerMgmt, EQueueType type);
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
#ifdef __cplusplus
}
......
......@@ -33,8 +33,19 @@ extern "C" {
#ifdef WINDOWS
#define TD_TMP_DIR_PATH "C:\\Windows\\Temp\\"
#define TD_CFG_DIR_PATH "C:\\TDengine\\cfg\\"
#define TD_DATA_DIR_PATH "C:\\TDengine\\data\\"
#define TD_LOG_DIR_PATH "C:\\TDengine\\log\\"
#elif defined(_TD_DARWIN_64)
#define TD_TMP_DIR_PATH "/tmp/taosd/"
#define TD_CFG_DIR_PATH "/usr/local/etc/taos/"
#define TD_DATA_DIR_PATH "/usr/local/var/lib/taos/"
#define TD_LOG_DIR_PATH "/usr/local/var/log/taos/"
#else
#define TD_TMP_DIR_PATH "/tmp/"
#define TD_CFG_DIR_PATH "/etc/taos/"
#define TD_DATA_DIR_PATH "/var/lib/taos/"
#define TD_LOG_DIR_PATH "/var/log/taos/"
#endif
typedef struct TdDir *TdDirPtr;
......
......@@ -70,6 +70,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......@@ -182,7 +183,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_BNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0356)
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_MNODE_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_TOO_MANY_MNODES TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035A)
// mnode-acct
......
......@@ -253,8 +253,7 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_TRANS_DESC_LEN 128
#define TSDB_TRANS_ERROR_LEN 512
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
......@@ -343,7 +342,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_MIN_ROLLUP_FILE_FACTOR 0
#define TSDB_MAX_ROLLUP_FILE_FACTOR 1
#define TSDB_MAX_ROLLUP_FILE_FACTOR 10
#define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1
#define TSDB_MIN_TABLE_TTL 0
#define TSDB_DEFAULT_TABLE_TTL 0
......
......@@ -378,14 +378,16 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) {
}
static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) {
if (tDecodeU32v(pCoder, len) < 0) return -1;
uint32_t length = 0;
if (tDecodeU32v(pCoder, &length) < 0) return -1;
if (len) *len = length;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
if (val) {
*val = (uint8_t*)TD_CODER_CURRENT(pCoder);
}
TD_CODER_MOVE_POS(pCoder, *len);
TD_CODER_MOVE_POS(pCoder, length);
return 0;
}
......@@ -410,14 +412,16 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) {
}
static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uint64_t* len) {
if (tDecodeU64v(pCoder, len) < 0) return -1;
uint64_t length = 0;
if (tDecodeU64v(pCoder, &length) < 0) return -1;
if (len) *len = length;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1;
*val = taosMemoryMalloc(*len);
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
*val = taosMemoryMalloc(length);
if (*val == NULL) return -1;
memcpy(*val, TD_CODER_CURRENT(pCoder), *len);
memcpy(*val, TD_CODER_CURRENT(pCoder), length);
TD_CODER_MOVE_POS(pCoder, *len);
TD_CODER_MOVE_POS(pCoder, length);
return 0;
}
......@@ -457,151 +461,64 @@ static FORCE_INLINE void* tDecoderMalloc(SDecoder* pCoder, int32_t size) {
}
// ===========================================
#define tPutV(p, v) \
do { \
int32_t n = 0; \
for (;;) { \
if (v <= 0x7f) { \
if (p) p[n] = v; \
n++; \
break; \
} \
if (p) p[n] = (v & 0x7f) | 0x80; \
n++; \
v >>= 7; \
} \
return n; \
} while (0)
#define tPutV(p, v) \
int32_t n = 0; \
for (;;) { \
if (v <= 0x7f) { \
if (p) p[n] = v; \
n++; \
break; \
} \
if (p) p[n] = (v & 0x7f) | 0x80; \
n++; \
v >>= 7; \
} \
return n;
#define tGetV(p, v) \
do { \
int32_t n = 0; \
if (v) *v = 0; \
for (;;) { \
if (p[n] <= 0x7f) { \
if (v) (*v) |= (p[n] << (7 * n)); \
n++; \
break; \
} \
if (v) (*v) |= ((p[n] & 0x7f) << (7 * n)); \
n++; \
} \
return n; \
} while (0)
#define tGetV(p, v) \
int32_t n = 0; \
if (v) *v = 0; \
for (;;) { \
if (p[n] <= 0x7f) { \
if (v) (*v) |= (p[n] << (7 * n)); \
n++; \
break; \
} \
if (v) (*v) |= ((p[n] & 0x7f) << (7 * n)); \
n++; \
} \
return n;
// PUT
static FORCE_INLINE int32_t tPutU8(uint8_t* p, uint8_t v) {
if (p) ((uint8_t*)p)[0] = v;
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tPutI8(uint8_t* p, int8_t v) {
if (p) ((int8_t*)p)[0] = v;
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tPutU16(uint8_t* p, uint16_t v) {
if (p) ((uint16_t*)p)[0] = v;
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tPutI16(uint8_t* p, int16_t v) {
if (p) ((int16_t*)p)[0] = v;
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tPutU32(uint8_t* p, uint32_t v) {
if (p) ((uint32_t*)p)[0] = v;
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tPutI32(uint8_t* p, int32_t v) {
if (p) ((int32_t*)p)[0] = v;
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tPutU64(uint8_t* p, uint64_t v) {
if (p) ((uint64_t*)p)[0] = v;
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
if (p) ((int64_t*)p)[0] = v;
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tPutFloat(uint8_t* p, float f) {
union {
uint32_t ui;
float f;
} v = {.f = f};
return tPutU32(p, v.ui);
}
static FORCE_INLINE int32_t tPutDouble(uint8_t* p, double d) {
union {
uint64_t ui;
double d;
} v = {.d = d};
return tPutU64(p, v.ui);
}
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v) }
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
static FORCE_INLINE int32_t tPutU32v(uint8_t* p, uint32_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutU32v(uint8_t* p, uint32_t v) { tPutV(p, v) }
static FORCE_INLINE int32_t tPutI32v(uint8_t* p, int32_t v) { return tPutU32v(p, ZIGZAGE(int32_t, v)); }
static FORCE_INLINE int32_t tPutU64v(uint8_t* p, uint64_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI64v(uint8_t* p, int64_t v) { return tPutU64v(p, ZIGZAGE(int64_t, v)); }
// GET
static FORCE_INLINE int32_t tGetU8(uint8_t* p, uint8_t* v) {
if (v) *v = ((uint8_t*)p)[0];
return sizeof(uint8_t);
}
static FORCE_INLINE int32_t tGetI8(uint8_t* p, int8_t* v) {
if (v) *v = ((int8_t*)p)[0];
return sizeof(int8_t);
}
static FORCE_INLINE int32_t tGetU16(uint8_t* p, uint16_t* v) {
if (v) *v = ((uint16_t*)p)[0];
return sizeof(uint16_t);
}
static FORCE_INLINE int32_t tGetI16(uint8_t* p, int16_t* v) {
if (v) *v = ((int16_t*)p)[0];
return sizeof(int16_t);
}
static FORCE_INLINE int32_t tGetU32(uint8_t* p, uint32_t* v) {
if (v) *v = ((uint32_t*)p)[0];
return sizeof(uint32_t);
}
static FORCE_INLINE int32_t tGetI32(uint8_t* p, int32_t* v) {
if (v) *v = ((int32_t*)p)[0];
return sizeof(int32_t);
}
static FORCE_INLINE int32_t tGetU64(uint8_t* p, uint64_t* v) {
if (v) *v = ((uint64_t*)p)[0];
return sizeof(uint64_t);
}
static FORCE_INLINE int32_t tGetI64(uint8_t* p, int64_t* v) {
if (v) *v = ((int64_t*)p)[0];
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tGetU16v(uint8_t* p, uint16_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetU16v(uint8_t* p, uint16_t* v) { tGetV(p, v) }
static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) {
int32_t n;
......@@ -613,7 +530,7 @@ static FORCE_INLINE int32_t tGetI16v(uint8_t* p, int16_t* v) {
return n;
}
static FORCE_INLINE int32_t tGetU32v(uint8_t* p, uint32_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetU32v(uint8_t* p, uint32_t* v) { tGetV(p, v) }
static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) {
int32_t n;
......@@ -625,46 +542,6 @@ static FORCE_INLINE int32_t tGetI32v(uint8_t* p, int32_t* v) {
return n;
}
static FORCE_INLINE int32_t tGetU64v(uint8_t* p, uint64_t* v) { tGetV(p, v); }
static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
int32_t n;
uint64_t tv;
n = tGetU64v(p, &tv);
if (v) *v = ZIGZAGD(int64_t, tv);
return n;
}
static FORCE_INLINE int32_t tGetFloat(uint8_t* p, float* f) {
int32_t n = 0;
union {
uint32_t ui;
float f;
} v;
n = tGetU32(p, &v.ui);
*f = v.f;
return n;
}
static FORCE_INLINE int32_t tGetDouble(uint8_t* p, double* d) {
int32_t n = 0;
union {
uint64_t ui;
double d;
} v;
n = tGetU64(p, &v.ui);
*d = v.d;
return n;
}
// =====================
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
int n = 0;
......
......@@ -119,6 +119,8 @@ typedef struct SHeartBeatInfo {
struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
TdThreadMutex qnodeMutex;
SArray* pQnodeList;
SInstanceSummary summary;
SList* pConnList; // STscObj linked list
uint64_t clusterId;
......@@ -290,7 +292,7 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
......@@ -317,6 +319,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList);
#ifdef __cplusplus
}
......
......@@ -116,8 +116,11 @@ int stmtAffectedRowsOnce(TAOS_STMT *stmt);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName);
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields);
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields);
int stmtIsInsert(TAOS_STMT *stmt, int *insert);
int stmtGetParamNum(TAOS_STMT *stmt, int *nums);
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes);
int stmtAddBatch(TAOS_STMT *stmt);
TAOS_RES *stmtUseResult(TAOS_STMT *stmt);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx);
......
......@@ -160,6 +160,10 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
taos_close(pTscObj);
}
if (pRsp->query->pQnodeList) {
updateQnodeList(pTscObj->pAppInfo, pRsp->query->pQnodeList);
}
releaseTscObj(pRsp->connKey.tscRid);
}
}
......
......@@ -117,7 +117,8 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
SAppInstInfo* p = NULL;
if (pInst == NULL) {
p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->mgmtEp = epSet;
taosThreadMutexInit(&p->qnodeMutex, NULL);
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p, key);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
......@@ -228,7 +229,61 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
return TSDB_CODE_SUCCESS;
}
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
SQueryNodeLoad *node1 = (SQueryNodeLoad *)elem1;
SQueryNodeLoad *node2 = (SQueryNodeLoad *)elem2;
if (node1->load < node2->load) {
return -1;
}
return node1->load > node2->load;
}
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) {
taosThreadMutexLock(&pInfo->qnodeMutex);
if (pInfo->pQnodeList) {
taosArrayDestroy(pInfo->pQnodeList);
pInfo->pQnodeList = NULL;
}
if (pNodeList) {
pInfo->pQnodeList = taosArrayDup(pNodeList);
taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
}
taosThreadMutexUnlock(&pInfo->qnodeMutex);
return TSDB_CODE_SUCCESS;
}
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
SAppInstInfo*pInfo = pRequest->pTscObj->pAppInfo;
int32_t code = 0;
taosThreadMutexLock(&pInfo->qnodeMutex);
if (pInfo->pQnodeList) {
*pNodeList = taosArrayDup(pInfo->pQnodeList);
}
taosThreadMutexUnlock(&pInfo->qnodeMutex);
if (NULL == *pNodeList) {
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) {
*pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, *pNodeList);
}
if (TSDB_CODE_SUCCESS == code && *pNodeList) {
code = updateQnodeList(pInfo, *pNodeList);
}
}
return code;
}
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList) {
pRequest->type = pQuery->msgType;
SPlanContext cxt = {.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
......@@ -237,14 +292,10 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.showRewrite = pQuery->showRewrite,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList);
}
int32_t code = getQnodeList(pRequest, pNodeList);
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
code = qCreateQueryPlan(&cxt, pPlan, *pNodeList);
}
return code;
}
......@@ -369,8 +420,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
}
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) {
*pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
return getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
}
int32_t validateSversion(SRequestObj* pRequest, void* res) {
......@@ -456,8 +506,8 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
code = execDdlQuery(pRequest, pQuery);
break;
case QUERY_EXEC_MODE_SCHEDULE: {
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
SArray* pNodeList = NULL;
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, &pNodeList);
if (TSDB_CODE_SUCCESS == code) {
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes);
if (NULL != pRes) {
......@@ -992,30 +1042,11 @@ static char* parseTagDatatoJson(void* p) {
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
// JSON_TAG_REFACTOR
pTagVal->nData;
pTagVal->pData; // for varChar, len not included
// TODO: adapt below code;
char* val = (char*)pTagVal->pData;
// TODO: adapt below code;
if (j == 0) {
if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8);
sprintf(string, "%s", TSDB_DATA_NULL_STR_L);
goto end;
}
continue;
}
// json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
// json value
val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
char type = pTagVal->type;
if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL) {
......@@ -1024,11 +1055,11 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL;
if (varDataLen(realData) > 0) {
char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (pTagVal->nData > 0) {
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, pTagVal->pData);
taosMemoryFree(tagJsonValue);
goto end;
}
......@@ -1037,7 +1068,7 @@ static char* parseTagDatatoJson(void* p) {
if (value == NULL) {
goto end;
}
} else if (varDataLen(realData) == 0) {
} else if (pTagVal->nData == 0) {
value = cJSON_CreateString("");
} else {
ASSERT(0);
......@@ -1045,22 +1076,14 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
double jsonVd = *(double*)(&pTagVal->i64);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
char jsonVd = *(char*)(&pTagVal->i64);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL) {
goto end;
......@@ -1125,7 +1148,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
} else if (jsonInnerType == TD_TAG_JSON) {
char* jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString);
......@@ -1144,10 +1167,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(jsonInnerData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
......
......@@ -666,8 +666,39 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return stmtSetTbName(stmt, name);
}
int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
if (stmt == NULL || tags == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbTags(stmt, tags);
}
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); }
int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) {
if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetTagFields(stmt, fieldNum, fields);
}
int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) {
if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetColFields(stmt, fieldNum, fields);
}
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
......@@ -772,6 +803,16 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
return stmtGetParamNum(stmt, nums);
}
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
if (stmt == NULL || type == NULL || NULL == bytes || idx < 0) {
tscError("invalid parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetParam(stmt, idx, type, bytes);
}
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
......
......@@ -17,7 +17,7 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
}
break;
case STMT_SETTAGS:
if (STMT_STATUS_NE(SETTBNAME)) {
if (STMT_STATUS_NE(SETTBNAME) && STMT_STATUS_NE(FETCH_FIELDS)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
break;
......@@ -540,6 +540,8 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
if (pStmt->bInfo.needParse) {
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
STMT_ERR_RET(stmtParseSql(pStmt));
}
return TSDB_CODE_SUCCESS;
......@@ -550,10 +552,6 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
if (pStmt->bInfo.inExecCache) {
return TSDB_CODE_SUCCESS;
}
......@@ -571,7 +569,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fields) {
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
......@@ -589,7 +587,7 @@ int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
return TSDB_CODE_SUCCESS;
}
int32_t stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fields) {
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
......@@ -852,6 +850,71 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
return TSDB_CODE_SUCCESS;
}
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchTagFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
}
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchColFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
}
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -884,6 +947,50 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
return TSDB_CODE_SUCCESS;
}
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
int32_t nums = 0;
TAOS_FIELD_E *pField = NULL;
STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));
if (idx >= nums) {
tscError("idx %d is too big", idx);
taosMemoryFree(pField);
STMT_ERR_RET(TSDB_CODE_INVALID_PARA);
}
*type = pField[idx].type;
*bytes = pField[idx].bytes;
taosMemoryFree(pField);
return TSDB_CODE_SUCCESS;
}
TAOS_RES* stmtUseResult(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
......
......@@ -1243,7 +1243,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return TMQ_RESP_ERR__FAIL;
}
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset;
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
......@@ -1269,7 +1269,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
pReq->waitTime = waitTime;
pReq->timeout = timeout;
pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch;
pReq->currentOffset = reqOffset;
......@@ -1297,7 +1297,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("call poll\n");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
......@@ -1318,7 +1318,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
......@@ -1388,7 +1388,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return 0;
}
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
while (1) {
SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
......@@ -1428,17 +1428,17 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
taosFreeQitem(rspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer %ld reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, waitTime);
tmqPollImpl(tmq, timeout);
}
}
}
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs();
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
......@@ -1450,16 +1450,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while (1) {
tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
if (tmqPollImpl(tmq, timeout) < 0) return NULL;
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
if (wait_time != 0) {
if (timeout != 0) {
int64_t endTime = taosGetTimestampMs();
int64_t leftTime = endTime - startTime;
if (leftTime > wait_time) {
if (leftTime > timeout) {
tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
return NULL;
}
......@@ -1474,10 +1474,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
}
......@@ -1485,10 +1482,7 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
}
}
......
......@@ -215,7 +215,6 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
......
......@@ -116,22 +116,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData);
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
dataLen = varDataTLen(pData + CHAR_BYTES) + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES;
dataLen = DOUBLE_BYTES + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) {
dataLen = ((STag*)(pData + CHAR_BYTES))->len;
dataLen = CHAR_BYTES + CHAR_BYTES;
} else if (*pData == TD_TAG_JSON) { // json string
dataLen = ((STag*)(pData))->len;
} else {
ASSERT(0);
}
dataLen += CHAR_BYTES;
}else {
dataLen = varDataTLen(pData);
}
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
......
......@@ -808,7 +808,6 @@ static void setBitMap(uint8_t *p, STSchema *pTSchema, uint8_t flags) {
}
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
int32_t nDataTP, nDataKV;
uint32_t flags;
STSKVRow *pTSKVRow = (STSKVRow *)pBuilder->pKVBuf;
int32_t nCols = pBuilder->pTSchema->numOfCols;
......@@ -822,7 +821,7 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
pBuilder->row.flags |= TSROW_HAS_NONE;
}
ASSERT(pBuilder->row.flags & 0xf != 0);
ASSERT((pBuilder->row.flags & 0xf) != 0);
*(ppRow) = &pBuilder->row;
switch (pBuilder->row.flags & 0xf) {
case TSROW_HAS_NONE:
......@@ -852,7 +851,7 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
if (nDataKV < nDataTP) {
// generate KV row
ASSERT(pBuilder->row.flags & 0xf != TSROW_HAS_VAL);
ASSERT((pBuilder->row.flags & 0xf) != TSROW_HAS_VAL);
pBuilder->row.flags |= TSROW_KV_ROW;
pBuilder->row.nData = nDataKV;
......@@ -868,12 +867,12 @@ int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow) {
pBuilder->row.nData = nDataTP;
uint8_t *p;
uint8_t flags = pBuilder->row.flags & 0xf;
uint8_t flags = (pBuilder->row.flags & 0xf);
if (flags == TSROW_HAS_VAL) {
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2;
} else {
if (flags == TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE) {
if (flags == (TSROW_HAS_VAL | TSROW_HAS_NULL | TSROW_HAS_NONE)) {
pBuilder->row.pData = pBuilder->pTPBuf;
} else {
pBuilder->row.pData = pBuilder->pTPBuf + pBuilder->szBitMap2 - pBuilder->szBitMap1;
......@@ -945,6 +944,9 @@ static void debugPrintTagVal(int8_t type, const void *val, int32_t vlen, const c
case TSDB_DATA_TYPE_UBIGINT:
printf("%s:%d type:%d vlen:%d, val:%" PRIu64 "\n", tag, ln, (int32_t)type, vlen, *(uint64_t *)val);
break;
case TSDB_DATA_TYPE_NULL:
printf("%s:%d type:%d vlen:%d, val:%" PRIi8 "\n", tag, ln, (int32_t)type, vlen, *(int8_t *)val);
break;
default:
ASSERT(0);
break;
......@@ -997,7 +999,11 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) {
}
printf("%s:%d loop[%d-%d] offset=%d\n", __func__, __LINE__, (int32_t)pTag->nTag, (int32_t)n, (int32_t)offset);
tGetTagVal(p + offset, &tagVal, isJson);
debugPrintTagVal(tagVal.type, tagVal.pData, tagVal.nData, __func__, __LINE__);
if (IS_VAR_DATA_TYPE(tagVal.type)) {
debugPrintTagVal(tagVal.type, tagVal.pData, tagVal.nData, __func__, __LINE__);
} else {
debugPrintTagVal(tagVal.type, &tagVal.i64, tDataTypes[tagVal.type].bytes, __func__, __LINE__);
}
}
printf("\n");
}
......@@ -1020,44 +1026,8 @@ static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
n += tPutBinary(p ? p + n : p, pTagVal->pData, pTagVal->nData);
} else {
p = p ? p + n : p;
switch (pTagVal->type) {
case TSDB_DATA_TYPE_BOOL:
n += tPutI8(p, pTagVal->i8 ? 1 : 0);
break;
case TSDB_DATA_TYPE_TINYINT:
n += tPutI8(p, pTagVal->i8);
break;
case TSDB_DATA_TYPE_SMALLINT:
n += tPutI16(p, pTagVal->i16);
break;
case TSDB_DATA_TYPE_INT:
n += tPutI32(p, pTagVal->i32);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
n += tPutI64(p, pTagVal->i64);
break;
case TSDB_DATA_TYPE_FLOAT:
n += tPutFloat(p, pTagVal->f);
break;
case TSDB_DATA_TYPE_DOUBLE:
n += tPutDouble(p, pTagVal->d);
break;
case TSDB_DATA_TYPE_UTINYINT:
n += tPutU8(p, pTagVal->u8);
break;
case TSDB_DATA_TYPE_USMALLINT:
n += tPutU16(p, pTagVal->u16);
break;
case TSDB_DATA_TYPE_UINT:
n += tPutU32(p, pTagVal->u32);
break;
case TSDB_DATA_TYPE_UBIGINT:
n += tPutU64(p, pTagVal->u64);
break;
default:
ASSERT(0);
}
n += tDataTypes[pTagVal->type].bytes;
if (p) memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes);
}
return n;
......@@ -1079,42 +1049,8 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData);
} else {
switch (pTagVal->type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
n += tGetI8(p + n, &pTagVal->i8);
break;
case TSDB_DATA_TYPE_SMALLINT:
n += tGetI16(p, &pTagVal->i16);
break;
case TSDB_DATA_TYPE_INT:
n += tGetI32(p, &pTagVal->i32);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
n += tGetI64(p, &pTagVal->i64);
break;
case TSDB_DATA_TYPE_FLOAT:
n += tGetFloat(p, &pTagVal->f);
break;
case TSDB_DATA_TYPE_DOUBLE:
n += tGetDouble(p, &pTagVal->d);
break;
case TSDB_DATA_TYPE_UTINYINT:
n += tGetU8(p, &pTagVal->u8);
break;
case TSDB_DATA_TYPE_USMALLINT:
n += tGetU16(p, &pTagVal->u16);
break;
case TSDB_DATA_TYPE_UINT:
n += tGetU32(p, &pTagVal->u32);
break;
case TSDB_DATA_TYPE_UBIGINT:
n += tGetU64(p, &pTagVal->u64);
break;
default:
ASSERT(0);
}
memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes);
n += tDataTypes[pTagVal->type].bytes;
}
return n;
......@@ -1191,6 +1127,26 @@ void tTagFree(STag *pTag) {
if (pTag) taosMemoryFree(pTag);
}
char *tTagValToData(const STagVal *value, bool isJson) {
if (!value) return NULL;
char *data = NULL;
int8_t typeBytes = 0;
if (isJson) {
typeBytes = CHAR_BYTES;
}
if (IS_VAR_DATA_TYPE(value->type)) {
data = taosMemoryCalloc(1, typeBytes + VARSTR_HEADER_SIZE + value->nData);
if (data == NULL) return NULL;
if (isJson) *data = value->type;
varDataLen(data + typeBytes) = value->nData;
memcpy(varDataVal(data + typeBytes), value->pData, value->nData);
} else {
data = ((char *)&(value->i64)) - typeBytes; // json with type
}
return data;
}
bool tTagGet(const STag *pTag, STagVal *pTagVal) {
int16_t lidx = 0;
int16_t ridx = pTag->nTag - 1;
......
......@@ -147,12 +147,25 @@ int32_t tEncodeSQueryNodeAddr(SEncoder *pEncoder, SQueryNodeAddr *pAddr) {
return 0;
}
int32_t tEncodeSQueryNodeLoad(SEncoder *pEncoder, SQueryNodeLoad *pLoad) {
if (tEncodeSQueryNodeAddr(pEncoder, &pLoad->addr) < 0) return -1;
if (tEncodeU64(pEncoder, pLoad->load) < 0) return -1;
return 0;
}
int32_t tDecodeSQueryNodeAddr(SDecoder *pDecoder, SQueryNodeAddr *pAddr) {
if (tDecodeI32(pDecoder, &pAddr->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pAddr->epSet) < 0) return -1;
return 0;
}
int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) {
if (tDecodeSQueryNodeAddr(pDecoder, &pLoad->addr) < 0) return -1;
if (tDecodeU64(pDecoder, &pLoad->load) < 0) return -1;
return 0;
}
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
......@@ -304,6 +317,12 @@ static int32_t tSerializeSClientHbRsp(SEncoder *pEncoder, const SClientHbRsp *pR
if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->query->pQnodeList);
if (tEncodeI32(pEncoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SQueryNodeLoad *pLoad = taosArrayGet(pRsp->query->pQnodeList, i);
if (tEncodeSQueryNodeLoad(pEncoder, pLoad) < 0) return -1;
}
} else {
if (tEncodeI32(pEncoder, queryNum) < 0) return -1;
}
......@@ -333,6 +352,15 @@ static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp)
if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1;
int32_t pQnodeNum = 0;
if (tDecodeI32(pDecoder, &pQnodeNum) < 0) return -1;
if (pQnodeNum > 0) {
pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad));
if (NULL == pRsp->query->pQnodeList) return -1;
SQueryNodeLoad load = {0};
if (tDecodeSQueryNodeLoad(pDecoder, &load) < 0) return -1;
taosArrayPush(pRsp->query->pQnodeList, &load);
}
}
int32_t kvNum = 0;
......@@ -898,6 +926,18 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
// mnode loads
if (tEncodeI32(&encoder, pReq->mload.syncState) < 0) return -1;
if (tEncodeI32(&encoder, pReq->qload.dnodeId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedQuery) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedCQuery) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedFetch) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedDrop) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfProcessedHb) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.cacheDataSize) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfQueryInQueue) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.numOfFetchInQueue) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.timeInQueryQueue) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -955,6 +995,18 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI32(&decoder, &pReq->mload.syncState) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->qload.dnodeId) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedQuery) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedCQuery) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedFetch) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedDrop) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfProcessedHb) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.cacheDataSize) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfQueryInQueue) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.numOfFetchInQueue) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.timeInQueryQueue) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
......@@ -1921,11 +1973,11 @@ int32_t tSerializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp)
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->addrsList);
int32_t num = taosArrayGetSize(pRsp->qnodeList);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
SQueryNodeAddr *addr = taosArrayGet(pRsp->addrsList, i);
if (tEncodeSQueryNodeAddr(&encoder, addr) < 0) return -1;
SQueryNodeLoad *pLoad = taosArrayGet(pRsp->qnodeList, i);
if (tEncodeSQueryNodeLoad(&encoder, pLoad) < 0) return -1;
}
tEndEncode(&encoder);
......@@ -1941,15 +1993,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (NULL == pRsp->addrsList) {
pRsp->addrsList = taosArrayInit(num, sizeof(SQueryNodeAddr));
if (NULL == pRsp->addrsList) return -1;
if (NULL == pRsp->qnodeList) {
pRsp->qnodeList = taosArrayInit(num, sizeof(SQueryNodeLoad));
if (NULL == pRsp->qnodeList) return -1;
}
for (int32_t i = 0; i < num; ++i) {
SQueryNodeAddr addr = {0};
if (tDecodeSQueryNodeAddr(&decoder, &addr) < 0) return -1;
taosArrayPush(pRsp->addrsList, &addr);
SQueryNodeLoad load = {0};
if (tDecodeSQueryNodeLoad(&decoder, &load) < 0) return -1;
taosArrayPush(pRsp->qnodeList, &load);
}
tEndDecode(&decoder);
......@@ -1957,7 +2009,7 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
return 0;
}
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->addrsList); }
void tFreeSQnodeListRsp(SQnodeListRsp *pRsp) { taosArrayDestroy(pRsp->qnodeList); }
int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq) {
SEncoder encoder = {0};
......@@ -2921,6 +2973,11 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
}
if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1;
if (pReq->isTsma) {
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2984,6 +3041,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
}
if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1;
if (pReq->isTsma) {
if (tDecodeBinaryAlloc(&decoder, &pReq->pTsma, NULL) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -2993,6 +3053,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
taosArrayDestroy(pReq->pRetensions);
pReq->pRetensions = NULL;
if(pReq->isTsma) {
taosMemoryFreeClear(pReq->pTsma);
}
return 0;
}
......@@ -3591,6 +3654,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeI8(pCoder, pSma->intervalUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->slidingUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->timezoneInt) < 0) return -1;
if (tEncodeI32(pCoder, pSma->dstVgId) < 0) return -1;
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
......@@ -3613,6 +3677,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
......
......@@ -35,6 +35,7 @@ typedef struct SDnodeMgmt {
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp;
} SDnodeMgmt;
// dmHandle.c
......@@ -58,4 +59,4 @@ void dmStopWorker(SDnodeMgmt *pMgmt);
}
#endif
#endif /*_TD_DND_QNODE_INT_H_*/
\ No newline at end of file
#endif /*_TD_DND_QNODE_INT_H_*/
......@@ -79,6 +79,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(*pMgmt->getMnodeLoadsFp)(&minfo);
req.mload = minfo.load;
(*pMgmt->getQnodeLoadsFp)(&req.qload);
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req);
......
......@@ -48,6 +48,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
pMgmt->getQnodeLoadsFp = pInput->getQnodeLoadsFp;
if (dmStartWorker(pMgmt) != 0) {
return -1;
......
......@@ -197,6 +197,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -20,6 +20,14 @@ void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {
SQnodeLoad qload = {0};
qndGetLoad(pMgmt->pQnode, &qload);
qload.dnodeId = pMgmt->pData->dnodeId;
}
void qmGetQnodeLoads(SQnodeMgmt *pMgmt, SQnodeLoad *pInfo) {
qndGetLoad(pMgmt->pQnode, pInfo);
pInfo->dnodeId = pMgmt->pData->dnodeId;
}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
......
......@@ -140,6 +140,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->szCache = pCreate->pages;
pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
pCfg->isWeak = true;
pCfg->isTsma = pCreate->isTsma;
pCfg->tsdbCfg.compression = pCreate->compression;
pCfg->tsdbCfg.precision = pCreate->precision;
pCfg->tsdbCfg.days = pCreate->daysPerFile;
......@@ -149,8 +150,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->tsdbCfg.minRows = pCreate->minRows;
pCfg->tsdbCfg.maxRows = pCreate->maxRows;
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
if (i == 0) {
if ((pRetention->freq > 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
}
}
pCfg->walCfg.vgId = pCreate->vgId;
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
......@@ -209,7 +215,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
}
......@@ -217,9 +223,20 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
if (code != 0) {
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
}
if (createReq.isTsma) {
SMsgHead *smaMsg = createReq.pTsma;
uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
if (vnodeProcessCreateTSma(pImpl, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen) < 0) {
dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
};
}
code = vnodeStart(pImpl);
if (code != 0) {
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
......@@ -227,7 +244,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
code = vmWriteVnodeListToFile(pMgmt);
if (code != 0) goto _OVER;
if (code != 0) {
code = terrno;
goto _OVER;
}
_OVER:
if (code != 0) {
......
......@@ -104,7 +104,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
dTrace("msg:%p, get from vnode-write queue", pMsg);
if (taosArrayPush(pArray, &pMsg) == NULL) {
dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
dTrace("msg:%p, failed to push to array since %s", pMsg, terrstr());
vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
}
......
......@@ -168,6 +168,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
void dmSendMonitorReport();
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
void dmGetQnodeLoads(SQnodeLoad *pInfo);
#ifdef __cplusplus
}
......
......@@ -37,6 +37,7 @@ void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo);
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
#ifdef __cplusplus
}
......
......@@ -178,6 +178,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.sendMonitorReportFp = dmSendMonitorReport,
.getVnodeLoadsFp = dmGetVnodeLoads,
.getMnodeLoadsFp = dmGetMnodeLoads,
.getQnodeLoadsFp = dmGetQnodeLoads,
};
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
......
......@@ -170,3 +170,17 @@ void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
dmReleaseWrapper(pWrapper);
}
}
void dmGetQnodeLoads(SQnodeLoad *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE];
if (dmMarkWrapper(pWrapper) == 0) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_QM_LOAD, tDeserializeSQnodeLoad, pInfo);
} else if (pWrapper->pMgmt != NULL) {
qmGetQnodeLoads(pWrapper->pMgmt, pInfo);
}
dmReleaseWrapper(pWrapper);
}
}
......@@ -34,6 +34,7 @@
#include "dnode.h"
#include "mnode.h"
#include "qnode.h"
#include "monitor.h"
#include "sync.h"
#include "wal.h"
......@@ -92,6 +93,7 @@ typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef void (*SendMonitorReportFp)();
typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
typedef struct {
int32_t dnodeId;
......@@ -118,6 +120,7 @@ typedef struct {
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp;
} SMgmtInputOpt;
typedef struct {
......@@ -180,4 +183,4 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
}
#endif
#endif /*_TD_DM_INT_H_*/
\ No newline at end of file
#endif /*_TD_DM_INT_H_*/
......@@ -54,9 +54,11 @@ typedef enum {
} EAuthOp;
typedef enum {
TRN_STEP_LOG = 1,
TRN_STEP_ACTION = 2,
} ETrnStep;
TRN_CONFLICT_NOTHING = 0,
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
} ETrnConflct;
typedef enum {
TRN_STAGE_PREPARE = 0,
......@@ -68,69 +70,15 @@ typedef enum {
TRN_STAGE_FINISHED = 6
} ETrnStage;
typedef enum {
TRN_TYPE_BASIC_SCOPE = 1000,
TRN_TYPE_CREATE_ACCT = 1001,
TRN_TYPE_CREATE_CLUSTER = 1002,
TRN_TYPE_CREATE_USER = 1003,
TRN_TYPE_ALTER_USER = 1004,
TRN_TYPE_DROP_USER = 1005,
TRN_TYPE_CREATE_FUNC = 1006,
TRN_TYPE_DROP_FUNC = 1007,
TRN_TYPE_CREATE_SNODE = 1010,
TRN_TYPE_DROP_SNODE = 1011,
TRN_TYPE_CREATE_QNODE = 1012,
TRN_TYPE_DROP_QNODE = 10013,
TRN_TYPE_CREATE_BNODE = 1014,
TRN_TYPE_DROP_BNODE = 1015,
TRN_TYPE_CREATE_MNODE = 1016,
TRN_TYPE_DROP_MNODE = 1017,
TRN_TYPE_CREATE_TOPIC = 1020,
TRN_TYPE_DROP_TOPIC = 1021,
TRN_TYPE_SUBSCRIBE = 1022,
TRN_TYPE_REBALANCE = 1023,
TRN_TYPE_COMMIT_OFFSET = 1024,
TRN_TYPE_CREATE_STREAM = 1025,
TRN_TYPE_DROP_STREAM = 1026,
TRN_TYPE_ALTER_STREAM = 1027,
TRN_TYPE_CONSUMER_LOST = 1028,
TRN_TYPE_CONSUMER_RECOVER = 1029,
TRN_TYPE_DROP_CGROUP = 1030,
TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001,
TRN_TYPE_DROP_DNODE = 2002,
TRN_TYPE_GLOBAL_SCOPE_END,
TRN_TYPE_DB_SCOPE = 3000,
TRN_TYPE_CREATE_DB = 3001,
TRN_TYPE_ALTER_DB = 3002,
TRN_TYPE_DROP_DB = 3003,
TRN_TYPE_SPLIT_VGROUP = 3004,
TRN_TYPE_MERGE_VGROUP = 3015,
TRN_TYPE_DB_SCOPE_END,
TRN_TYPE_STB_SCOPE = 4000,
TRN_TYPE_CREATE_STB = 4001,
TRN_TYPE_ALTER_STB = 4002,
TRN_TYPE_DROP_STB = 4003,
TRN_TYPE_CREATE_SMA = 4004,
TRN_TYPE_DROP_SMA = 4005,
TRN_TYPE_STB_SCOPE_END,
} ETrnType;
typedef enum {
TRN_POLICY_ROLLBACK = 0,
TRN_POLICY_RETRY = 1,
} ETrnPolicy;
typedef enum {
TRN_EXEC_PARALLEL = 0,
TRN_EXEC_NO_PARALLEL = 1,
} ETrnExecType;
TRN_EXEC_PRARLLEL = 0,
TRN_EXEC_SERIAL = 1,
} ETrnExec;
typedef enum {
DND_REASON_ONLINE = 0,
......@@ -159,8 +107,8 @@ typedef struct {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
ETrnType type;
ETrnExecType parallel;
ETrnConflct conflict;
ETrnExec exec;
int32_t code;
int32_t failedTimes;
SRpcHandleInfo rpcInfo;
......@@ -172,10 +120,11 @@ typedef struct {
SArray* commitActions;
int64_t createdTime;
int64_t lastExecTime;
int64_t dbUid;
int32_t lastErrorAction;
int32_t lastErrorNo;
tmsg_t lastErrorMsgType;
SEpSet lastErrorEpset;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];
char desc[TSDB_TRANS_DESC_LEN];
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;
......@@ -219,6 +168,7 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
SDnodeObj* pDnode;
SQnodeLoad load;
} SQnodeObj;
typedef struct {
......@@ -343,6 +293,7 @@ typedef struct {
int8_t isTsma;
int8_t replica;
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
void* pTsma;
} SVgObj;
typedef struct {
......
......@@ -22,9 +22,15 @@
extern "C" {
#endif
#define QNODE_LOAD_VALUE(pQnode) (pQnode ? (pQnode->load.numOfQueryInQueue + pQnode->load.numOfFetchInQueue) : 0)
int32_t mndInitQnode(SMnode *pMnode);
void mndCleanupQnode(SMnode *pMnode);
SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId);
void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj);
int32_t mndCreateQnodeList(SMnode *pMnode, SArray** pList, int32_t limit);
#ifdef __cplusplus
}
#endif
......
......@@ -34,7 +34,7 @@ typedef struct {
int32_t errCode;
int32_t acceptableCode;
int8_t stage;
int8_t isRaw;
int8_t actionType; // 0-msg, 1-raw
int8_t rawWritten;
int8_t msgSent;
int8_t msgReceived;
......@@ -52,7 +52,7 @@ void mndCleanupTrans(SMnode *pMnode);
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId);
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq);
void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
......@@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetNoParallel(STrans *pTrans);
void mndTransSetSerial(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp);
......
......@@ -80,7 +80,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mDebug("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
if (pTrans == NULL) {
mError("acct:%s, failed to create since %s", acctObj.acct, terrstr());
return -1;
......
......@@ -246,7 +246,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
bnodeObj.createdTime = taosGetTimestampMs();
bnodeObj.updateTime = bnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
......@@ -363,7 +363,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
static int32_t mndDropBnode(SMnode *pMnode, SRpcMsg *pReq, SBnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
......
......@@ -179,10 +179,8 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_CLUSTER, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
if (pTrans == NULL) {
mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
return -1;
......@@ -204,7 +202,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
mndTransDrop(pTrans);
return 0;
#endif
}
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
......
......@@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CONSUMER_LOST, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto FAIL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
......@@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CONSUMER_RECOVER, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto FAIL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
......@@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
for (int32_t i = 0; i < newTopicNum; i++) {
......
......@@ -545,7 +545,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
}
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
......@@ -775,7 +775,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
......@@ -1036,7 +1036,7 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
......
......@@ -17,6 +17,7 @@
#include "mndDnode.h"
#include "mndAuth.h"
#include "mndMnode.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
......@@ -100,10 +101,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1;
......@@ -125,7 +123,6 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mndTransDrop(pTrans);
return 0;
#endif
}
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
......@@ -259,7 +256,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
int64_t interval = TABS(pDnode->lastAccessTime - curMs);
if (interval > 30000 * tsStatusInterval) {
if (interval > 5000 * tsStatusInterval) {
if (pDnode->rebootTime > 0) {
pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
}
......@@ -388,6 +385,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseMnode(pMnode, pObj);
}
SQnodeObj *pQnode = mndAcquireQnode(pMnode, statusReq.qload.dnodeId);
if (pQnode != NULL) {
pQnode->load = statusReq.qload;
mndReleaseQnode(pMnode, pQnode);
}
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
......@@ -481,7 +484,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1;
......@@ -557,7 +560,7 @@ CREATE_DNODE_OVER:
}
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) {
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
return -1;
......@@ -610,7 +613,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pMObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_DEPLOYED;
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
goto DROP_DNODE_OVER;
}
......
......@@ -215,7 +215,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
}
memcpy(func.pCode, pCreate->pCode, func.codeSize);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, pReq);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
......@@ -245,7 +245,7 @@ _OVER:
static int32_t mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
......
......@@ -369,7 +369,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
......@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
} else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg);
} else {
mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
......@@ -686,4 +686,4 @@ void mndReleaseSyncRef(SMnode *pMnode) {
int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1);
mTrace("mnode sync is released, ref:%d", ref);
taosThreadRwlockUnlock(&pMnode->lock);
}
\ No newline at end of file
}
......@@ -18,9 +18,9 @@
#include "mndAuth.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndSync.h"
#define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64
......@@ -92,10 +92,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
if (pTrans == NULL) {
mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
return -1;
......@@ -117,7 +114,6 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mndTransDrop(pTrans);
return 0;
#endif
}
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
......@@ -363,11 +359,11 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mndTransSetNoParallel(pTrans);
mndTransSetSerial(pTrans);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
......@@ -396,6 +392,11 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
mDebug("mnode:%d, start to create", createReq.dnodeId);
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
goto _OVER;
}
pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
if (pObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
......@@ -535,11 +536,11 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mndTransSetNoParallel(pTrans);
mndTransSetSerial(pTrans);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
......@@ -632,6 +633,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t cols = 0;
SMnodeObj *pObj = NULL;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj);
......@@ -647,11 +649,16 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false);
bool online = mndIsDnodeOnline(pMnode, pObj->pDnode, curMs);
const char *roles = NULL;
if (pObj->id == pMnode->selfDnodeId) {
roles = syncStr(TAOS_SYNC_STATE_LEADER);
} else {
roles = syncStr(pObj->state);
if (!online) {
roles = "OFFLINE";
} else {
roles = syncStr(pObj->state);
}
}
char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
......
......@@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_COMMIT_OFFSET, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
......
......@@ -18,6 +18,7 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndUser.h"
......@@ -382,6 +383,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
rspBasic->onlineDnodes = 1; // TODO
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
mndReleaseConn(pMnode, pConn);
hbRsp.query = rspBasic;
......
......@@ -409,7 +409,8 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SSmaObj *pSma) {
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
......@@ -419,9 +420,14 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
mndReleaseDnode(pMnode, pDnode);
// todo add sma info here
int32_t smaContLen = 0;
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
if (pSmaReq == NULL) return -1;
pVgroup->pTsma = pSmaReq;
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
taosMemoryFreeClear(pSmaReq);
if (pReq == NULL) return -1;
action.pCont = pReq;
......@@ -502,19 +508,19 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.fixedSinkVgId = smaObj.dstVgId;
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetNoParallel(pTrans);
mndTransSetSerial(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -747,7 +753,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, pReq);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
......
......@@ -253,7 +253,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
snodeObj.createdTime = taosGetTimestampMs();
snodeObj.updateTime = snodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
......@@ -372,7 +372,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
static int32_t mndDropSnode(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
......
......@@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
......
......@@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.hashMethod = pDb->cfg.hashMethod;
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
createReq.pRetensions = pDb->cfg.pRetensions;
createReq.isTsma = pVgroup->isTsma;
createReq.pTsma = pVgroup->pTsma;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v];
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册