提交 9bc64e80 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

......@@ -47,7 +47,7 @@ int32_t init_env() {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -146,8 +146,8 @@ int32_t create_topic() {
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
printf("commit %d tmq %p offsets %p param %p\n", resp, tmq, offsets, param);
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
......@@ -167,7 +167,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
......@@ -183,10 +183,10 @@ tmq_list_t* build_topic_list() {
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
int32_t code;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
......@@ -201,12 +201,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
taos_free_result(tmqmessage);
/*} else {*/
/*break;*/
/*tmq_commit_sync(tmq, NULL);*/
}
}
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
......@@ -215,10 +216,10 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1;
int msg_count = 0;
tmq_resp_err_t err;
int32_t code;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
return;
}
......@@ -239,14 +240,14 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process(tmqmessage);
taos_free_result(tmqmessage);
/*tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);*/
/*tmq_commit_sync(tmq, NULL);*/
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
......
......@@ -209,21 +209,20 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi
/* --------------------------TMQ INTERFACE------------------------------- */
#if 0
enum {
TMQ_RESP_ERR__FAIL = -1,
TMQ_RESP_ERR__SUCCESS = 0,
};
typedef int32_t tmq_resp_err_t;
#endif
typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
......@@ -233,25 +232,19 @@ DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT const char *tmq_err2str(int32_t code);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
// timeout: -1 means infinitely waiting
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
#endif
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
......@@ -275,11 +268,6 @@ DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
#if 0
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
#endif
/* ------------------------------ TMQ END -------------------------------- */
#if 1 // Shuduo: temporary enable for app build
......
......@@ -35,6 +35,7 @@ enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__END_RSP,
};
typedef enum EStreamType {
......
......@@ -228,6 +228,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_FILL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION,
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE,
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE,
......
......@@ -130,13 +130,17 @@ typedef struct SMergeLogicNode {
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
typedef enum EIntervalAlgorithm {
typedef enum EWindowAlgorithm {
INTERVAL_ALGO_HASH = 1,
INTERVAL_ALGO_MERGE,
INTERVAL_ALGO_STREAM_FINAL,
INTERVAL_ALGO_STREAM_SEMI,
INTERVAL_ALGO_STREAM_SINGLE,
} EIntervalAlgorithm;
SESSION_ALGO_STREAM_SEMI,
SESSION_ALGO_STREAM_FINAL,
SESSION_ALGO_STREAM_SINGLE,
SESSION_ALGO_MERGE,
} EWindowAlgorithm;
typedef struct SWindowLogicNode {
SLogicNode node;
......@@ -153,7 +157,7 @@ typedef struct SWindowLogicNode {
int8_t triggerType;
int64_t watermark;
double filesFactor;
EIntervalAlgorithm intervalAlgo;
EWindowAlgorithm windowAlgo;
} SWindowLogicNode;
typedef struct SFillLogicNode {
......@@ -371,6 +375,8 @@ typedef struct SSessionWinodwPhysiNode {
} SSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamSemiSessionWinodwPhysiNode;
typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode;
typedef struct SStateWinodwPhysiNode {
SWinodwPhysiNode window;
......
......@@ -430,7 +430,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09)
#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A)
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B)
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0B)
#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0C)
#define TSDB_CODE_TQ_NO_COMMITTED_OFFSET TAOS_DEF_ERROR_CODE(0, 0x0A0D)
// wal
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000)
......@@ -710,6 +711,8 @@ int32_t* taosGetErrno();
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -761,11 +761,12 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false);
char b[tListLen(offlineReason) + VARSTR_HEADER_SIZE] = {0};
char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1);
STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, b, false);
taosMemoryFreeClear(b);
numOfRows++;
sdbRelease(pSdb, pDnode);
......
......@@ -510,8 +510,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
pMsg->msgType != TDMT_MND_TRANS_TIMER) {
mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType));
SEpSet epSet = {0};
mndGetMnodeEpSet(pMsg->info.node, &epSet);
......@@ -534,7 +533,8 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_INVALID_MSG_LEN;
return -1;
}
......@@ -558,7 +558,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mTrace("msg:%p, won't response immediately since in progress", pMsg);
} else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg);
mTrace("msg:%p, successfully processed", pMsg);
} else {
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
......
......@@ -661,7 +661,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("trans:%d, sync to other mnodes", pTrans->id);
mDebug("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......
......@@ -1058,7 +1058,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t newDnodeId) {
mDebug("vgId:%d, will add 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);
mDebug("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);
SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
pVgroup->replica++;
......@@ -1074,7 +1074,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t delDnodeId) {
mDebug("vgId:%d, will remove 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);
mDebug("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);
SVnodeGid *pGid = NULL;
SVnodeGid delGid = {0};
......@@ -1116,7 +1116,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
memcpy(&newVg, pVgroup, sizeof(SVgObj));
mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
for (int32_t i = 0; i < newVg.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId,
syncStr(newVg.vnodeGid[i].role));
}
if (pNew1 != NULL && pOld1 != NULL) {
......@@ -1198,7 +1199,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
goto _OVER;
}
mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
......
......@@ -628,7 +628,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
void * tagData = NULL;
if (param->val == NULL) {
metaError("vgId:%d failed to filter NULL data", TD_VID(pMeta->pVnode));
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
return -1;
} else {
if (IS_VAR_DATA_TYPE(param->type)) {
......
......@@ -124,18 +124,21 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
tDecoderClear(&decoder);
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
tqDebug("receive offset commit msg to %s, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, offset.uid,
offset.ts);
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
pTq->pVnode->config.vgId, offset.uid, offset.ts);
} else if (offset.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s, offset(type:log) version: %ld", offset.subKey, offset.version);
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
pTq->pVnode->config.vgId, offset.version);
} else {
ASSERT(0);
}
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
if (pOffset == NULL || pOffset->version < offset.version) {
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
ASSERT(0);
return -1;
}
}
return 0;
}
......@@ -149,16 +152,33 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t code = 0;
// get offset to fetch message
if (pReq->currentOffset >= 0) {
fetchOffset = pReq->currentOffset + 1;
} else {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey);
if (pOffset != NULL) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
pTq->pVnode->config.vgId, pOffset->version);
fetchOffset = pOffset->version + 1;
} else {
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetCommittedVer(pTq->pWal);
} else {
fetchOffset = pReq->currentOffset + 1;
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId,
pTq->pVnode->config.vgId, pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset);
}
}
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/
......
......@@ -15,7 +15,4 @@
#include "tq.h"
int tqCommit(STQ* pTq) {
// do nothing
return 0;
}
int tqCommit(STQ* pTq) { return tqOffsetSnapshot(pTq->pOffsetStore); }
......@@ -92,6 +92,8 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
}
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
ASSERT(pOffset->version >= 0);
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}
......@@ -129,7 +131,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
tEncodeSTqOffset(&encoder, pOffset);
// write file
int64_t writeLen;
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != bodyLen) {
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
ASSERT(0);
tqError("write offset incomplete, len %d, write len %ld", bodyLen, writeLen);
taosHashCancelIterate(pStore->pHash, pIter);
......
......@@ -61,6 +61,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) {
return -1;
}
vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
if (syncLeaderTransfer(pVnode->sync) != 0) {
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
return -1;
......@@ -297,7 +298,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
ret = -1;
}
if (ret != 0) {
if (ret != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return ret;
......
......@@ -845,8 +845,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
......
......@@ -4728,18 +4728,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType};
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
pTaskInfo);
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
int32_t children = 0;
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
int32_t children = 1;
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
......
......@@ -392,10 +392,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
pInfo->bufPageSize = getProperSortPageSize(rowSize);
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
numOfSources = TMAX(4, numOfSources);
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
// one additional is reserved for merged result.
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
......
......@@ -2425,9 +2425,12 @@ int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, Sql
return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo));
}
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -2453,12 +2456,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
}
initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols);
pInfo->twAggSup = *pTwAggSupp;
pInfo->twAggSup = (STimeWindowAggSupp) {.waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType,
.maxTs = INT64_MIN};
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = tsSlotId;
pInfo->gap = gap;
pInfo->gap = pSessionNode->gap;
pInfo->binfo.pRes = pResBlock;
pInfo->order = TSDB_ORDER_ASC;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......@@ -2960,25 +2965,20 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream,
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = NULL;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pExprInfo, numOfCols, pResBlock, gap,
tsSlotId, pTwAggSupp, pTaskInfo);
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo);
if (pOperator == NULL) {
goto _error;
}
pOperator->name = "StreamFinalSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
int32_t numOfChild = 1; // Todo(liuyao) get it from phy plan
pInfo = pOperator->info;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->pChildren = taosArrayInit(8, sizeof(void*));
for (int32_t i = 0; i < numOfChild; i++) {
SOperatorInfo* pChild =
createStreamSessionAggOperatorInfo(NULL, pExprInfo, numOfCols, NULL, gap, tsSlotId, pTwAggSupp, pTaskInfo);
createStreamSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo);
if (pChild == NULL) {
goto _error;
}
......@@ -2988,7 +2988,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
_error:
if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo, numOfCols);
destroyStreamSessionAggOperatorInfo(pInfo, pOperator->numOfExprs);
}
taosMemoryFreeClear(pInfo);
......
......@@ -195,6 +195,11 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
}
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
cmpParam->pSources = taosArrayGet(pSources, startIndex);
cmpParam->numOfSources = (endIndex - startIndex + 1);
......@@ -205,8 +210,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
// set current source is done
if (taosArrayGetSize(pSource->pageIdList) == 0) {
return TSDB_CODE_SUCCESS;
setCurrentSourceIsDone(pSource, pHandle);
continue;
}
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
......@@ -233,10 +240,9 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
// set current source id done
// set current source is done
if (pSource->src.pBlock == NULL) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
setCurrentSourceIsDone(pSource, pHandle);
}
}
}
......
......@@ -427,7 +427,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor);
COPY_SCALAR_FIELD(intervalAlgo);
COPY_SCALAR_FIELD(windowAlgo);
return (SNode*)pDst;
}
......@@ -538,6 +538,12 @@ static SNode* physiIntervalCopy(const SIntervalPhysiNode* pSrc, SIntervalPhysiNo
return (SNode*)pDst;
}
static SNode* physiSessionCopy(const SSessionWinodwPhysiNode* pSrc, SSessionWinodwPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(window, physiWindowCopy);
COPY_SCALAR_FIELD(gap);
return (SNode*)pDst;
}
static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) {
COPY_SCALAR_FIELD(dataBlockId);
CLONE_NODE_LIST_FIELD(pSlots);
......@@ -678,6 +684,9 @@ SNode* nodesCloneNode(const SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return physiIntervalCopy((const SIntervalPhysiNode*)pNode, (SIntervalPhysiNode*)pDst);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return physiSessionCopy((const SSessionWinodwPhysiNode*)pNode, (SSessionWinodwPhysiNode*)pDst);
default:
break;
}
......
......@@ -246,6 +246,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "PhysiStreamSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return "PhysiStreamSemiSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return "PhysiStreamFinalSessionWindow";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return "PhysiStateWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
......@@ -3998,6 +4002,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiFillNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return physiSessionWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
......@@ -4131,6 +4137,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiFillNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return jsonToPhysiSessionWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
......
......@@ -289,6 +289,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return makeNode(type, sizeof(SStreamSemiSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return makeNode(type, sizeof(SStreamFinalSessionWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
return makeNode(type, sizeof(SStateWinodwPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
......@@ -806,6 +810,7 @@ void nodesDestroyNode(SNode* pNode) {
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode);
break;
......
......@@ -285,7 +285,7 @@ class MockCatalogServiceImpl {
}
void createSmaIndex(const SMCreateSmaReq* pReq) {
STableIndexInfo info;
STableIndexInfo info = {0};
info.intervalUnit = pReq->intervalUnit;
info.slidingUnit = pReq->slidingUnit;
info.interval = pReq->interval;
......
......@@ -548,6 +548,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW
pWindow->winType = WINDOW_TYPE_SESSION;
pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE;
pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol);
if (NULL == pWindow->pTspk) {
......@@ -572,7 +573,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
pWindow->slidingUnit =
(NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit);
pWindow->intervalAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH;
pWindow->pTspk = nodesCloneNode(pInterval->pCol);
if (NULL == pWindow->pTspk) {
......
......@@ -977,8 +977,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
return code;
}
static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
switch (intervalAlgo) {
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
switch (windowAlgo) {
case INTERVAL_ALGO_HASH:
return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
case INTERVAL_ALGO_MERGE:
......@@ -989,6 +989,14 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
case INTERVAL_ALGO_STREAM_SINGLE:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
case SESSION_ALGO_STREAM_FINAL:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
case SESSION_ALGO_STREAM_SEMI:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
case SESSION_ALGO_STREAM_SINGLE:
return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
case SESSION_ALGO_MERGE:
return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
default:
break;
}
......@@ -998,7 +1006,7 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) {
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->intervalAlgo));
pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
if (NULL == pInterval) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -1015,8 +1023,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION : QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION));
pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
if (NULL == pSession) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -376,8 +376,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE;
SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
......@@ -400,8 +400,8 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_STREAM_FINAL;
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -421,8 +421,29 @@ static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo)
}
}
static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = SESSION_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = SESSION_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitSessionForStream(pCxt, pInfo);
} else {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
}
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
......@@ -537,10 +558,12 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets,
SNode* pNode = NULL;
FOREACH(pNode, pSortKeys) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode;
SExprNode* pSortExpr = (SExprNode*)pSortKey->pExpr;
SNode* pTarget = NULL;
bool found = false;
FOREACH(pTarget, pTargets) {
if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) {
if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) ||
(0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) {
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
if (TSDB_CODE_SUCCESS != code) {
break;
......@@ -549,7 +572,7 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets,
}
}
if (TSDB_CODE_SUCCESS == code && !found) {
SNode* pCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr);
SNode* pCol = stbSplCreateColumnNode(pSortExpr);
code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol));
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(pTargets, pCol);
......
......@@ -28,6 +28,8 @@ TEST_F(PlanOrderByTest, basic) {
// ORDER BY key is not in the projection list
run("SELECT c1 FROM t1 ORDER BY c2");
run("SELECT c1 AS a FROM t1 ORDER BY a");
run("SELECT c1 + 10 AS a FROM t1 ORDER BY a");
}
......@@ -59,4 +61,6 @@ TEST_F(PlanOrderByTest, stable) {
run("SELECT c2 FROM st1 ORDER BY c1");
run("SELECT c2 FROM st1 PARTITION BY c2 ORDER BY c1");
run("SELECT c1 AS a FROM st1 ORDER BY a");
}
......@@ -713,7 +713,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
......@@ -1062,7 +1062,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
ths->commitIndex = snapshot.lastApplyIndex;
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
commitBegin, commitEnd);
}
......
......@@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
......@@ -201,7 +201,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
taosMemoryFree(s);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
......
......@@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex);
}
......
......@@ -414,7 +414,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
assert(rid == pSyncNode->rid);
sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex);
sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
......@@ -437,7 +437,8 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex);
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex,
sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
......@@ -598,7 +599,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1;
}
assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
......@@ -609,7 +610,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
......@@ -857,7 +858,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
return pSyncNode;
......@@ -905,7 +906,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
int32_t ret;
......@@ -1294,7 +1295,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int len = 256;
char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len,
"syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"syncNode: vgId:%d, currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"electTimerLogicClock:%lu, "
"electTimerLogicClockUser:%lu, "
"electTimerMS:%d, replicaNum:%d",
......@@ -1345,7 +1346,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) {
......@@ -1400,7 +1401,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j],
......@@ -1413,7 +1414,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"reset:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
......@@ -1426,7 +1427,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
}
......@@ -1436,7 +1437,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
oldSenders[i] = NULL;
......@@ -1509,7 +1510,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
......@@ -1551,7 +1552,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->restoreFinish = false;
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
......@@ -1857,7 +1858,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
return;
......@@ -1891,7 +1892,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
return;
......@@ -1929,7 +1930,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
if (pSyncNode->FpEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) {
sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont);
syncTimeoutDestroy(pSyncMsg);
return;
......@@ -2129,12 +2130,12 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
......@@ -2275,7 +2276,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
ESyncState state = flag;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
", %s",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex,
endIndex, syncUtilState2String(state));
......@@ -2327,7 +2328,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
syncUtilState2String(ths->state), pEntry->index);
}
......
......@@ -164,7 +164,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
......@@ -325,7 +325,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
......
......@@ -47,7 +47,7 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
SSyncNode *pSyncNode = pObj->data;
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
......@@ -74,7 +74,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
SSyncNode *pSyncNode = pObj->data;
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
......@@ -96,7 +96,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
SSyncNode *pSyncNode = pObj->data;
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
......
......@@ -141,7 +141,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
......@@ -153,7 +153,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
......@@ -287,7 +287,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
......@@ -299,7 +299,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
......@@ -310,7 +310,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
}
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
......@@ -349,7 +349,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
......@@ -358,7 +358,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm);
......@@ -594,7 +594,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
......@@ -604,7 +604,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu",
......@@ -648,7 +648,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool isDrop;
if (IamInNew) {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld ",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
......@@ -656,7 +656,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"newCfg, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ",
......@@ -694,7 +694,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
......@@ -704,7 +704,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(logSimpleStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
......@@ -721,7 +721,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
......@@ -730,7 +730,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
......@@ -750,7 +750,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
......@@ -761,7 +761,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
......@@ -787,7 +787,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
......@@ -797,7 +797,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
......
......@@ -946,8 +946,7 @@ int32_t taosGetFqdn(char *fqdn) {
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
// printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
assert(0);
fprintf(stderr,"failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}
......
......@@ -72,6 +72,9 @@ int32_t taosEnvToCfg(const char *envStr, char *cfgStr) {
if (cfgNameLen > 0) {
memcpy(cfgStr, buf, cfgNameLen);
memset(&cfgStr[cfgNameLen], ' ', p - cfgStr - cfgNameLen + 1);
} else {
*cfgStr = '\0';
return -1;
}
}
return strlen(cfgStr);
......
......@@ -578,9 +578,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
//tq
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
#ifdef TAOS_ERROR_C
};
#endif
......
@echo off
echo Executing copy_udf.bat
set SCRIPT_DIR=%cd%
echo SCRIPT_DIR: %SCRIPT_DIR%
cd ..\..\..
set TAOS_DIR=%cd%
echo find udf library in %TAOS_DIR%
set UDF1_DIR=%TAOS_DIR%\debug\build\lib\udf1.dll
set UDF2_DIR=%TAOS_DIR%\debug\build\lib\udf2.dll
echo %UDF1_DIR%
echo %UDF2_DIR%
set UDF_TMP=C:\Windows\Temp\udf
rm -rf %UDF_TMP%
mkdir %UDF_TMP%
echo Copy udf shared library files to %UDF_TMP%
cp %UDF1_DIR% %UDF_TMP%
cp %UDF2_DIR% %UDF_TMP%
......@@ -19,8 +19,14 @@ sql show databases;
sql create table t (ts timestamp, f int);
sql insert into t values(now, 1)(now+1s, 2);
sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;
sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;
system_content printf %OS%
if $system_content == Windows_NT then
sql create function udf1 as 'C:\\Windows\\Temp\\udf1.dll' outputtype int bufSize 8;
sql create aggregate function udf2 as 'C:\\Windows\\Temp\\udf2.dll' outputtype double bufSize 8;
else
sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8;
sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8;
endi
sql show functions;
if $rows != 2 then
return -1
......
......@@ -442,7 +442,7 @@ class TDTestCase:
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
time.sleep(3)
time.sleep(6)
tdLog.info("pkill consume processor")
if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM tmq_sim.exe")
......
......@@ -354,7 +354,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
tmq_resp_err_t err;
int32_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
......@@ -379,7 +379,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
}
void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
tmq_resp_err_t err;
int32_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
......
......@@ -110,24 +110,18 @@ static void printHelp() {
char* getCurrentTimeString(char* timeString) {
time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL);
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d",
tm.tm_year + 1900,
tm.tm_mon + 1,
tm.tm_mday,
tm.tm_hour,
tm.tm_min,
tm.tm_sec);
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
tm.tm_min, tm.tm_sec);
return timeString;
}
void initLogFile() {
char filename[256];
char tmpString[128];
sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
sprintf(filename, "%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
#ifdef WINDOWS
for (int i = 2; i < sizeof(filename); i++) {
if (filename[i] == ':') filename[i] = '-';
......@@ -259,7 +253,8 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, pInfo->numOfVgroups, vgroupId);
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
pInfo->numOfVgroups, vgroupId);
taosCloseFile(&g_fp);
exit(-1);
}
......@@ -277,7 +272,8 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
sprintf(sqlStr, "insert into %s.content_%d values (%"PRId64", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf);
sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
pInfo->ts++, buf);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
......@@ -300,7 +296,8 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t vgroupId = tmq_get_vgroup_id(msg);
taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId);
//taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, tmq_get_table_name(msg));
// taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId,
// tmq_get_table_name(msg));
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId);
while (1) {
......@@ -316,7 +313,7 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
const char* tbName = tmq_get_table_name(msg);
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName:"null table"), totalRows, buf);
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
if (0 != g_stConfInfo.saveRowFlag) {
saveConsumeContentToTbl(pInfo, buf);
}
......@@ -342,8 +339,8 @@ int queryDB(TAOS* taos, char* command) {
return 0;
}
static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) {
pError("tmq_commit_cb_print() commit %d\n", resp);
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
pError("tmq_commit_cb_print() commit %d\n", code);
}
void build_consumer(SThreadInfo* pInfo) {
......@@ -401,16 +398,11 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName,
now,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->consumeRowCnt,
pInfo->checkresult);
sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
......@@ -421,7 +413,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
taos_free_result(pRes);
#if 0
#if 0
// vgroups
for (i = 0; i < pInfo->numOfVgroups; i++) {
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
......@@ -445,19 +437,20 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
taos_free_result(pRes);
}
#endif
#endif
return 0;
}
void loop_consume(SThreadInfo* pInfo) {
tmq_resp_err_t err;
int32_t code;
int64_t totalMsgs = 0;
int64_t totalRows = 0;
char tmpString[128];
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
pInfo->consumerId);
pInfo->ts = taosGetTimestampMs();
......@@ -503,8 +496,8 @@ void* consumeThreadFunc(void* param) {
return NULL;
}
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) {
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
......@@ -519,17 +512,19 @@ void* consumeThreadFunc(void* param) {
pPrint("tmq_commit() manual commit when consume end.\n");
/*tmq_commit(pInfo->tmq, NULL, 0);*/
tmq_commit_sync(pInfo->tmq, NULL);
taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
pPrint("tmq_commit() manual commit over.\n");
}
err = tmq_unsubscribe(pInfo->tmq);
if (err) {
if (err != 0) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
/*pInfo->consumeMsgCnt = -1;*/
/*return NULL;*/
}
err = tmq_consumer_close(pInfo->tmq);
if (err) {
if (err != 0) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
/*exit(-1);*/
}
......
......@@ -458,11 +458,17 @@ bool simExecuteSystemContentCmd(SScript *script, char *option) {
char buf[4096] = {0};
char buf1[4096 + 512] = {0};
char filename[400] = {0};
sprintf(filename, "%s/%s.tmp", simScriptDir, script->fileName);
sprintf(filename, "%s" TD_DIRSEP "%s.tmp", simScriptDir, script->fileName);
#ifdef WINDOWS
sprintf(buf, "cd %s && ", simScriptDir);
simVisuallizeOption(script, option, buf + strlen(buf));
sprintf(buf1, "%s > %s 2>nul", buf, filename);
#else
sprintf(buf, "cd %s; ", simScriptDir);
simVisuallizeOption(script, option, buf + strlen(buf));
sprintf(buf1, "%s > %s 2>/dev/null", buf, filename);
#endif
sprintf(script->system_exit_code, "%d", system(buf1));
simStoreSystemContentResult(script, filename);
......
......@@ -206,7 +206,7 @@ SScript *simParseScript(char *fileName) {
for (int32_t i = 0; i < cmdlen; ++i) {
if (buffer[i] == '\r' || buffer[i] == '\n') {
buffer[i] = ' ';
buffer[i] = '\0';
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册