提交 338a62c3 编写于 作者: G gccgdb1234

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taosws-rs # taosws-rs
ExternalProject_Add(taosws-rs ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git GIT_REPOSITORY https://github.com/taosdata/taosws-rs.git
GIT_TAG 9fa7e2f GIT_TAG 648cc62
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
*/ */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
/**
* @brief Cleanup SSDataBlock for StreamScanInfo
*
* @param tinfo
*/
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
* *
......
...@@ -46,9 +46,10 @@ enum { ...@@ -46,9 +46,10 @@ enum {
}; };
enum { enum {
TASK_EXEC_STATUS__IDLE = 1, TASK_SCHED_STATUS__INACTIVE = 1,
TASK_EXEC_STATUS__EXECUTING, TASK_SCHED_STATUS__WAITING,
TASK_EXEC_STATUS__CLOSING, TASK_SCHED_STATUS__ACTIVE,
TASK_SCHED_STATUS__FAILED,
}; };
enum { enum {
...@@ -201,16 +202,9 @@ typedef struct { ...@@ -201,16 +202,9 @@ typedef struct {
int8_t reserved; int8_t reserved;
} STaskSinkFetch; } STaskSinkFetch;
enum {
TASK_SOURCE__SCAN = 1,
TASK_SOURCE__PIPE,
TASK_SOURCE__MERGE,
};
enum { enum {
TASK_EXEC__NONE = 1, TASK_EXEC__NONE = 1,
TASK_EXEC__PIPE, TASK_EXEC__PIPE,
TASK_EXEC__MERGE,
}; };
enum { enum {
...@@ -226,11 +220,6 @@ enum { ...@@ -226,11 +220,6 @@ enum {
TASK_SINK__FETCH, TASK_SINK__FETCH,
}; };
enum {
TASK_INPUT_TYPE__SUMBIT_BLOCK = 1,
TASK_INPUT_TYPE__DATA_BLOCK,
};
enum { enum {
TASK_TRIGGER_STATUS__IN_ACTIVE = 1, TASK_TRIGGER_STATUS__IN_ACTIVE = 1,
TASK_TRIGGER_STATUS__ACTIVE, TASK_TRIGGER_STATUS__ACTIVE,
...@@ -256,7 +245,7 @@ typedef struct SStreamTask { ...@@ -256,7 +245,7 @@ typedef struct SStreamTask {
int16_t dispatchMsgType; int16_t dispatchMsgType;
int8_t taskStatus; int8_t taskStatus;
int8_t execStatus; int8_t schedStatus;
// node info // node info
int32_t selfChildId; int32_t selfChildId;
...@@ -475,7 +464,6 @@ typedef struct { ...@@ -475,7 +464,6 @@ typedef struct {
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask);
...@@ -487,6 +475,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) ...@@ -487,6 +475,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask);
typedef struct SStreamMeta SStreamMeta; typedef struct SStreamMeta SStreamMeta;
SStreamMeta* streamMetaOpen(); SStreamMeta* streamMetaOpen();
......
...@@ -189,20 +189,6 @@ typedef struct { ...@@ -189,20 +189,6 @@ typedef struct {
tsem_t rspSem; tsem_t rspSem;
} SMqPollCbParam; } SMqPollCbParam;
#if 0
typedef struct {
tmq_t* tmq;
int8_t async;
int8_t automatic;
int8_t freeOffsets;
tmq_commit_cb* userCb;
tsem_t rspSem;
int32_t rspErr;
SArray* offsets;
void* userParam;
} SMqCommitCbParam;
#endif
typedef struct { typedef struct {
tmq_t* tmq; tmq_t* tmq;
int8_t automatic; int8_t automatic;
...@@ -385,29 +371,6 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { ...@@ -385,29 +371,6 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg); return sprintf(dst, "%s:%d", topicName, vg);
} }
#if 0
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code;
if (pParam->async) {
if (pParam->automatic && pParam->tmq->commitCb) {
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
} else if (!pParam->automatic && pParam->userCb) {
pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
}
if (pParam->freeOffsets) {
taosArrayDestroy(pParam->offsets);
}
taosMemoryFree(pParam);
} else {
tsem_post(&pParam->rspSem);
}
return 0;
}
#endif
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) { int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
...@@ -660,123 +623,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ ...@@ -660,123 +623,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
return 0; return 0;
} }
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) {
SMqCMCommitOffsetReq req;
SArray* pOffsets = NULL;
void* buf = NULL;
SMqCommitCbParam* pParam = NULL;
SMsgSendInfo* sendInfo = NULL;
int8_t freeOffsets;
int32_t code = -1;
if (msg == NULL) {
freeOffsets = 1;
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqOffset offset;
tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pOffsets, &offset);
}
}
} else {
freeOffsets = 0;
pOffsets = (SArray*)&msg->container;
}
req.num = (int32_t)pOffsets->size;
req.offsets = pOffsets->pData;
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
if (code < 0) {
goto END;
}
int32_t tlen = encoder.pos;
buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tEncoderClear(&encoder);
goto END;
}
tEncoderClear(&encoder);
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tEncoderClear(&encoder);
pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
goto END;
}
pParam->tmq = tmq;
pParam->automatic = automatic;
pParam->async = async;
pParam->offsets = pOffsets;
pParam->freeOffsets = freeOffsets;
pParam->userCb = userCb;
pParam->userParam = userParam;
if (!async) tsem_init(&pParam->rspSem, 0, 0);
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto END;
sendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
code = pParam->rspErr;
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
} else {
code = 0;
}
// avoid double free if msg is sent
buf = NULL;
END:
if (buf) taosMemoryFree(buf);
/*if (pParam) taosMemoryFree(pParam);*/
/*if (sendInfo) taosMemoryFree(sendInfo);*/
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
} else {
userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
}
}
if (!async && freeOffsets) {
taosArrayDestroy(pOffsets);
}
return code;
}
#endif
void tmqAssignAskEpTask(void* param, void* tmrId) { void tmqAssignAskEpTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param; tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
...@@ -1839,13 +1685,21 @@ int32_t tmq_consumer_close(tmq_t* tmq) { ...@@ -1839,13 +1685,21 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
return rsp; return rsp;
} }
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
rsp = tmq_subscribe(tmq, lst); while (1) {
rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
tmq_list_destroy(lst); tmq_list_destroy(lst);
if (rsp != 0) { return rsp;
return rsp;
}
} }
// TODO: free resources // TODO: free resources
return 0; return 0;
......
...@@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -389,7 +389,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4); tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores; tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
......
...@@ -812,12 +812,14 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -812,12 +812,14 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
ASSERT(0);
goto _OVER; goto _OVER;
} }
// drop stream // drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
ASSERT(0);
goto _OVER; goto _OVER;
} }
} }
...@@ -832,6 +834,9 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -832,6 +834,9 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
code = 0; code = 0;
_OVER: _OVER:
if(code != 0) {
ASSERT(0);
}
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseVgroup(pMnode, pVgroup); mndReleaseVgroup(pMnode, pVgroup);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
...@@ -930,6 +935,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) { ...@@ -930,6 +935,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} else { } else {
terrno = TSDB_CODE_MND_SMA_NOT_EXIST; terrno = TSDB_CODE_MND_SMA_NOT_EXIST;
ASSERT(0);
goto _OVER; goto _OVER;
} }
} }
......
...@@ -599,9 +599,11 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { ...@@ -599,9 +599,11 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
for (int i = 0; i < pSW->number; ++i) { for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i); smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) { if (metaGetTableEntryByUid(&mr, smaId) < 0) {
tDecoderClear(&mr.coder);
metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId); metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
continue; continue;
} }
tDecoderClear(&mr.coder);
pTSma = pSW->tSma + smaIdx; pTSma = pSW->tSma + smaIdx;
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma)); memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
if (deepCopy) { if (deepCopy) {
......
...@@ -92,7 +92,7 @@ static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { ...@@ -92,7 +92,7 @@ static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) {
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC // Note: free/kill may in RC
if (!taskHandle) return; if (!taskHandle || !(*taskHandle)) return;
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level); smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level);
...@@ -848,6 +848,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { ...@@ -848,6 +848,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
terrstr()); terrstr());
goto _err; goto _err;
} }
tDecoderClear(&mr.coder);
ASSERT(mr.me.type == TSDB_SUPER_TABLE); ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == suid); ASSERT(mr.me.uid == suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) { if (TABLE_IS_ROLLUP(mr.me.flags)) {
...@@ -1336,6 +1337,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1336,6 +1337,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK);
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
tdCleanupStreamInputDataBlock(pItem->taskInfo);
tdUnRefRSmaInfo(pSma, pRSmaInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
......
...@@ -216,9 +216,11 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -216,9 +216,11 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
if (offset.val.type == TMQ_OFFSET__LOG) { if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
if (walRefVer(pHandle->pRef, offset.val.version) < 0) { if (pHandle) {
ASSERT(0); if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
return -1; ASSERT(0);
return -1;
}
} }
} }
...@@ -515,7 +517,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -515,7 +517,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
// todo lock // todo lock
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
ASSERT(req.oldConsumerId == -1); if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
req.newConsumerId, req.oldConsumerId);
}
ASSERT(req.newConsumerId != -1); ASSERT(req.newConsumerId != -1);
STqHandle tqHandle = {0}; STqHandle tqHandle = {0};
pHandle = &tqHandle; pHandle = &tqHandle;
...@@ -604,7 +609,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ...@@ -604,7 +609,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
} }
pTask->execStatus = TASK_EXEC_STATUS__IDLE; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen();
...@@ -720,7 +725,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { ...@@ -720,7 +725,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
continue; continue;
} }
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) { if (streamSchedExec(pTask) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId); qError("stream task launch failed, task id %d", pTask->taskId);
continue; continue;
} }
...@@ -751,12 +756,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -751,12 +756,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
ASSERT(0);
char* msgStr = pMsg->pCont; char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req; SStreamDispatchReq req;
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
......
...@@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) { ...@@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
return 0; return 0;
} }
int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) {
void* pIter = NULL;
SStreamDataSubmit* pSubmit = streamDataSubmitNew(pReq);
if (pSubmit == NULL) {
return -1;
}
while (1) {
pIter = taosHashIterate(pTq->handles, pIter);
if (pIter == NULL) break;
STqHandle* pHandle = (STqHandle*)pIter;
if (tqEnqueue(pHandle, pSubmit) < 0) {
continue;
}
int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus);
if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
tqSendExecReq(pTq, pHandle);
}
}
streamDataSubmitRefDec(pSubmit);
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0; if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL; void* pIter = NULL;
......
...@@ -448,7 +448,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -448,7 +448,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
} }
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), code); vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
code);
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
if (code != 0 && terrno == 0) { if (code != 0 && terrno == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
...@@ -629,8 +630,8 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void ...@@ -629,8 +630,8 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, pVnode->config.vgId, isApply, vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
...@@ -707,7 +708,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -707,7 +708,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
} }
setPingTimerMS(pVnode->sync, 5000); setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 1300); setElectTimerMS(pVnode->sync, 2800);
setHeartbeatTimerMS(pVnode->sync, 900); setHeartbeatTimerMS(pVnode->sync, 900);
return 0; return 0;
} }
......
...@@ -632,7 +632,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* ...@@ -632,7 +632,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
tListLen(pExp->pExpr->_function.functionName)); tListLen(pExp->pExpr->_function.functionName));
#if 1 #if 1
// todo refactor: add the parameter for tbname function // todo refactor: add the parameter for tbname function
if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) { if (!pFuncNode->pParameterList && (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0)) {
pFuncNode->pParameterList = nodesMakeList(); pFuncNode->pParameterList = nodesMakeList();
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0); ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
......
...@@ -93,6 +93,29 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -93,6 +93,29 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) {
return;
}
SOperatorInfo* pOptrInfo = pTaskInfo->pRoot->pDownstream[0];
if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOptrInfo->info;
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) {
SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i);
taosArrayDestroy(p->pDataBlock);
taosMemoryFreeClear(p);
}
} else {
ASSERT(0);
}
} else {
ASSERT(0);
}
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
...@@ -104,7 +127,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -104,7 +127,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
...@@ -193,6 +216,8 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S ...@@ -193,6 +216,8 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
continue; continue;
} }
tDecoderClear(&mr.coder);
// TODO handle ntb case // TODO handle ntb case
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) { if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
continue; continue;
...@@ -241,7 +266,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -241,7 +266,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
// todo refactor STableList // todo refactor STableList
bool assignUid = false; bool assignUid = false;
size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0; size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
char* keyBuf = NULL; char* keyBuf = NULL;
if (bufLen > 0) { if (bufLen > 0) {
......
...@@ -3330,18 +3330,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { ...@@ -3330,18 +3330,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
} }
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
if (pExpr) { for (int32_t i = 0; i < numOfExprs; ++i) {
for (int32_t i = 0; i < numOfExprs; ++i) { SExprInfo* pExprInfo = &pExpr[i];
SExprInfo* pExprInfo = &pExpr[i]; for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) { taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
}
} }
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
} }
taosMemoryFree(pExprInfo->base.pParam);
taosMemoryFree(pExprInfo->pExpr);
} }
} }
...@@ -3483,9 +3481,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { ...@@ -3483,9 +3481,8 @@ void cleanupExprSupp(SExprSupp* pSupp) {
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs); destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
if (pSupp->pExprInfo != NULL) { if (pSupp->pExprInfo != NULL) {
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs); destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
taosMemoryFreeClear(pSupp->pExprInfo);
} }
taosMemoryFreeClear(pSupp->pExprInfo);
taosMemoryFree(pSupp->rowEntryInfoOffset); taosMemoryFree(pSupp->rowEntryInfoOffset);
} }
...@@ -4076,6 +4073,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4076,6 +4073,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
if (ops[i] == NULL) { if (ops[i] == NULL) {
taosMemoryFree(ops);
return NULL; return NULL;
} else { } else {
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
...@@ -4517,7 +4515,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead ...@@ -4517,7 +4515,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
return code; return code;
_complete: _complete:
taosMemoryFreeClear(*pTaskInfo); doDestroyTask(*pTaskInfo);
terrno = code; terrno = code;
return code; return code;
} }
......
...@@ -758,6 +758,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con ...@@ -758,6 +758,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid, int32_t* rowLen, con
} }
} else if (mr.me.type == TSDB_CHILD_TABLE) { } else if (mr.me.type == TSDB_CHILD_TABLE) {
uint64_t suid = mr.me.ctbEntry.suid; uint64_t suid = mr.me.ctbEntry.suid;
tDecoderClear(&mr.coder);
code = metaGetTableEntryByUid(&mr, suid); code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr); qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno), idstr);
...@@ -1493,6 +1494,11 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1493,6 +1494,11 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
if (pStreamScan->pColMatchInfo) { if (pStreamScan->pColMatchInfo) {
taosArrayDestroy(pStreamScan->pColMatchInfo); taosArrayDestroy(pStreamScan->pColMatchInfo);
} }
if (pStreamScan->pPseudoExpr) {
destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
taosMemoryFreeClear(pStreamScan->pPseudoExpr);
}
updateInfoDestroy(pStreamScan->pUpdateInfo); updateInfoDestroy(pStreamScan->pUpdateInfo);
blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pUpdateRes);
...@@ -2505,6 +2511,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -2505,6 +2511,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid); int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
......
...@@ -1094,6 +1094,7 @@ static SColumnInfoData* doVectorConvert(SScalarParam* pInput, int32_t* doConvert ...@@ -1094,6 +1094,7 @@ static SColumnInfoData* doVectorConvert(SScalarParam* pInput, int32_t* doConvert
static void doReleaseVec(SColumnInfoData* pCol, int32_t type) { static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
if (type == VECTOR_DO_CONVERT) { if (type == VECTOR_DO_CONVERT) {
colDataDestroy(pCol); colDataDestroy(pCol);
taosMemoryFree(pCol);
} }
} }
......
...@@ -47,7 +47,7 @@ void streamCleanUp() { ...@@ -47,7 +47,7 @@ void streamCleanUp() {
} }
} }
void streamTriggerByTimer(void* param, void* tmrId) { void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param; SStreamTask* pTask = (void*)param;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
...@@ -68,28 +68,30 @@ void streamTriggerByTimer(void* param, void* tmrId) { ...@@ -68,28 +68,30 @@ void streamTriggerByTimer(void* param, void* tmrId) {
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
streamTaskInput(pTask, (SStreamQueueItem*)trigger); streamTaskInput(pTask, (SStreamQueueItem*)trigger);
streamLaunchByWrite(pTask, pTask->nodeId); streamSchedExec(pTask);
} }
taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
} }
int32_t streamSetupTrigger(SStreamTask* pTask) { int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
} }
return 0; return 0;
} }
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { int32_t streamSchedExec(SStreamTask* pTask) {
int8_t execStatus = atomic_load_8(&pTask->execStatus); int8_t schedStatus =
if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) { atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) return -1; if (pRunReq == NULL) {
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
// TODO: do we need htonl? return -1;
pRunReq->head.vgId = vgId; }
pRunReq->head.vgId = pTask->nodeId;
pRunReq->streamId = pTask->streamId; pRunReq->streamId = pTask->streamId;
pRunReq->taskId = pTask->taskId; pRunReq->taskId = pTask->taskId;
SRpcMsg msg = { SRpcMsg msg = {
...@@ -182,14 +184,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S ...@@ -182,14 +184,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
streamTaskEnqueue(pTask, pReq, pRsp); streamTaskEnqueue(pTask, pReq, pRsp);
if (exec) { if (exec) {
streamExec(pTask); streamTryExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) { if (pTask->dispatchType != TASK_DISPATCH__NONE) {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
streamDispatch(pTask); streamDispatch(pTask);
} }
} else { } else {
streamLaunchByWrite(pTask, pTask->nodeId); streamSchedExec(pTask);
} }
return 0; return 0;
...@@ -219,7 +220,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { ...@@ -219,7 +220,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
} }
int32_t streamProcessRunReq(SStreamTask* pTask) { int32_t streamProcessRunReq(SStreamTask* pTask) {
streamExec(pTask); streamTryExec(pTask);
if (pTask->dispatchType != TASK_DISPATCH__NONE) { if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask); streamDispatch(pTask);
...@@ -272,10 +273,12 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S ...@@ -272,10 +273,12 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S
streamTaskEnqueueRetrieve(pTask, pReq, pRsp); streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->execType != TASK_EXEC__NONE); ASSERT(pTask->execType != TASK_EXEC__NONE);
streamExec(pTask); streamSchedExec(pTask);
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); /*streamTryExec(pTask);*/
streamDispatch(pTask);
/*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
/*streamDispatch(pTask);*/
return 0; return 0;
} }
......
...@@ -440,13 +440,13 @@ FAIL: ...@@ -440,13 +440,13 @@ FAIL:
int32_t streamDispatch(SStreamTask* pTask) { int32_t streamDispatch(SStreamTask* pTask) {
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
#if 1 ASSERT(pTask->sinkType == TASK_SINK__NONE);
int8_t old = int8_t old =
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
if (old != TASK_OUTPUT_STATUS__NORMAL) { if (old != TASK_OUTPUT_STATUS__NORMAL) {
return 0; return 0;
} }
#endif
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) { ...@@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
goto FREE; goto FREE;
} }
/*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
FREE: FREE:
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
#if 0
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg);
#endif
return code; return code;
} }
...@@ -147,24 +147,23 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { ...@@ -147,24 +147,23 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
return 0; return 0;
} }
// TODO: handle version
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { int32_t streamExecForAll(SStreamTask* pTask) {
while (1) { while (1) {
int32_t cnt = 1; int32_t cnt = 1;
void* data = NULL; void* data = NULL;
while (1) { while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) { if (qItem == NULL) {
qDebug("stream exec over, queue empty"); qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
break; break;
} }
if (data == NULL) { if (data == NULL) {
data = qItem; data = qItem;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->execType == TASK_EXEC__NONE) break; if (pTask->execType == TASK_EXEC__NONE) {
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ break;
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ }
/*}*/
} else { } else {
void* newRet; void* newRet;
if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) { if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) {
...@@ -181,11 +180,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -181,11 +180,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (pTask->taskStatus == TASK_STATUS__DROPPING) {
if (data) streamFreeQitem(data); if (data) streamFreeQitem(data);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0;
return NULL;
} }
if (data == NULL) break; if (data == NULL) {
break;
}
if (pTask->execType == TASK_EXEC__NONE) { if (pTask->execType == TASK_EXEC__NONE) {
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
...@@ -193,6 +193,8 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -193,6 +193,8 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
continue; continue;
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
streamTaskExecImpl(pTask, data, pRes); streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId); qDebug("stream task %d exec end", pTask->taskId);
...@@ -203,76 +205,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -203,76 +205,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
// TODO log failed ver // TODO log failed ver
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return NULL; return -1;
} }
qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes; qRes->blocks = pRes;
if (streamTaskOutput(pTask, qRes) < 0) {
// TODO log failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return NULL;
}
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qRes->childId = pTask->selfChildId; qRes->childId = pTask->selfChildId;
qRes->sourceVer = pSubmit->ver; qRes->sourceVer = pSubmit->ver;
} }
if (streamTaskOutput(pTask, qRes) < 0) {
// TODO save failed ver
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return -1;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/ /*streamQueueProcessSuccess(pTask->inputQueue);*/
pRes = taosArrayInit(0, sizeof(SSDataBlock));
} }
streamFreeQitem(data);
} }
return pRes; return 0;
} }
// TODO: handle version int32_t streamTryExec(SStreamTask* pTask) {
int32_t streamExec(SStreamTask* pTask) { int8_t schedStatus =
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
if (pRes == NULL) return -1; if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) { int32_t code = streamExecForAll(pTask);
int8_t execStatus = if (code < 0) {
atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING); atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED);
if (execStatus == TASK_EXEC_STATUS__IDLE) { return -1;
// first run }
qDebug("stream exec, enter exec status"); atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
// temporarily disable status closing, since it runs out of threads
#if 0
// set status closing
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
// second run, make sure inputQ and qall are cleared
qDebug("stream exec, enter closing status");
pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL;
#endif
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); if (!taosQueueEmpty(pTask->inputQueue->queue)) {
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); streamSchedExec(pTask);
qDebug("stream exec, return result");
return 0;
} else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
continue;
} else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
ASSERT(taosArrayGetSize(pRes) == 0);
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return 0;
} else {
ASSERT(0);
} }
} }
FAIL: return 0;
if (pRes) taosArrayDestroy(pRes);
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
tFreeSStreamTask(pTask);
return 0;
} else {
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return -1;
}
} }
...@@ -35,9 +35,10 @@ FAIL: ...@@ -35,9 +35,10 @@ FAIL:
void streamQueueClose(SStreamQueue* queue) { void streamQueueClose(SStreamQueue* queue) {
while (1) { while (1) {
void* qItem = streamQueueNextItem(queue); void* qItem = streamQueueNextItem(queue);
if (qItem) if (qItem) {
taosFreeQitem(qItem); taosFreeQitem(qItem);
else } else {
return; return;
}
} }
} }
...@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { ...@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
} }
pTask->taskId = tGenIdPI32(); pTask->taskId = tGenIdPI32();
pTask->streamId = streamId; pTask->streamId = streamId;
pTask->execStatus = TASK_EXEC_STATUS__IDLE; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
...@@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1; if (tEncodeI8(pEncoder, pTask->schedStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
...@@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->schedStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
......
...@@ -28,13 +28,13 @@ extern "C" { ...@@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF #define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000 #define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 5000 #define PING_TIMER_MS 5000
#define ELECT_TIMER_MS_MIN 1300 #define ELECT_TIMER_MS_MIN 5000
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 900 #define HEARTBEAT_TIMER_MS 900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
......
...@@ -730,7 +730,8 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs ...@@ -730,7 +730,8 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
for (int i = 0; i < arrSize; ++i) { for (int i = 0; i < arrSize; ++i) {
do { do {
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), arrSize); snprintf(eventLog, sizeof(eventLog), "propose message, type:%s batch:%d", TMSG_INFO(pMsgPArr[i]->msgType),
arrSize);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
} while (0); } while (0);
...@@ -834,7 +835,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -834,7 +835,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
ret = 1; ret = 1;
sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType));
} else { } else {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
...@@ -1114,7 +1116,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { ...@@ -1114,7 +1116,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
} }
int32_t ret = 0; int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode); // ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0); ASSERT(ret == 0);
} }
...@@ -1250,6 +1252,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { ...@@ -1250,6 +1252,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
&pSyncNode->pElectTimer); &pSyncNode->pElectTimer);
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
} else { } else {
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
} }
...@@ -1281,6 +1290,14 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) { ...@@ -1281,6 +1290,14 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
} }
ret = syncNodeRestartElectTimer(pSyncNode, electMS); ret = syncNodeRestartElectTimer(pSyncNode, electMS);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine,
2 * pSyncNode->electBaseLine, electMS);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
return ret; return ret;
} }
...@@ -1293,6 +1310,13 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -1293,6 +1310,13 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
} else { } else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
} }
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
return ret; return ret;
} }
...@@ -1304,6 +1328,13 @@ int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -1304,6 +1328,13 @@ int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) {
} else { } else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
} }
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "start heartbeat timer, ms:%d", 1);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
return ret; return ret;
} }
...@@ -1312,6 +1343,8 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -1312,6 +1343,8 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1);
taosTmrStop(pSyncNode->pHeartbeatTimer); taosTmrStop(pSyncNode->pHeartbeatTimer);
pSyncNode->pHeartbeatTimer = NULL; pSyncNode->pHeartbeatTimer = NULL;
sTrace("vgId:%d, stop heartbeat timer", pSyncNode->vgId);
return ret; return ret;
} }
...@@ -1559,12 +1592,13 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1559,12 +1592,13 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
", sby:%d, " ", sby:%d, "
"stgy:%d, bch:%d, " "stgy:%d, bch:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", "lcfg:%" PRId64 ", chging:%d, rsto:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, printStr); pSyncNode->restoreFinish, pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser,
printStr);
} else { } else {
snprintf(logBuf, sizeof(logBuf), "%s", str); snprintf(logBuf, sizeof(logBuf), "%s", str);
} }
...@@ -1894,7 +1928,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1894,7 +1928,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// Raft 3.6.2 Committing entries from previous terms // Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode); syncNodeAppendNoop(pSyncNode);
#if 0 // simon #if 0 // simon
syncNodeReplicate(pSyncNode); syncNodeReplicate(pSyncNode);
#endif #endif
syncMaybeAdvanceCommitIndex(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode);
......
...@@ -141,7 +141,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ...@@ -141,7 +141,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
", match-index:%d, raftid:%" PRId64, ", match-index:%d, raftid:%" PRId64,
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
syncNodeRestartNowHeartbeatTimer(pSyncNode); // syncNodeRestartNowHeartbeatTimer(pSyncNode);
syncNodeStartNowHeartbeatTimer(pSyncNode);
return -1; return -1;
} }
......
...@@ -48,14 +48,16 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -48,14 +48,16 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) { if (atomic_load_64(&ths->electTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->electTimerCounter); ++(ths->electTimerCounter);
sInfo("vgId:%d, sync timeout, type:election count:%d", ths->vgId, ths->electTimerCounter); sInfo("vgId:%d, sync timeout, type:election count:%d, electTimerLogicClockUser:%ld", ths->vgId,
ths->electTimerCounter, ths->electTimerLogicClockUser);
syncNodeElect(ths); syncNodeElect(ths);
} }
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->heartbeatTimerCounter); ++(ths->heartbeatTimerCounter);
sInfo("vgId:%d, sync timeout, type:replicate count:%d", ths->vgId, ths->heartbeatTimerCounter); sInfo("vgId:%d, sync timeout, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld", ths->vgId,
ths->heartbeatTimerCounter, ths->heartbeatTimerLogicClockUser);
syncNodeReplicate(ths); syncNodeReplicate(ths);
} }
} else { } else {
......
...@@ -125,7 +125,10 @@ int32_t syncUtilRand(int32_t max) { return taosRand() % max; } ...@@ -125,7 +125,10 @@ int32_t syncUtilRand(int32_t max) { return taosRand() % max; }
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
ASSERT(min > 0 && max > 0 && max >= min); ASSERT(min > 0 && max > 0 && max >= min);
return min + syncUtilRand(max - min); int32_t rdm = min + syncUtilRand(max - min);
// sDebug("random min:%d, max:%d, rdm:%d", min, max, rdm);
return rdm;
} }
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
......
...@@ -176,6 +176,8 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in ...@@ -176,6 +176,8 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
tdbBtcOpen(&btc, pBt, pTxn); tdbBtcOpen(&btc, pBt, pTxn);
tdbTrace("tdb insert, btc: %p, pTxn: %p", &btc, pTxn);
// move to the position to insert // move to the position to insert
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c); ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
if (ret < 0) { if (ret < 0) {
...@@ -214,6 +216,8 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) { ...@@ -214,6 +216,8 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
tdbBtcOpen(&btc, pBt, pTxn); tdbBtcOpen(&btc, pBt, pTxn);
tdbTrace("tdb delete, btc: %p, pTxn: %p", &btc, pTxn);
// move the cursor // move the cursor
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c); ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
if (ret < 0) { if (ret < 0) {
...@@ -244,6 +248,8 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i ...@@ -244,6 +248,8 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
tdbBtcOpen(&btc, pBt, pTxn); tdbBtcOpen(&btc, pBt, pTxn);
tdbTrace("tdb upsert, btc: %p, pTxn: %p", &btc, pTxn);
// move the cursor // move the cursor
ret = tdbBtcMoveTo(&btc, pKey, nKey, &c); ret = tdbBtcMoveTo(&btc, pKey, nKey, &c);
if (ret < 0) { if (ret < 0) {
...@@ -283,10 +289,12 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL ...@@ -283,10 +289,12 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
int ret; int ret;
void *pTKey = NULL; void *pTKey = NULL;
void *pTVal = NULL; void *pTVal = NULL;
SCellDecoder cd; SCellDecoder cd = {0};
tdbBtcOpen(&btc, pBt, NULL); tdbBtcOpen(&btc, pBt, NULL);
tdbTrace("tdb pget, btc: %p", &btc);
ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret); ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (ret < 0) { if (ret < 0) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
...@@ -295,6 +303,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL ...@@ -295,6 +303,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
if (btc.idx < 0 || cret) { if (btc.idx < 0 || cret) {
tdbBtcClose(&btc); tdbBtcClose(&btc);
return -1; return -1;
} }
...@@ -330,9 +339,13 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL ...@@ -330,9 +339,13 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
} }
if (TDB_CELLDECODER_FREE_VAL(&cd)) { if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbDebug("tdb btc/pget/2 decoder: %p pVal free: %p", &cd, cd.pVal);
tdbFree(cd.pVal); tdbFree(cd.pVal);
} }
tdbTrace("tdb pget end, btc decoder: %p/0x%x, local decoder:%p", &btc.coder, btc.coder.freeKV, &cd);
tdbBtcClose(&btc); tdbBtcClose(&btc);
return 0; return 0;
...@@ -722,7 +735,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx ...@@ -722,7 +735,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
int szCell; int szCell;
SBtreeInitPageArg iarg; SBtreeInitPageArg iarg;
int iNew, nNewCells; int iNew, nNewCells;
SCellDecoder cd; SCellDecoder cd = {0};
iarg.pBt = pBt; iarg.pBt = pBt;
iarg.flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]); iarg.flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]);
...@@ -1235,6 +1248,8 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, ...@@ -1235,6 +1248,8 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
} }
TDB_CELLDECODER_SET_FREE_VAL(pDecoder); TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
tdbDebug("tdb btc decoder: %p/0x%x pVal: %p ", pDecoder, pDecoder->freeKV, pDecoder->pVal);
memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno)); memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno); nLeft -= nLocal - kLen - sizeof(SPgno);
...@@ -1376,6 +1391,9 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD ...@@ -1376,6 +1391,9 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
// Clear the state of decoder // Clear the state of decoder
if (TDB_CELLDECODER_FREE_VAL(pDecoder)) {
tdbFree(pDecoder->pVal);
}
pDecoder->kLen = -1; pDecoder->kLen = -1;
pDecoder->pKey = NULL; pDecoder->pKey = NULL;
pDecoder->vLen = -1; pDecoder->vLen = -1;
...@@ -1383,6 +1401,8 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD ...@@ -1383,6 +1401,8 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
pDecoder->pgno = 0; pDecoder->pgno = 0;
TDB_CELLDECODER_SET_FREE_NIL(pDecoder); TDB_CELLDECODER_SET_FREE_NIL(pDecoder);
tdbDebug("tdb btc decoder set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
// 1. Decode header part // 1. Decode header part
if (!leaf) { if (!leaf) {
ASSERT(pPage->vLen == sizeof(SPgno)); ASSERT(pPage->vLen == sizeof(SPgno));
...@@ -1650,7 +1670,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) { ...@@ -1650,7 +1670,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
SCell *pCell; SCell *pCell;
SCellDecoder cd; SCellDecoder cd = {0};
void *pKey, *pVal; void *pKey, *pVal;
int ret; int ret;
...@@ -1696,7 +1716,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { ...@@ -1696,7 +1716,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
SCell *pCell; SCell *pCell;
SCellDecoder cd; SCellDecoder cd = {0};
void *pKey, *pVal; void *pKey, *pVal;
int ret; int ret;
...@@ -2037,7 +2057,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { ...@@ -2037,7 +2057,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
const void *pTKey; const void *pTKey;
int tkLen; int tkLen;
tdbTrace("ttl moveto, pager:%p, ipage:%d", pPager, pBtc->iPage); tdbTrace("tdb moveto, pager:%p, ipage:%d", pPager, pBtc->iPage);
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move from a clear cursor // move from a clear cursor
ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage, ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
...@@ -2093,6 +2113,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { ...@@ -2093,6 +2113,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} }
// search downward to the leaf // search downward to the leaf
tdbTrace("tdb search downward, pager:%p, ipage:%d", pPager, pBtc->iPage);
for (;;) { for (;;) {
int lidx, ridx; int lidx, ridx;
SPage *pPage; SPage *pPage;
...@@ -2127,6 +2148,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { ...@@ -2127,6 +2148,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} }
// binary search // binary search
tdbTrace("tdb binary search, pager:%p, ipage:%d", pPager, pBtc->iPage);
for (;;) { for (;;) {
if (lidx > ridx) break; if (lidx > ridx) break;
...@@ -2157,6 +2179,8 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { ...@@ -2157,6 +2179,8 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} }
} }
tdbTrace("tdb moveto end, pager:%p, ipage:%d", pPager, pBtc->iPage);
return 0; return 0;
} }
...@@ -2175,6 +2199,12 @@ int tdbBtcClose(SBTC *pBtc) { ...@@ -2175,6 +2199,12 @@ int tdbBtcClose(SBTC *pBtc) {
pBtc->idx = pBtc->idxStack[pBtc->iPage]; pBtc->idx = pBtc->idxStack[pBtc->iPage];
} }
if (TDB_CELLDECODER_FREE_VAL(&pBtc->coder)) {
tdbDebug("tdb btc/close decoder: %p pVal free: %p", &pBtc->coder, pBtc->coder.pVal);
tdbFree(pBtc->coder.pVal);
}
return 0; return 0;
} }
......
...@@ -151,8 +151,8 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); ...@@ -151,8 +151,8 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// meta section end // meta section end
// seek section // seek section
int walChangeWrite(SWal* pWal, int64_t ver); int64_t walChangeWrite(SWal* pWal, int64_t ver);
int walInitWriteFile(SWal* pWal); int walInitWriteFile(SWal* pWal);
// seek section end // seek section end
int64_t walGetSeq(); int64_t walGetSeq();
......
...@@ -74,7 +74,7 @@ int walInitWriteFile(SWal* pWal) { ...@@ -74,7 +74,7 @@ int walInitWriteFile(SWal* pWal) {
return 0; return 0;
} }
int walChangeWrite(SWal* pWal, int64_t ver) { int64_t walChangeWrite(SWal* pWal, int64_t ver) {
int code; int code;
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
......
...@@ -208,6 +208,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -208,6 +208,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
taosCloseFile(&pIdxFile); taosCloseFile(&pIdxFile);
taosCloseFile(&pLogFile); taosCloseFile(&pLogFile);
taosFsyncFile(pWal->pLogFile);
taosFsyncFile(pWal->pIdxFile);
walSaveMeta(pWal); walSaveMeta(pWal);
// unlock // unlock
......
...@@ -832,8 +832,9 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { ...@@ -832,8 +832,9 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
if (pNode) { if (pNode) {
SHashEntry *pe = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
uint16_t prevRef = atomic_load_16(&pNode->refCount); /*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/
uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1); uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1);
#if 0
ASSERT(prevRef < afterRef); ASSERT(prevRef < afterRef);
// the reference count value is overflow, which will cause the delete node operation immediately. // the reference count value is overflow, which will cause the delete node operation immediately.
...@@ -845,6 +846,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { ...@@ -845,6 +846,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) {
} else { } else {
data = GET_HASH_NODE_DATA(pNode); data = GET_HASH_NODE_DATA(pNode);
} }
#endif
data = GET_HASH_NODE_DATA(pNode);
if (afterRef >= MAX_WARNING_REF_COUNT) { if (afterRef >= MAX_WARNING_REF_COUNT) {
uWarn("hash entry ref count is abnormally high: %d", afterRef); uWarn("hash entry ref count is abnormally high: %d", afterRef);
......
...@@ -1339,8 +1339,9 @@ class Task(): ...@@ -1339,8 +1339,9 @@ class Task():
0x03A1, # STable [does] not exist 0x03A1, # STable [does] not exist
0x03AA, # Tag already exists 0x03AA, # Tag already exists
0x0603, # Table already exists 0x0603, # Table already exists
0x2603, # Table does not exist 0x2603, # Table does not exist, replaced by 2662 below
0x260d, # Tags number not matched 0x260d, # Tags number not matched
0x2662, # Table does not exist #TODO: what about 2603 above?
......
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "taosudf.h"
DLL_EXPORT int32_t bit_and_init() {
return 0;
}
DLL_EXPORT int32_t bit_and_destroy() {
return 0;
}
DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn *resultCol) {
if (block->numOfCols < 2) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
if (udfColDataIsNull(block->udfCols[0], i)) {
udfColDataSetNull(resultCol, i);
continue;
}
int32_t result = *(int32_t*)udfColDataGetData(block->udfCols[0], i);
int j = 1;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
char* colData = udfColDataGetData(block->udfCols[j], i);
result &= *(int32_t*)colData;
}
if (j == block->numOfCols) {
udfColDataSet(resultCol, i, (char*)&result, false);
}
}
return TSDB_CODE_SUCCESS;
}
set +e
rm -rf /tmp/udf/libbitand.so /tmp/udf/libsqrsum.so
mkdir -p /tmp/udf
echo "compile udf bit_and and sqr_sum"
gcc -fPIC -shared sh/bit_and.c -o /tmp/udf/libbitand.so
gcc -fPIC -shared sh/sqr_sum.c -o /tmp/udf/libsqrsum.so
echo "debug show /tmp/udf/*.so"
ls /tmp/udf/*.so
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include "taosudf.h"
DLL_EXPORT int32_t sqr_sum_init() {
return 0;
}
DLL_EXPORT int32_t sqr_sum_destroy() {
return 0;
}
DLL_EXPORT int32_t sqr_sum_start(SUdfInterBuf *buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 0;
return 0;
}
DLL_EXPORT int32_t sqr_sum(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
double sumSquares = *(double*)interBuf->buf;
int8_t numNotNull = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT ||
col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i];
if (udfColDataIsNull(col, j)) {
continue;
}
switch (col->colMeta.type) {
case TSDB_DATA_TYPE_INT: {
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += (double)num * num;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* cell = udfColDataGetData(col, j);
double num = *(double*)cell;
sumSquares += num * num;
break;
}
default:
break;
}
++numNotNull;
}
}
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
if (interBuf->numOfResult == 0 && numNotNull == 0) {
newInterBuf->numOfResult = 0;
} else {
newInterBuf->numOfResult = 1;
}
return 0;
}
DLL_EXPORT int32_t sqr_sum_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;
}
double sumSquares = *(double*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares);
resultData->bufLen = sizeof(double);
resultData->numOfResult = 1;
return 0;
}
system_content printf %OS%
if $system_content == Windows_NT then
return 0;
endi
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c udf -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step1 udf
system sh/compile_udf.sh
sql create database udf vgroups 3;
sql use udf;
sql show databases;
sql create table t (ts timestamp, f int);
sql insert into t values(now, 1)(now+1s, 2);
system_content printf %OS%
if $system_content == Windows_NT then
return 0;
endi
if $system_content == Windows_NT then
sql create function bit_and as 'C:\\Windows\\Temp\\bitand.dll' outputtype int bufSize 8;
sql create aggregate function sqr_sum as 'C:\\Windows\\Temp\\sqrsum.dll' outputtype double bufSize 8;
else
sql create function bit_and as '/tmp/udf/libbitand.so' outputtype int bufSize 8;
sql create aggregate function sqr_sum as '/tmp/udf/libsqrsum.so' outputtype double bufSize 8;
endi
sql show functions;
if $rows != 2 then
return -1
endi
sql select bit_and(f, f) from t;
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
sql select sqr_sum(f) from t;
if $rows != 1 then
print expect 1, actual $rows
return -1
endi
if $data00 != 2.236067977 then
return -1
endi
sql create table t2 (ts timestamp, f1 int, f2 int);
sql insert into t2 values(now, 0, 0)(now+1s, 1, 1);
sql select bit_and(f1, f2) from t2;
if $rows != 2 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data10 != 1 then
return -1
endi
sql select sqr_sum(f1, f2) from t2;
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
sql select bit_and(f1, f2) from t2;
print $rows , $data00 , $data10 , $data20 , $data30
if $rows != 4 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data10 != 1 then
return -1
endi
if $data20 != NULL then
return -1
endi
if $data30 != NULL then
return -1
endi
sql select sqr_sum(f1, f2) from t2;
print $rows, $data00
if $rows != 1 then
return -1
endi
if $data00 != 2.645751311 then
return -1
endi
sql insert into t2 values(now+4s, 4, 8)(now+5s, 5, 9);
sql select sqr_sum(f1-f2), sqr_sum(f1+f2) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1;
endi
if $data00 != 5.656854249 then
return -1
endi
if $data01 != 18.547236991 then
return -1
endi
sql select sqr_sum(bit_and(f2, f1)), sqr_sum(bit_and(f1, f2)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
if $data01 != 1.414213562 then
return -1
endi
sql select sqr_sum(f2) from udf.t2 group by 1-bit_and(f1, f2) order by 1-bit_and(f1,f2);
print $rows , $data00 , $data10 , $data20
if $rows != 3 then
return -1
endi
if $data00 != 2.000000000 then
return -1
endi
if $data10 != 9.055385138 then
return -1
endi
if $data20 != 8.000000000 then
return -1
endi
sql drop function bit_and;
sql show functions;
if $rows != 1 then
return -1
endi
if $data00 != @sqr_sum@ then
return -1
endi
sql drop function sqr_sum;
sql show functions;
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -78,7 +78,7 @@ sql select tbcol4, length(tbcol4), lower(tbcol4), upper(tbcol4), ltrim(tbcol4), ...@@ -78,7 +78,7 @@ sql select tbcol4, length(tbcol4), lower(tbcol4), upper(tbcol4), ltrim(tbcol4),
sql select * from tb1 where tbcol not in (1,2,3,null); sql select * from tb1 where tbcol not in (1,2,3,null);
sql select * from tb1 where tbcol + 3 <> null; sql select * from tb1 where tbcol + 3 <> null;
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from tb1 where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0) sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from tb1 where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
#sql select tbcol5 - tbcol3 from tb1 sql select tbcol5 - tbcol3 from tb1
print =============== step4: stb print =============== step4: stb
sql select avg(tbcol) as c from stb sql select avg(tbcol) as c from stb
...@@ -109,7 +109,7 @@ sql select * from stb where tbcol + 3 <> null; ...@@ -109,7 +109,7 @@ sql select * from stb where tbcol + 3 <> null;
sql select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from stb where tbcol = 1 and tbcol2 = 1 and tbcol3 = 1 partition by tgcol interval(1d) sql select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from stb where tbcol = 1 and tbcol2 = 1 and tbcol3 = 1 partition by tgcol interval(1d)
sql select _wstart, count(*) from tb1 session(ts, 1m) sql select _wstart, count(*) from tb1 session(ts, 1m)
sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from stb where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0) sql select count(tbcol), avg(tbcol), max(tbcol), min(tbcol), count(tbcol) from stb where ts <= 1601481840000 and ts >= 1601481800000 partition by tgcol interval(1m) fill(value, 0)
#sql select tbcol5 - tbcol3 from stb sql select tbcol5 - tbcol3 from stb
print =============== step5: explain print =============== step5: explain
sql explain analyze select ts from stb where -2; sql explain analyze select ts from stb where -2;
......
from tabnanny import check
import taos import taos
import time import time
import inspect import inspect
import traceback import traceback
import socket import socket
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
from util.log import * from util.log import *
from util.sql import * from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.common import *
PRIVILEGES_ALL = "ALL" PRIVILEGES_ALL = "ALL"
PRIVILEGES_READ = "READ" PRIVILEGES_READ = "READ"
...@@ -21,17 +22,40 @@ WEIGHT_WRITE = 3 ...@@ -21,17 +22,40 @@ WEIGHT_WRITE = 3
PRIMARY_COL = "ts" PRIMARY_COL = "ts"
INT_COL = "c1" INT_COL = "c_int"
BINT_COL = "c2" BINT_COL = "c_bint"
SINT_COL = "c3" SINT_COL = "c_sint"
TINT_COL = "c4" TINT_COL = "c_tint"
FLOAT_COL = "c5" FLOAT_COL = "c_float"
DOUBLE_COL = "c6" DOUBLE_COL = "c_double"
BOOL_COL = "c7" BOOL_COL = "c_bool"
TINT_UN_COL = "c_utint"
BINARY_COL = "c8" SINT_UN_COL = "c_usint"
NCHAR_COL = "c9" BINT_UN_COL = "c_ubint"
TS_COL = "c10" INT_UN_COL = "c_uint"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [BOOL_COL, ]
TS_TYPE_COL = [TS_COL, ]
INT_TAG = "t_int"
ALL_COL = [PRIMARY_COL, INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BINARY_COL, NCHAR_COL, BOOL_COL, TS_COL]
TAG_COL = [INT_TAG]
# insert data args:
TIME_STEP = 10000
NOW = int(datetime.timestamp(datetime.now()) * 1000)
# init db/table
DBNAME = "db"
STBNAME = "stb1"
CTBNAME = "ct1"
NTBNAME = "nt1"
class TDconnect: class TDconnect:
def __init__(self, def __init__(self,
...@@ -247,25 +271,26 @@ class TDTestCase: ...@@ -247,25 +271,26 @@ class TDTestCase:
with taos_connect(user=user.name, passwd=user.passwd) as use: with taos_connect(user=user.name, passwd=user.passwd) as use:
time.sleep(2) time.sleep(2)
if check_priv == PRIVILEGES_ALL: if check_priv == PRIVILEGES_ALL:
use.query("use db") use.query(f"use {DBNAME}")
use.query("show tables") use.query(f"show {DBNAME}.tables")
use.query("select * from ct1") use.query(f"select * from {DBNAME}.{CTBNAME}")
use.query("insert into t1 (ts) values (now())") use.query(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
elif check_priv == PRIVILEGES_READ: elif check_priv == PRIVILEGES_READ:
use.query("use db") use.query(f"use {DBNAME}")
use.query("show tables") use.query(f"show {DBNAME}.tables")
use.query("select * from ct1") use.query(f"select * from {DBNAME}.{CTBNAME}")
use.error("insert into t1 (ts) values (now())") use.error(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
elif check_priv == PRIVILEGES_WRITE: elif check_priv == PRIVILEGES_WRITE:
use.query("use db") use.query(f"use {DBNAME}")
use.query("show tables") use.query(f"show {DBNAME}.tables")
use.error("select * from ct1") use.error(f"select * from {DBNAME}.{CTBNAME}")
use.query("insert into t1 (ts) values (now())") use.query(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
elif check_priv is None: elif check_priv is None:
use.error("use db") use.error(f"use {DBNAME}")
use.error("show tables") # use.error(f"show {DBNAME}.tables")
use.error("select * from db.ct1") use.error(f"show tables")
use.error("insert into db.t1 (ts) values (now())") use.error(f"select * from {DBNAME}.{CTBNAME}")
use.error(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
def __change_user_priv(self, user: User, pre_priv, invoke=False): def __change_user_priv(self, user: User, pre_priv, invoke=False):
if user.priv == pre_priv and invoke : if user.priv == pre_priv and invoke :
...@@ -418,7 +443,7 @@ class TDTestCase: ...@@ -418,7 +443,7 @@ class TDTestCase:
self.__grant_user_privileges(privilege="", dbname="db", user_name=self.__user_list[0]) , self.__grant_user_privileges(privilege="", dbname="db", user_name=self.__user_list[0]) ,
self.__grant_user_privileges(privilege=" ".join(self.__privilege), user_name=self.__user_list[0]) , self.__grant_user_privileges(privilege=" ".join(self.__privilege), user_name=self.__user_list[0]) ,
f"GRANT {self.__privilege[0]} ON * TO {self.__user_list[0]}" , f"GRANT {self.__privilege[0]} ON * TO {self.__user_list[0]}" ,
f"GRANT {self.__privilege[0]} ON db.t1 TO {self.__user_list[0]}" , f"GRANT {self.__privilege[0]} ON {DBNAME}.{NTBNAME} TO {self.__user_list[0]}" ,
] ]
def __revoke_err(self): def __revoke_err(self):
...@@ -430,7 +455,7 @@ class TDTestCase: ...@@ -430,7 +455,7 @@ class TDTestCase:
self.__revoke_user_privileges(privilege="", dbname="db", user_name=self.__user_list[0]) , self.__revoke_user_privileges(privilege="", dbname="db", user_name=self.__user_list[0]) ,
self.__revoke_user_privileges(privilege=" ".join(self.__privilege), user_name=self.__user_list[0]) , self.__revoke_user_privileges(privilege=" ".join(self.__privilege), user_name=self.__user_list[0]) ,
f"REVOKE {self.__privilege[0]} ON * FROM {self.__user_list[0]}" , f"REVOKE {self.__privilege[0]} ON * FROM {self.__user_list[0]}" ,
f"REVOKE {self.__privilege[0]} ON db.t1 FROM {self.__user_list[0]}" , f"REVOKE {self.__privilege[0]} ON {DBNAME}.{NTBNAME} FROM {self.__user_list[0]}" ,
] ]
def test_grant_err(self): def test_grant_err(self):
...@@ -505,101 +530,48 @@ class TDTestCase: ...@@ -505,101 +530,48 @@ class TDTestCase:
self.drop_user_error() self.drop_user_error()
self.drop_user_current() self.drop_user_current()
def __create_tb(self): def __create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, dbname=DBNAME):
tdLog.printNoPrefix("==========step: create table")
tdLog.printNoPrefix("==========step1:create table") create_stb_sql = f'''create table {dbname}.{stb}(
create_stb_sql = f'''create table stb1( {PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
) tags (t1 int) {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
''' ) tags ({INT_TAG} int)
create_ntb_sql = f'''create table t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
)
''' '''
tdSql.execute(create_stb_sql) tdSql.execute(create_stb_sql)
tdSql.execute(create_ntb_sql)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
{ i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}
def __insert_data(self, rows):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows):
tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f'''insert into ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
'''
)
tdSql.execute( for i in range(ntbnum):
f'''insert into ct4 values create_ntb_sql = f'''create table {dbname}.nt{i+1}(
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) {PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
( {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{ now_time + 5184000000}, {pow(2,31)-pow(2,15)}, {pow(2,63)-pow(2,30)}, 32767, 127, {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000}
) )
( '''
{ now_time + 2592000000 }, {pow(2,31)-pow(2,16)}, {pow(2,63)-pow(2,31)}, 32766, 126, tdSql.execute(create_ntb_sql)
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000}
)
'''
)
tdSql.execute( for i in range(ctb_num):
f'''insert into ct2 values tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, star_time=NOW):
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) tdLog.printNoPrefix("==========step: start inser data into tables now.....")
( # from ...pytest.util.common import DataSet
{ now_time + 5184000000 }, { -1 * pow(2,31) + pow(2,15) }, { -1 * pow(2,63) + pow(2,30) }, -32766, -126, data = DataSet()
{ -1 * 3.2 * pow(10,38) }, { -1.2 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000 } data.get_order_set(rows)
)
(
{ now_time + 2592000000 }, { -1 * pow(2,31) + pow(2,16) }, { -1 * pow(2,63) + pow(2,31) }, -32767, -127,
{ - 3.3 * pow(10,38) }, { -1.3 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000 }
)
'''
)
for i in range(rows): for i in range(rows):
insert_data = f'''insert into t1 values row_data = f'''
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}, {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } ) {data.bool_data[i]}, '{data.vchar_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.utint_data[i]},
''' {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
tdSql.execute(insert_data)
tdSql.execute(
f'''insert into t1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7200000 }, { pow(2,31) - pow(2,15) }, { pow(2,63) - pow(2,30) }, 32767, 127,
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 },
"binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000 }
)
(
{ now_time + 3600000 } , { pow(2,31) - pow(2,16) }, { pow(2,63) - pow(2,31) }, 32766, 126,
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 },
"binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000 }
)
''' '''
) tdSql.execute( f"insert into {dbname}.{NTBNAME} values ( {star_time - i * int(TIME_STEP * 1.2)}, {row_data} )" )
for j in range(ctb_num):
tdSql.execute( f"insert into {dbname}.ct{j+1} values ( {star_time - j * i * TIME_STEP}, {row_data} )" )
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
...@@ -656,27 +628,81 @@ class TDTestCase: ...@@ -656,27 +628,81 @@ class TDTestCase:
with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as user: with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as user:
# user = conn # user = conn
# 不能创建用户 # 不能创建用户
tdLog.printNoPrefix("==========step5: normal user can not create user") tdLog.printNoPrefix("==========step4.1: normal user can not create user")
user.error("create use utest1 pass 'utest1pass'") user.error("create use utest1 pass 'utest1pass'")
# 可以查看用户 # 可以查看用户
tdLog.printNoPrefix("==========step6: normal user can show user") tdLog.printNoPrefix("==========step4.2: normal user can show user")
user.query("show users") user.query("show users")
assert user.queryRows == self.users_count + 1 assert user.queryRows == self.users_count + 1
# 不可以修改其他用户的密码 # 不可以修改其他用户的密码
tdLog.printNoPrefix("==========step7: normal user can not alter other user pass") tdLog.printNoPrefix("==========step4.3: normal user can not alter other user pass")
user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] )) user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] ))
user.error(self.__alter_pass_sql("root", "taosdata_root" )) user.error(self.__alter_pass_sql("root", "taosdata_root" ))
# 可以修改自己的密码 # 可以修改自己的密码
tdLog.printNoPrefix("==========step8: normal user can alter owner pass") tdLog.printNoPrefix("==========step4.4: normal user can alter owner pass")
user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0])) user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0]))
# 不可以删除用户,包括自己 # 不可以删除用户,包括自己
tdLog.printNoPrefix("==========step9: normal user can not drop any user ") tdLog.printNoPrefix("==========step4.5: normal user can not drop any user ")
user.error(f"drop user {self.__user_list[0]}") user.error(f"drop user {self.__user_list[0]}")
user.error(f"drop user {self.__user_list[1]}") user.error(f"drop user {self.__user_list[1]}")
user.error("drop user root") user.error("drop user root")
tdLog.printNoPrefix("==========step5: enable info")
taos1_conn = taos.connect(user=self.__user_list[1], password=f"new{self.__passwd_list[1]}")
taos1_conn.query(f"show databases")
tdSql.execute(f"alter user {self.__user_list[1]} enable 0")
tdSql.execute(f"alter user {self.__user_list[2]} enable 0")
taos1_except = True
try:
taos1_conn.query("show databases")
except BaseException:
taos1_except = False
if taos1_except:
tdLog.exit("taos 1 connect except error not occured, when enable == 0, should not r/w ")
else:
tdLog.info("taos 1 connect except error occured, enable == 0")
taos2_except = True
try:
taos.connect(user=self.__user_list[2], password=f"new{self.__passwd_list[2]}")
except BaseException:
taos2_except = False
if taos2_except:
tdLog.exit("taos 2 connect except error not occured, when enable == 0, should not connect")
else:
tdLog.info("taos 2 connect except error occured, enable == 0, can not login")
tdLog.printNoPrefix("==========step6: sysinfo info")
taos3_conn = taos.connect(user=self.__user_list[3], password=f"new{self.__passwd_list[3]}")
taos3_conn.query(f"show dnodes")
taos3_conn.query(f"show {DBNAME}.vgroups")
tdSql.execute(f"alter user {self.__user_list[3]} sysinfo 0")
tdSql.execute(f"alter user {self.__user_list[4]} sysinfo 0")
taos3_except = True
try:
taos3_conn.query(f"show dnodes")
taos3_conn.query(f"show {DBNAME}.vgroups")
except BaseException:
taos3_except = False
if taos3_except:
tdLog.exit("taos 3 query except error not occured, when sysinfo == 0, should not show info:dnode/monde/qnode ")
else:
tdLog.info("taos 3 query except error occured, sysinfo == 0, can not show dnode/vgroups")
taos4_conn = taos.connect(user=self.__user_list[4], password=f"new{self.__passwd_list[4]}")
taos4_except = True
try:
taos4_conn.query(f"show mnodes")
taos4_conn.query(f"show {DBNAME}.vgroups")
except BaseException:
taos4_except = False
if taos4_except:
tdLog.exit("taos 4 query except error not occured, when sysinfo == 0, when enable == 0, should not show info:dnode/monde/qnode")
else:
tdLog.info("taos 4 query except error occured, sysinfo == 0, can not show dnode/vgroups")
# root删除用户测试 # root删除用户测试
tdLog.printNoPrefix("==========step10: super user drop normal user") tdLog.printNoPrefix("==========step7: super user drop normal user")
self.test_drop_user() self.test_drop_user()
tdSql.query("show users") tdSql.query("show users")
......
import datetime from datetime import datetime
import time import time
from dataclasses import dataclass from dataclasses import dataclass
...@@ -8,6 +8,7 @@ from util.sql import * ...@@ -8,6 +8,7 @@ from util.sql import *
from util.cases import * from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.constant import * from util.constant import *
from util.common import *
PRIMARY_COL = "ts" PRIMARY_COL = "ts"
...@@ -38,7 +39,7 @@ TAG_COL = [INT_TAG] ...@@ -38,7 +39,7 @@ TAG_COL = [INT_TAG]
# insert data args: # insert data args:
TIME_STEP = 10000 TIME_STEP = 10000
NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) NOW = int(datetime.timestamp(datetime.now()) * 1000)
# init db/table # init db/table
DBNAME = "db" DBNAME = "db"
...@@ -47,40 +48,6 @@ CTBNAME = "ct1" ...@@ -47,40 +48,6 @@ CTBNAME = "ct1"
NTBNAME = "nt1" NTBNAME = "nt1"
@dataclass
class DataSet:
ts_data : List[int] = None
int_data : List[int] = None
bint_data : List[int] = None
sint_data : List[int] = None
tint_data : List[int] = None
int_un_data : List[int] = None
bint_un_data: List[int] = None
sint_un_data: List[int] = None
tint_un_data: List[int] = None
float_data : List[float] = None
double_data : List[float] = None
bool_data : List[int] = None
binary_data : List[str] = None
nchar_data : List[str] = None
def __post_init__(self):
self.ts_data = []
self.int_data = []
self.bint_data = []
self.sint_data = []
self.tint_data = []
self.int_un_data = []
self.bint_un_data = []
self.sint_un_data = []
self.tint_un_data = []
self.float_data = []
self.double_data = []
self.bool_data = []
self.binary_data = []
self.nchar_data = []
@dataclass @dataclass
class SMAschema: class SMAschema:
creation : str = "CREATE" creation : str = "CREATE"
...@@ -164,10 +131,6 @@ class SMAschema: ...@@ -164,10 +131,6 @@ class SMAschema:
del self.other[k] del self.other[k]
# from ...pytest.util.sql import *
# from ...pytest.util.constant import *
class TDTestCase: class TDTestCase:
updatecfgDict = {"querySmaOptimize": 1} updatecfgDict = {"querySmaOptimize": 1}
...@@ -469,14 +432,12 @@ class TDTestCase: ...@@ -469,14 +432,12 @@ class TDTestCase:
err_sqls.append( SMAschema(index_flag="SMA INDEX ,", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) ) err_sqls.append( SMAschema(index_flag="SMA INDEX ,", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(index_name="tbname", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) ) err_sqls.append( SMAschema(index_name="tbname", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
# current_set # current_set
cur_sqls.append( SMAschema(max_delay="",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) ) cur_sqls.append( SMAschema(max_delay="",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
cur_sqls.append( SMAschema(watermark="",index_name="sma_index_2",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) ) cur_sqls.append( SMAschema(watermark="",index_name="sma_index_2",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
cur_sqls.append( SMAschema(sliding="",index_name='sma_index_3',tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) ) cur_sqls.append( SMAschema(sliding="",index_name='sma_index_3',tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
return err_sqls, cur_sqls return err_sqls, cur_sqls
def test_create_sma(self): def test_create_sma(self):
...@@ -512,102 +473,48 @@ class TDTestCase: ...@@ -512,102 +473,48 @@ class TDTestCase:
self.test_create_sma() self.test_create_sma()
self.test_drop_sma() self.test_drop_sma()
pass def __create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, dbname=DBNAME):
def __create_tb(self):
tdLog.printNoPrefix("==========step: create table") tdLog.printNoPrefix("==========step: create table")
create_stb_sql = f'''create table {STBNAME}( create_stb_sql = f'''create table {dbname}.{stb}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, {PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp, {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned, {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
) tags ({INT_TAG} int) ) tags ({INT_TAG} int)
''' '''
create_ntb_sql = f'''create table {NTBNAME}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
)
'''
tdSql.execute(create_stb_sql) tdSql.execute(create_stb_sql)
tdSql.execute(create_ntb_sql)
for i in range(4): for i in range(ntbnum):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') create_ntb_sql = f'''create table {dbname}.nt{i+1}(
{PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
def __data_set(self, rows): {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
data_set = DataSet() {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
for i in range(rows): {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
data_set.ts_data.append(NOW + 1 * (rows - i)) )
data_set.int_data.append(rows - i) '''
data_set.bint_data.append(11111 * (rows - i)) tdSql.execute(create_ntb_sql)
data_set.sint_data.append(111 * (rows - i) % 32767)
data_set.tint_data.append(11 * (rows - i) % 127) for i in range(ctb_num):
data_set.int_un_data.append(rows - i) tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')
data_set.bint_un_data.append(11111 * (rows - i))
data_set.sint_un_data.append(111 * (rows - i) % 32767) def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, star_time=NOW):
data_set.tint_un_data.append(11 * (rows - i) % 127)
data_set.float_data.append(1.11 * (rows - i))
data_set.double_data.append(1100.0011 * (rows - i))
data_set.bool_data.append((rows - i) % 2)
data_set.binary_data.append(f'binary{(rows - i)}')
data_set.nchar_data.append(f'nchar_测试_{(rows - i)}')
return data_set
def __insert_data(self):
tdLog.printNoPrefix("==========step: start inser data into tables now.....") tdLog.printNoPrefix("==========step: start inser data into tables now.....")
data = self.__data_set(rows=self.rows) # from ...pytest.util.common import DataSet
data = DataSet()
data.get_order_set(rows, bint_step=2)
# now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) for i in range(rows):
null_data = '''null, null, null, null, null, null, null, null, null, null, null, null, null, null'''
zero_data = "0, 0, 0, 0, 0, 0, 0, 'binary_0', 'nchar_0', 0, 0, 0, 0, 0"
for i in range(self.rows):
row_data = f''' row_data = f'''
{data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]}, {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
{data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.tint_un_data[i]}, {data.bool_data[i]}, '{data.vchar_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.utint_data[i]},
{data.sint_un_data[i]}, {data.int_un_data[i]}, {data.bint_un_data[i]} {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
'''
neg_row_data = f'''
{-1 * data.int_data[i]}, {-1 * data.bint_data[i]}, {-1 * data.sint_data[i]}, {-1 * data.tint_data[i]}, {-1 * data.float_data[i]}, {-1 * data.double_data[i]},
{data.bool_data[i]}, '{data.binary_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {1 * data.tint_un_data[i]},
{1 * data.sint_un_data[i]}, {1 * data.int_un_data[i]}, {1 * data.bint_un_data[i]}
''' '''
tdSql.execute( f"insert into {dbname}.{NTBNAME} values ( {star_time - i * int(TIME_STEP * 1.2)}, {row_data} )" )
tdSql.execute( for j in range(ctb_num):
f"insert into ct1 values ( {NOW - i * TIME_STEP}, {row_data} )") tdSql.execute( f"insert into {dbname}.ct{j+1} values ( {star_time - j * i * TIME_STEP}, {row_data} )" )
tdSql.execute(
f"insert into ct2 values ( {NOW - i * int(TIME_STEP * 0.6)}, {neg_row_data} )")
tdSql.execute(
f"insert into ct4 values ( {NOW - i * int(TIME_STEP * 0.8) }, {row_data} )")
tdSql.execute(
f"insert into {NTBNAME} values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )")
tdSql.execute(
f"insert into ct2 values ( {NOW + int(TIME_STEP * 0.6)}, {null_data} )")
tdSql.execute(
f"insert into ct2 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.6)}, {null_data} )")
tdSql.execute(
f"insert into ct2 values ( {NOW - self.rows * int(TIME_STEP * 0.29) }, {null_data} )")
tdSql.execute(
f"insert into ct4 values ( {NOW + int(TIME_STEP * 0.8)}, {null_data} )")
tdSql.execute(
f"insert into ct4 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 0.8)}, {null_data} )")
tdSql.execute(
f"insert into ct4 values ( {NOW - self.rows * int(TIME_STEP * 0.39)}, {null_data} )")
tdSql.execute(
f"insert into {NTBNAME} values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )")
tdSql.execute(
f"insert into {NTBNAME} values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )")
tdSql.execute(
f"insert into {NTBNAME} values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )")
def run(self): def run(self):
self.rows = 10 self.rows = 10
...@@ -616,14 +523,60 @@ class TDTestCase: ...@@ -616,14 +523,60 @@ class TDTestCase:
tdLog.printNoPrefix("==========step1:create table in normal database") tdLog.printNoPrefix("==========step1:create table in normal database")
tdSql.prepare() tdSql.prepare()
self.__create_tb() self.__create_tb(dbname=DBNAME)
self.__insert_data() self.__insert_data(rows=self.rows)
self.all_test() self.all_test()
# # from ...pytest.util.sql import *
# drop databases, create same name db、stb and sma index # drop databases, create same name db、stb and sma index
tdSql.prepare() tdSql.prepare()
self.__create_tb() self.__create_tb(dbname=DBNAME)
self.__insert_data() self.__insert_data(rows=self.rows,star_time=NOW + self.rows * 2 * TIME_STEP)
tdLog.printNoPrefix("==========step1.1 : create a tsma index and checkdata")
tdSql.execute(f"create sma index {DBNAME}.sma_index_name1 on {DBNAME}.{STBNAME} function(max({INT_COL}),max({BINT_COL}),min({INT_COL})) interval(6m,10s) sliding(6m)")
self.__insert_data(rows=self.rows)
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
# tdSql.checkData(0, 2, 0)
tdLog.printNoPrefix("==========step1.2 : alter table schema, drop col without index")
tdSql.execute(f"alter stable {DBNAME}.{STBNAME} drop column {BINARY_COL}")
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
tdLog.printNoPrefix("==========step1.3 : alter table schema, drop col with index")
# TODO: TD-18047, can not drop col, when col in tsma-index and tsma-index is not dropped.
tdSql.error(f"alter stable {DBNAME}.stb1 drop column {BINT_COL}")
tdLog.printNoPrefix("==========step1.4 : alter table schema, add col")
tdSql.execute(f"alter stable {DBNAME}.{STBNAME} add column {BINT_COL}_1 bigint")
tdSql.execute(f"insert into {DBNAME}.{CTBNAME} ({PRIMARY_COL}, {BINT_COL}_1) values(now(), 111)")
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
# tdSql.checkData(0, 2, 0)
tdSql.query(f"select max({BINT_COL}_1) from {DBNAME}.{STBNAME} ")
tdSql.checkData(0, 0 , 111)
tdSql.execute(f"flush database {DBNAME}")
tdLog.printNoPrefix("==========step1.5 : drop child table")
tdSql.execute(f"drop table {CTBNAME}")
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
tdLog.printNoPrefix("==========step1.6 : drop stable")
tdSql.execute(f"drop table {STBNAME}")
tdSql.error(f"select * from {DBNAME}.{STBNAME}")
self.all_test() self.all_test()
tdLog.printNoPrefix("==========step2:create table in rollup database") tdLog.printNoPrefix("==========step2:create table in rollup database")
...@@ -640,7 +593,6 @@ class TDTestCase: ...@@ -640,7 +593,6 @@ class TDTestCase:
tdSql.execute("flush database db ") tdSql.execute("flush database db ")
tdLog.printNoPrefix("==========step4:after wal, all check again ") tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test() self.all_test()
......
...@@ -27,7 +27,7 @@ python3 ./test.py -f 1-insert/alter_stable.py ...@@ -27,7 +27,7 @@ python3 ./test.py -f 1-insert/alter_stable.py
python3 ./test.py -f 1-insert/alter_table.py python3 ./test.py -f 1-insert/alter_table.py
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
python3 ./test.py -f 1-insert/table_comment.py python3 ./test.py -f 1-insert/table_comment.py
python3 ./test.py -f 1-insert/time_range_wise.py #python3 ./test.py -f 1-insert/time_range_wise.py #TD-18130
python3 ./test.py -f 1-insert/block_wise.py python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
...@@ -196,28 +196,28 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStopFollowerLeader.py -N 5 -M 3 ...@@ -196,28 +196,28 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeStopFollowerLeader.py -N 5 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3
# vnode case # vnode case
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_createDb_replica1.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_createDb_replica1.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas_querys.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica1_insertdatas_querys.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_force_stop_all_dnodes.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_force_stop_all_dnodes.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_all_vnode.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_all_vnode.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_follower.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_follower.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_leader.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys_loop_restart_leader.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_all_dnodes.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_all_dnodes.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_sync.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_sync.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync_force_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_follower_unsync.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader_forece_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader_forece_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_leader.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_mnode3_insertdatas_querys.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_mnode3_insertdatas_querys.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower_force_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_follower.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader_force_stop.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader_force_stop.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader.py -N 4 -M 1 # python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_querydatas_stop_leader.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups_stopOne.py -N 4 -M 1 python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups_stopOne.py -N 4 -M 1
python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3 python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册