提交 588c4f5a 编写于 作者: wmmhello's avatar wmmhello

Merge branch '3.0' into feat/tag_refact

此差异已折叠。
......@@ -93,10 +93,13 @@ title: TDengine 参数限制与保留关键字
`TBNAME` 可以视为超级表中一个特殊的标签,代表子表的表名。
获取一个超级表所有的子表名及相关的标签信息:
```mysql
SELECT TBNAME, location FROM meters;
```
统计超级表下辖子表数量:
```mysql
SELECT COUNT(TBNAME) FROM meters;
```
......
......@@ -56,6 +56,7 @@ There are about 200 keywords reserved by TDengine, they can't be used as the nam
Get the table name and tag values of all subtables in a STable.
```mysql
SELECT TBNAME, location FROM meters;
```
Count the number of subtables in a STable.
```mysql
......
......@@ -196,8 +196,9 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
if (cnt >= 2) break;
/*printf("get data\n");*/
/*msg_process(tmqmessage);*/
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
......@@ -253,39 +254,6 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf(stderr, "%% Consumer closed\n");
}
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t batchCnt = 0;
int32_t skipLogNum = 0;
clock_t startTime = clock();
while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/
taos_free_result(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
(double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
if (argc > 1) {
printf("env init\n");
......@@ -296,7 +264,6 @@ int main(int argc, char* argv[]) {
}
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop(tmq, topic_list);
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -230,7 +230,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
......
......@@ -1033,6 +1033,7 @@ typedef struct {
// for tsma
int8_t isTsma;
void* pTsma;
} SCreateVnodeReq;
......@@ -2446,7 +2447,7 @@ typedef struct {
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t waitTime;
int64_t timeout;
int64_t currentOffset;
} SMqPollReq;
......
......@@ -182,7 +182,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_BNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0356)
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_MNODE_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_TOO_MANY_MNODES TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035A)
// mnode-acct
......
......@@ -253,8 +253,7 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_TRANS_DESC_LEN 128
#define TSDB_TRANS_ERROR_LEN 512
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
......@@ -343,7 +342,7 @@ typedef enum ELogicConditionType {
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
#define TSDB_MIN_ROLLUP_FILE_FACTOR 0
#define TSDB_MAX_ROLLUP_FILE_FACTOR 1
#define TSDB_MAX_ROLLUP_FILE_FACTOR 10
#define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1
#define TSDB_MIN_TABLE_TTL 0
#define TSDB_DEFAULT_TABLE_TTL 0
......
......@@ -378,14 +378,16 @@ static FORCE_INLINE int32_t tDecodeDouble(SDecoder* pCoder, double* val) {
}
static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) {
if (tDecodeU32v(pCoder, len) < 0) return -1;
uint32_t length = 0;
if (tDecodeU32v(pCoder, &length) < 0) return -1;
if (len) *len = length;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
if (val) {
*val = (uint8_t*)TD_CODER_CURRENT(pCoder);
}
TD_CODER_MOVE_POS(pCoder, *len);
TD_CODER_MOVE_POS(pCoder, length);
return 0;
}
......@@ -410,14 +412,16 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) {
}
static FORCE_INLINE int32_t tDecodeBinaryAlloc(SDecoder* pCoder, void** val, uint64_t* len) {
if (tDecodeU64v(pCoder, len) < 0) return -1;
uint64_t length = 0;
if (tDecodeU64v(pCoder, &length) < 0) return -1;
if (len) *len = length;
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, *len)) return -1;
*val = taosMemoryMalloc(*len);
if (TD_CODER_CHECK_CAPACITY_FAILED(pCoder, length)) return -1;
*val = taosMemoryMalloc(length);
if (*val == NULL) return -1;
memcpy(*val, TD_CODER_CURRENT(pCoder), *len);
memcpy(*val, TD_CODER_CURRENT(pCoder), length);
TD_CODER_MOVE_POS(pCoder, *len);
TD_CODER_MOVE_POS(pCoder, length);
return 0;
}
......
......@@ -1243,7 +1243,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return TMQ_RESP_ERR__FAIL;
}
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
int64_t reqOffset;
if (pVg->currentOffset >= 0) {
reqOffset = pVg->currentOffset;
......@@ -1269,7 +1269,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
pReq->waitTime = waitTime;
pReq->timeout = timeout;
pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch;
pReq->currentOffset = reqOffset;
......@@ -1297,7 +1297,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("call poll\n");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
......@@ -1318,7 +1318,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
......@@ -1388,7 +1388,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return 0;
}
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
while (1) {
SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
......@@ -1428,17 +1428,17 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
taosFreeQitem(rspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer %ld reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, waitTime);
tmqPollImpl(tmq, timeout);
}
}
}
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs();
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
......@@ -1450,16 +1450,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while (1) {
tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
if (tmqPollImpl(tmq, timeout) < 0) return NULL;
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
if (wait_time != 0) {
if (timeout != 0) {
int64_t endTime = taosGetTimestampMs();
int64_t leftTime = endTime - startTime;
if (leftTime > wait_time) {
if (leftTime > timeout) {
tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
return NULL;
}
......@@ -1474,10 +1474,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
}
......@@ -1485,10 +1482,7 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
if (rsp == TMQ_RESP_ERR__FAIL) {
return TMQ_RESP_ERR__FAIL;
}
}
......
......@@ -215,7 +215,6 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
......
......@@ -2973,6 +2973,11 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
}
if (tEncodeI8(&encoder, pReq->isTsma) < 0) return -1;
if (pReq->isTsma) {
uint32_t tsmaLen = (uint32_t)(htonl(((SMsgHead *)pReq->pTsma)->contLen));
if (tEncodeBinary(&encoder, (const uint8_t *)pReq->pTsma, tsmaLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -3036,6 +3041,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
}
if (tDecodeI8(&decoder, &pReq->isTsma) < 0) return -1;
if (pReq->isTsma) {
if (tDecodeBinaryAlloc(&decoder, &pReq->pTsma, NULL) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -3045,6 +3053,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
taosArrayDestroy(pReq->pRetensions);
pReq->pRetensions = NULL;
if(pReq->isTsma) {
taosMemoryFreeClear(pReq->pTsma);
}
return 0;
}
......
......@@ -140,6 +140,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->szCache = pCreate->pages;
pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
pCfg->isWeak = true;
pCfg->isTsma = pCreate->isTsma;
pCfg->tsdbCfg.compression = pCreate->compression;
pCfg->tsdbCfg.precision = pCreate->precision;
pCfg->tsdbCfg.days = pCreate->daysPerFile;
......@@ -209,7 +210,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
}
......
......@@ -54,9 +54,11 @@ typedef enum {
} EAuthOp;
typedef enum {
TRN_STEP_LOG = 1,
TRN_STEP_ACTION = 2,
} ETrnStep;
TRN_CONFLICT_NOTHING = 0,
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
} ETrnConflct;
typedef enum {
TRN_STAGE_PREPARE = 0,
......@@ -68,69 +70,15 @@ typedef enum {
TRN_STAGE_FINISHED = 6
} ETrnStage;
typedef enum {
TRN_TYPE_BASIC_SCOPE = 1000,
TRN_TYPE_CREATE_ACCT = 1001,
TRN_TYPE_CREATE_CLUSTER = 1002,
TRN_TYPE_CREATE_USER = 1003,
TRN_TYPE_ALTER_USER = 1004,
TRN_TYPE_DROP_USER = 1005,
TRN_TYPE_CREATE_FUNC = 1006,
TRN_TYPE_DROP_FUNC = 1007,
TRN_TYPE_CREATE_SNODE = 1010,
TRN_TYPE_DROP_SNODE = 1011,
TRN_TYPE_CREATE_QNODE = 1012,
TRN_TYPE_DROP_QNODE = 10013,
TRN_TYPE_CREATE_BNODE = 1014,
TRN_TYPE_DROP_BNODE = 1015,
TRN_TYPE_CREATE_MNODE = 1016,
TRN_TYPE_DROP_MNODE = 1017,
TRN_TYPE_CREATE_TOPIC = 1020,
TRN_TYPE_DROP_TOPIC = 1021,
TRN_TYPE_SUBSCRIBE = 1022,
TRN_TYPE_REBALANCE = 1023,
TRN_TYPE_COMMIT_OFFSET = 1024,
TRN_TYPE_CREATE_STREAM = 1025,
TRN_TYPE_DROP_STREAM = 1026,
TRN_TYPE_ALTER_STREAM = 1027,
TRN_TYPE_CONSUMER_LOST = 1028,
TRN_TYPE_CONSUMER_RECOVER = 1029,
TRN_TYPE_DROP_CGROUP = 1030,
TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001,
TRN_TYPE_DROP_DNODE = 2002,
TRN_TYPE_GLOBAL_SCOPE_END,
TRN_TYPE_DB_SCOPE = 3000,
TRN_TYPE_CREATE_DB = 3001,
TRN_TYPE_ALTER_DB = 3002,
TRN_TYPE_DROP_DB = 3003,
TRN_TYPE_SPLIT_VGROUP = 3004,
TRN_TYPE_MERGE_VGROUP = 3015,
TRN_TYPE_DB_SCOPE_END,
TRN_TYPE_STB_SCOPE = 4000,
TRN_TYPE_CREATE_STB = 4001,
TRN_TYPE_ALTER_STB = 4002,
TRN_TYPE_DROP_STB = 4003,
TRN_TYPE_CREATE_SMA = 4004,
TRN_TYPE_DROP_SMA = 4005,
TRN_TYPE_STB_SCOPE_END,
} ETrnType;
typedef enum {
TRN_POLICY_ROLLBACK = 0,
TRN_POLICY_RETRY = 1,
} ETrnPolicy;
typedef enum {
TRN_EXEC_PARALLEL = 0,
TRN_EXEC_NO_PARALLEL = 1,
} ETrnExecType;
TRN_EXEC_PRARLLEL = 0,
TRN_EXEC_SERIAL = 1,
} ETrnExec;
typedef enum {
DND_REASON_ONLINE = 0,
......@@ -159,8 +107,8 @@ typedef struct {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
ETrnType type;
ETrnExecType parallel;
ETrnConflct conflict;
ETrnExec exec;
int32_t code;
int32_t failedTimes;
SRpcHandleInfo rpcInfo;
......@@ -172,10 +120,11 @@ typedef struct {
SArray* commitActions;
int64_t createdTime;
int64_t lastExecTime;
int64_t dbUid;
int32_t lastErrorAction;
int32_t lastErrorNo;
tmsg_t lastErrorMsgType;
SEpSet lastErrorEpset;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];
char desc[TSDB_TRANS_DESC_LEN];
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;
......@@ -344,6 +293,7 @@ typedef struct {
int8_t isTsma;
int8_t replica;
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
void* pTsma;
} SVgObj;
typedef struct {
......
......@@ -34,7 +34,7 @@ typedef struct {
int32_t errCode;
int32_t acceptableCode;
int8_t stage;
int8_t isRaw;
int8_t actionType; // 0-msg, 1-raw
int8_t rawWritten;
int8_t msgSent;
int8_t msgReceived;
......@@ -52,7 +52,7 @@ void mndCleanupTrans(SMnode *pMnode);
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId);
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq);
void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
......@@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetNoParallel(STrans *pTrans);
void mndTransSetSerial(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp);
......
......@@ -80,7 +80,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mDebug("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
if (pTrans == NULL) {
mError("acct:%s, failed to create since %s", acctObj.acct, terrstr());
return -1;
......
......@@ -246,7 +246,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
bnodeObj.createdTime = taosGetTimestampMs();
bnodeObj.updateTime = bnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
......@@ -363,7 +363,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
static int32_t mndDropBnode(SMnode *pMnode, SRpcMsg *pReq, SBnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
......
......@@ -179,10 +179,8 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_CLUSTER, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
if (pTrans == NULL) {
mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
return -1;
......@@ -204,7 +202,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
mndTransDrop(pTrans);
return 0;
#endif
}
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
......
......@@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CONSUMER_LOST, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto FAIL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
......@@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mndReleaseConsumer(pMnode, pConsumer);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CONSUMER_RECOVER, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto FAIL;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
......@@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
for (int32_t i = 0; i < newTopicNum; i++) {
......
......@@ -545,7 +545,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
}
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
......@@ -775,7 +775,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
......@@ -1036,7 +1036,7 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
......
......@@ -101,10 +101,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1;
......@@ -126,7 +123,6 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mndTransDrop(pTrans);
return 0;
#endif
}
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
......@@ -260,7 +256,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
int64_t interval = TABS(pDnode->lastAccessTime - curMs);
if (interval > 30000 * tsStatusInterval) {
if (interval > 5000 * tsStatusInterval) {
if (pDnode->rebootTime > 0) {
pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
}
......@@ -488,7 +484,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) {
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
return -1;
......@@ -564,7 +560,7 @@ CREATE_DNODE_OVER:
}
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) {
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
return -1;
......@@ -617,7 +613,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pMObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_DEPLOYED;
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
goto DROP_DNODE_OVER;
}
......
......@@ -215,7 +215,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
}
memcpy(func.pCode, pCreate->pCode, func.codeSize);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, pReq);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
......@@ -245,7 +245,7 @@ _OVER:
static int32_t mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
......
......@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
} else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg);
} else {
mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
......
......@@ -18,9 +18,9 @@
#include "mndAuth.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndSync.h"
#define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64
......@@ -92,10 +92,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
if (pTrans == NULL) {
mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
return -1;
......@@ -117,7 +114,6 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mndTransDrop(pTrans);
return 0;
#endif
}
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
......@@ -363,11 +359,11 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mndTransSetNoParallel(pTrans);
mndTransSetSerial(pTrans);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
......@@ -396,6 +392,11 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
mDebug("mnode:%d, start to create", createReq.dnodeId);
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
goto _OVER;
}
pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
if (pObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
......@@ -535,11 +536,11 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mndTransSetNoParallel(pTrans);
mndTransSetSerial(pTrans);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
......@@ -632,6 +633,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t cols = 0;
SMnodeObj *pObj = NULL;
char *pWrite;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj);
......@@ -647,12 +649,17 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b1, false);
bool online = mndIsDnodeOnline(pMnode, pObj->pDnode, curMs);
const char *roles = NULL;
if (pObj->id == pMnode->selfDnodeId) {
roles = syncStr(TAOS_SYNC_STATE_LEADER);
} else {
if (!online) {
roles = "OFFLINE";
} else {
roles = syncStr(pObj->state);
}
}
char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
......
......@@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_COMMIT_OFFSET, pMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
......
......@@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
qnodeObj.createdTime = taosGetTimestampMs();
qnodeObj.updateTime = qnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_QNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
......@@ -365,7 +365,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
......
......@@ -409,7 +409,8 @@ static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SSmaObj *pSma) {
SVnodeGid *pVgid = pVgroup->vnodeGid + 0;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) return -1;
......@@ -419,9 +420,14 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
mndReleaseDnode(pMnode, pDnode);
// todo add sma info here
int32_t smaContLen = 0;
void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen);
if (pSmaReq == NULL) return -1;
pVgroup->pTsma = pSmaReq;
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
taosMemoryFreeClear(pSmaReq);
if (pReq == NULL) return -1;
action.pCont = pReq;
......@@ -502,19 +508,19 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.fixedSinkVgId = smaObj.dstVgId;
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetNoParallel(pTrans);
mndTransSetSerial(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -747,7 +753,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
if (pVgroup == NULL) goto _OVER;
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, pReq);
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
......
......@@ -253,7 +253,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
snodeObj.createdTime = taosGetTimestampMs();
snodeObj.updateTime = snodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
......@@ -372,7 +372,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
static int32_t mndDropSnode(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
......
......@@ -735,7 +735,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
......@@ -1257,7 +1257,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
if (code != 0) goto _OVER;
code = -1;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_STB, pReq);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
......@@ -1403,7 +1403,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_STB, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
......
......@@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
......
......@@ -394,8 +394,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo("rebalance calculation completed, rebalanced vg:");
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
mInfo("vg: %d moved from consumer %ld to consumer %ld", pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId,
pOutputRebVg->newConsumerId);
mInfo("vgId:%d moved from consumer %" PRId64 " to consumer %" PRId64, pOutputRebVg->pVgEp->vgId,
pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
}
// 9. clear
......@@ -405,10 +405,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, pMsg);
if (pTrans == NULL) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
if (pTrans == NULL) return -1;
// make txn:
// 1. redo action: action to all vg
const SArray *rebVgs = pOutput->rebVgs;
......@@ -625,7 +624,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_CGROUP, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
......
......@@ -383,7 +383,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
/*topicObj.withSchema = 1;*/
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFreeClear(topicObj.ast);
......@@ -551,7 +551,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
#endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
......
......@@ -79,10 +79,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
mDebug("user:%s, will be created when deploying, raw:%p", userObj.user, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_USER, NULL);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
if (pTrans == NULL) {
mError("user:%s, failed to create since %s", userObj.user, terrstr());
return -1;
......@@ -104,7 +101,6 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
mndTransDrop(pTrans);
return 0;
#endif
}
static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
......@@ -291,7 +287,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
userObj.updateTime = userObj.createdTime;
userObj.superUser = pCreate->superUser;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
return -1;
......@@ -371,7 +367,7 @@ _OVER:
}
static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("user:%s, failed to alter since %s", pOld->user, terrstr());
return -1;
......@@ -578,7 +574,7 @@ _OVER:
}
static int32_t mndDropUser(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_USER, pReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) {
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
return -1;
......
......@@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.hashMethod = pDb->cfg.hashMethod;
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
createReq.pRetensions = pDb->cfg.pRetensions;
createReq.isTsma = pVgroup->isTsma;
createReq.pTsma = pVgroup->pTsma;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &createReq.replicas[v];
......
......@@ -11,6 +11,8 @@
#include <gtest/gtest.h>
#if 0
#include "mndTrans.h"
#include "mndUser.h"
#include "tcache.h"
......@@ -103,7 +105,7 @@ class MndTestTrans2 : public ::testing::Test {
void SetUp() override {}
void TearDown() override {}
int32_t CreateUserLog(const char *acct, const char *user, ETrnType type, SDbObj *pDb) {
int32_t CreateUserLog(const char *acct, const char *user, ETrnConflct conflict, SDbObj *pDb) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
......@@ -113,7 +115,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, type, &rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, conflict, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
......@@ -135,7 +137,7 @@ class MndTestTrans2 : public ::testing::Test {
return code;
}
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrnType type,
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrnConflct conflict,
SDbObj *pDb) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
......@@ -146,7 +148,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, policy, type, &rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, policy, conflict, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
......@@ -218,7 +220,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
......@@ -528,3 +530,5 @@ TEST_F(MndTestTrans2, 04_Conflict) {
mndReleaseUser(pMnode, pUser);
}
}
#endif
\ No newline at end of file
......@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
add_test(
if(NOT TD_WINDOWS)
add_test(
NAME userTest
COMMAND userTest
)
)
endif(NOT TD_WINDOWS)
......@@ -171,12 +171,13 @@ struct SVnodeCfg {
uint64_t szBuf;
bool isHeap;
bool isWeak;
int8_t isTsma;
int8_t hashMethod;
STsdbCfg tsdbCfg;
SWalCfg walCfg;
SSyncCfg syncCfg;
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
};
typedef struct {
......
......@@ -70,8 +70,8 @@ typedef struct {
int32_t epoch;
int32_t skipLogNum;
int64_t reqOffset;
SRpcHandleInfo info;
SRWLatch lock;
SRpcMsg* handle;
} STqPushHandle;
#if 0
......
......@@ -414,7 +414,7 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
}
taosMemoryFreeClear(pReq);
} else {
smaWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
smaDebug("vgId:%d no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
}
taosArrayDestroy(pResult);
......
......@@ -180,40 +180,6 @@ void tqClose(STQ* pTq) {
// TODO
}
#if 0
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
/*if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;*/
/*if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;*/
/*if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1;
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
/*if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;*/
/*if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;*/
/*if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
#endif
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
......@@ -290,8 +256,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
taosWLockLatch(&pHandle->pushHandle.lock);
SRpcMsg* pMsg = atomic_load_ptr(&pHandle->pushHandle.handle);
ASSERT(pMsg);
/*SRpcHandleInfo* pInfo = atomic_load_ptr(&pHandle->pushHandle.pInfo);*/
/*ASSERT(pInfo);*/
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pHandle->pushHandle.reqOffset;
......@@ -318,7 +284,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
// todo free
return -1;
}
......@@ -329,10 +295,16 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0};
SRpcMsg resp = {
.info = pHandle->pushHandle.info,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
atomic_store_ptr(&pHandle->pushHandle.handle, NULL);
/*atomic_store_ptr(&pHandle->pushHandle.pInfo, NULL);*/
memset(&pHandle->pushHandle.info, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
......@@ -374,7 +346,7 @@ int tqCommit(STQ* pTq) {
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t waitTime = pReq->waitTime;
int64_t waitTime = pReq->timeout;
int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset;
......@@ -410,24 +382,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
rsp.withTbName = pReq->withTbName;
if (rsp.withTbName) {
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
}
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
rsp.withSchema = false;
rsp.withTag = false;
} else {
rsp.withSchema = true;
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.withTag = false;
}
/*int8_t withTbName = pExec->withTbName;*/
/*if (pReq->withTbName != -1) {*/
/*withTbName = pReq->withTbName;*/
/*}*/
/*rsp.withTbName = withTbName;*/
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
......@@ -443,15 +413,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SWalReadHead* pHead = &pHeadWithCkSum->head;
#if 0
SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
#if 0
// add to pushMgr
taosWLockLatch(&pExec->pushHandle.lock);
......@@ -473,10 +434,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
#endif
break;
}
#endif
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
......@@ -533,8 +490,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO wrap in destroy func
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
if (rsp.withSchema) {
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (rsp.withTbName) {
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
}
return 0;
}
......
......@@ -56,6 +56,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
......@@ -130,6 +131,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak, code);
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code);
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code);
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code);
......
......@@ -97,7 +97,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open tsdb
if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_TYPE_TSDB) < 0) {
if (!vnodeIsRollup(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) {
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
......
......@@ -1646,8 +1646,8 @@ bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInf
pInfo->startVal = IS_FLOAT_TYPE(pCtx->param[1].param.nType) ? pCtx->param[1].param.d :
(double)pCtx->param[1].param.i;
pInfo->stepVal = IS_FLOAT_TYPE(pCtx->param[1].param.nType) ? pCtx->param[2].param.d :
(double)pCtx->param[1].param.i;
pInfo->stepVal = IS_FLOAT_TYPE(pCtx->param[2].param.nType) ? pCtx->param[2].param.d :
(double)pCtx->param[2].param.i;
return true;
}
......
......@@ -17,7 +17,9 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC "${TD_SOURCE_DIR}/source/libs/parser/inc"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc"
)
add_test(
if(NOT TD_WINDOWS)
add_test(
NAME scalarTest
COMMAND scalarTest
)
)
endif(NOT TD_WINDOWS)
......@@ -10,7 +10,11 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/contrib/msvcregex"
)
# iconv
find_path(IconvApiIncludes iconv.h PATHS)
if(TD_WINDOWS)
find_path(IconvApiIncludes iconv.h "${TD_SOURCE_DIR}/contrib/iconv")
else()
find_path(IconvApiIncludes iconv.h PATHS)
endif(TD_WINDOWS)
if(NOT IconvApiIncludes)
add_definitions(-DDISALLOW_NCHAR_WITHOUT_ICONV)
endif ()
......
......@@ -186,9 +186,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_ALREADY_EXIST, "Bnode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_NOT_EXIST, "Bnode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "Too few mnodes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_DEPLOYED, "Mnode deployed in this dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CANT_DROP_MASTER, "Can't drop mnode which is master")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replicas of mnode cannot less than 1")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replicas of mnode cannot exceed 3")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CANT_DROP_MASTER, "Can't drop mnode which is LEADER")
// mnode-acct
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, "Account already exists")
......
......@@ -57,7 +57,6 @@
# ---- mnode
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
# ---- show
./test.sh -f tsim/show/basic.sim
......
......@@ -2,14 +2,17 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
print =============== step1: create dnodes
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$x = 0
step1:
......@@ -32,6 +35,7 @@ endi
print =============== step2: create mnode 2
sql create mnode on dnode 2
sql create mnode on dnode 3
sql_error create mnode on dnode 4
$x = 0
step2:
......@@ -106,6 +110,10 @@ print $data(1)[0] $data(1)[1] $data(1)[2]
print $data(2)[0] $data(2)[1] $data(2)[2]
print $data(3)[0] $data(3)[1] $data(3)[2]
if $data(2)[2] != OFFLINE then
goto step5
endi
sql show users
if $rows != 2 then
return -1
......@@ -135,3 +143,4 @@ endi
system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode2 -s stop
system sh/exec.sh -n dnode3 -s stop
system sh/exec.sh -n dnode4 -s stop
\ No newline at end of file
......@@ -76,14 +76,6 @@ if $data[0][3] != d1 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
sql_error create database d1 vgroups 2;
print =============== start dnode2
......@@ -125,15 +117,7 @@ endi
if $data[0][3] != d2 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
return
sql_error create database d2 vgroups 2;
print =============== kill transaction
......
......@@ -279,9 +279,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "id", pObj->id);
tjsonAddIntegerToObject(item, "stage", pObj->stage);
tjsonAddIntegerToObject(item, "policy", pObj->policy);
tjsonAddIntegerToObject(item, "type", pObj->type);
tjsonAddIntegerToObject(item, "conflict", pObj->conflict);
tjsonAddIntegerToObject(item, "exec", pObj->exec);
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
tjsonAddStringToObject(item, "dbname", pObj->dbname);
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
......
Subproject commit 4d83d8c62973506f760bcaa3a33f4665ed9046d0
Subproject commit 717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册