提交 3572fa7f 编写于 作者: dengyihao's avatar dengyihao

Merge branch '3.0' of https://github.com/taosdata/TDengine into tdd

......@@ -3,6 +3,8 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询'
import xiaot from './tdengine.webp'
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。
......@@ -18,4 +20,4 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
微信扫描下面二维码,加“小 T”为好友,即可加入“物联网大数据技术前沿群”,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
<img src="./tdengine.webp" width="200" />
<img src={xiaot} alt="小 T 的二维码" width="200" />
......@@ -115,6 +115,7 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
{{#include examples/c/tmq.c}}
sidebar_label: TDengine 发布历史
title: TDengine 发布历史
title: TDengine 发布历史及下载链接
description: TDengine 发布历史、Release Notes 及下载链接
各版本 TDengine 安装包下载链接如下:
import Release from "/components/ReleaseV3";
......@@ -33,4 +35,3 @@ import Release from "/components/ReleaseV3";
<Release type="tdengine" version="" />
sidebar_label: taosTools 发布历史
title: taosTools 发布历史
title: taosTools 发布历史及下载链接
description: taosTools 的发布历史、Release Notes 和下载链接
各版本 taosTools 安装包下载链接如下:
import Release from "/components/ReleaseV3";
## 2.2.7
......@@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeVariantI32(buf, &pSW->nCols);
buf = taosDecodeVariantI32(buf, &pSW->version);
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
if (pSW->nCols > 0) {
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
} else {
pSW->pSchema = NULL;
return (void*)buf;
......@@ -839,7 +843,7 @@ typedef struct {
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs; // ms
int64_t stateTs; // ms
} SUseDbReq;
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
......@@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
pSubTopicEp->schema.nCols = 0;
......@@ -58,7 +58,6 @@ typedef int64_t SyncIndex;
typedef uint64_t SyncTerm;
typedef struct SSyncNode SSyncNode;
typedef struct SSyncBuffer SSyncBuffer;
typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;
......@@ -3423,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
ASSERT(code == 0);
if (code == -1) {
// coverity scan
pGroupResInfo->index += 1;
SResultRow* pRow = (SResultRow*)pVal;
......@@ -1773,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL;
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
......@@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
*pHashKey = *pKey;
pHashKey->win.ekey = pKey->win.skey;
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
if (tSimpleHashGetSize(pHashMap) == 0) {
......@@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
for (int32_t i = 0; i < size; i++) {
SSessionKey* pWin = taosArrayGet(pWins, i);
if (!pWin) continue;
SSessionKey key = *pWin;
key.win.ekey = key.win.skey;
SSessionKey key = {0};
getSessionHashKey(pWin, &key);
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
......@@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
streamStateSessionDel(pAggSup->pState, pKey);
tSimpleHashRemove(pAggSup->pResultRows, pKey, sizeof(SSessionKey));
SSessionKey hashKey = {0};
getSessionHashKey(pKey, &hashKey);
tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
return true;
......@@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SSessionKey key = winInfo.sessionWin;
key.win.ekey = key.win.skey;
SSessionKey key = {0};
getSessionHashKey(&winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
......@@ -3896,8 +3903,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
SSessionKey chWinKey = *pWinKey;
chWinKey.win.ekey = chWinKey.win.skey;
SSessionKey chWinKey = {0};
getSessionHashKey(pWinKey, &chWinKey);
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
SResultRow* pResult = NULL;
SResultRow* pChResult = NULL;
......@@ -3978,8 +3985,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
for (int32_t i = 0; i < size; i++) {
SSessionKey* pWinKey = taosArrayGet(pResWins, i);
if (!pWinKey) continue;
SSessionKey winInfo = *pWinKey;
winInfo.win.ekey = winInfo.win.skey;
SSessionKey winInfo = {0};
getSessionHashKey(pWinKey, &winInfo);
tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0);
......@@ -4561,8 +4568,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SSessionKey key = curWin.winInfo.sessionWin;
key.win.ekey = key.win.skey;
SSessionKey key = {0};
getSessionHashKey(&curWin.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
......@@ -4645,6 +4652,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
#if 0
char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
qDebug("===stream===final session%s", pBuf);
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete");
......@@ -138,7 +138,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Primary timestamp column cannot be dropped";
return "Only binary/nchar column length could be modified";
return "Only binary/nchar column length could be modified, and the length can only be increased, not decreased";
return "Invalid tbname pseudo column";
......@@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
void* tmp = NULL;
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
if (code == 0) {
*key = resKey;
*pVal = tdbRealloc(NULL, *pVLen);
memcpy(*pVal, tmp, *pVLen);
if (key->win.skey != resKey.win.skey) {
code = -1;
} else {
*key = resKey;
*pVal = tdbRealloc(NULL, *pVLen);
memcpy(*pVal, tmp, *pVLen);
return code;
......@@ -21,7 +21,6 @@ extern "C" {
#include "syncInt.h"
#include "syncMessage.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
......@@ -21,7 +21,6 @@ extern "C" {
#include "syncInt.h"
#include "syncMessage.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
......@@ -41,22 +41,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime);
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj);
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj);
void syncIndexMgrLog(SSyncIndexMgr *pObj);
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj);
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime);
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
#ifdef __cplusplus
......@@ -21,7 +21,6 @@ extern "C" {
#include "sync.h"
#include "syncTools.h"
#include "taosdef.h"
#include "tlog.h"
#include "trpc.h"
......@@ -85,9 +84,33 @@ typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
typedef struct SSyncTimer SSyncTimer;
typedef struct SSyncHbTimerData SSyncHbTimerData;
typedef struct SyncSnapshotSend SyncSnapshotSend;
typedef struct SyncSnapshotRsp SyncSnapshotRsp;
typedef struct SyncLocalCmd SyncLocalCmd;
typedef struct SyncAppendEntriesBatch SyncAppendEntriesBatch;
typedef struct SyncPreSnapshotReply SyncPreSnapshotReply;
typedef struct SyncHeartbeatReply SyncHeartbeatReply;
typedef struct SyncHeartbeat SyncHeartbeat;
typedef struct SyncPreSnapshot SyncPreSnapshot;
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);
typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg);
typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg);
extern bool gRaftDetailLog;
typedef struct SRaftId {
SyncNodeId addr;
SyncGroupId vgId;
} SRaftId;
typedef struct SSyncHbTimerData {
SSyncNode* pSyncNode;
SSyncTimer* pTimer;
......@@ -13,15 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "syncAppendEntries.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncMessage.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "wal.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
......@@ -13,17 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "syncAppendEntriesReply.h"
#include "syncMessage.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
......@@ -13,10 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
......@@ -18,8 +18,8 @@
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncVoteMgr.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// RequestVote(i, j) ==
......@@ -105,6 +105,7 @@ void syncEnvStopTimer() {
static void syncEnvTick(void *param, void *tmrId) {
#if 0
SSyncEnv *pSyncEnv = param;
if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) {
......@@ -121,4 +122,5 @@ static void syncEnvTick(void *param, void *tmrId) {
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
......@@ -13,18 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "syncIndexMgr.h"
#include "syncUtil.h"
// SMatchIndex -----------------------------
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr));
SSyncIndexMgr *pSyncIndexMgr = taosMemoryCalloc(1, sizeof(SSyncIndexMgr));
if (pSyncIndexMgr == NULL) {
return NULL;
memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr));
pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
......@@ -97,54 +95,6 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pSyncIndexMgr != NULL) {
cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);
cJSON *pReplicas = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
arr[i] = pSyncIndexMgr->index[i];
cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
cJSON_AddItemToObject(pRoot, "index", pIndex);
int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
arr[i] = pSyncIndexMgr->privateTerm[i];
cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
cJSON_AddItemToObject(pRoot, "privateTerm", pIndex);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
return pJson;
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
return serialized;
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
......@@ -201,35 +151,6 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa
return -1;
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
if (gRaftDetailLog) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftEntry.h"
......@@ -150,31 +151,6 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncPing) + dataLen;
SyncPing* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_PING;
pMsg->dataLen = dataLen;
return pMsg;
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
uint32_t dataLen = strlen(str) + 1;
SyncPing* pMsg = syncPingBuild(dataLen);
pMsg->vgId = vgId;
pMsg->srcId = *srcId;
pMsg->destId = *destId;
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
return pMsg;
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping");
return pMsg;
void syncPingDestroy(SyncPing* pMsg) {
if (pMsg != NULL) {
......@@ -193,16 +169,6 @@ void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncPingSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
return buf;
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncPing* pMsg = taosMemoryMalloc(bytes);
......@@ -212,117 +178,6 @@ SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
return pMsg;
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) {
return -1;
if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
return -1;
if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
return -1;
if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
return -1;
if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
return -1;
if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
return -1;
if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
return -1;
if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
return -1;
if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
return -1;
if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
return -1;
int32_t tlen = encoder.pos;
return tlen;
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) {
return NULL;
SyncPing* pMsg = NULL;
uint32_t bytes;
if (tDecodeU32(&decoder, &bytes) < 0) {
return NULL;
pMsg = taosMemoryMalloc(bytes);
pMsg->bytes = bytes;
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
return NULL;
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
return NULL;
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
return NULL;
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
return NULL;
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
return NULL;
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
return NULL;
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
return NULL;
uint32_t len;
char* data = NULL;
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
return NULL;
ASSERT(len == pMsg->dataLen);
memcpy(pMsg->data, data, len);
return pMsg;
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
......@@ -330,96 +185,6 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
return pMsg;
cJSON* syncPing2Json(const SyncPing* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
char* syncPing2Str(const SyncPing* pMsg) {
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
return serialized;
// for debug ----------------------
void syncPingPrint(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
void syncPingPrint2(char* s, const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
void syncPingLog(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
void syncPingLog2(char* s, const SyncPing* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
// ---- message process SyncPingReply----
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncPingReply) + dataLen;
......@@ -15,6 +15,7 @@
#include "syncRequestVote.h"
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
......@@ -15,6 +15,7 @@
#include "syncRequestVoteReply.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncVoteMgr.h"
......@@ -15,6 +15,7 @@
#include "syncVoteMgr.h"
#include "syncMessage.h"
#include "syncUtil.h"
static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
......@@ -143,7 +143,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64
sTrace("==callback== ==ReConfigCb== flag:%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64
", term:%" PRIu64,
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term);
......@@ -110,7 +110,37 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender);
cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver);
char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver);
cJSON* syncIndexMgr2Json(SSyncIndexMgr* pSyncIndexMgr);
char* syncIndexMgr2Str(SSyncIndexMgr* pSyncIndexMgr);
void syncIndexMgrPrint(SSyncIndexMgr* pObj);
void syncIndexMgrPrint2(char* s, SSyncIndexMgr* pObj);
void syncIndexMgrLog(SSyncIndexMgr* pObj);
void syncIndexMgrLog2(char* s, SSyncIndexMgr* pObj);
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json();
char* syncRpcMsg2Str(SRpcMsg* pRpcMsg);
void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
void syncRpcMsgLog(SRpcMsg* pMsg);
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
// origin syncMessage
SyncPing* syncPingBuild(uint32_t dataLen);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len);
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen);
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
cJSON* syncPing2Json(const SyncPing* pMsg);
char* syncPing2Str(const SyncPing* pMsg);
void syncPingPrint(const SyncPing* pMsg);
void syncPingPrint2(char* s, const SyncPing* pMsg);
void syncPingLog(const SyncPing* pMsg);
void syncPingLog2(char* s, const SyncPing* pMsg);
#ifdef __cplusplus
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "syncTest.h"
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
if (gRaftDetailLog) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pSyncIndexMgr != NULL) {
cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);
cJSON *pReplicas = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
arr[i] = pSyncIndexMgr->index[i];
cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
cJSON_AddItemToObject(pRoot, "index", pIndex);
int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
arr[i] = pSyncIndexMgr->privateTerm[i];
cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
cJSON_AddItemToObject(pRoot, "privateTerm", pIndex);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
return pJson;
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
return serialized;
......@@ -16,6 +16,244 @@
#include "syncTest.h"
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncPing) + dataLen;
SyncPing* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_SYNC_PING;
pMsg->dataLen = dataLen;
return pMsg;
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
uint32_t dataLen = strlen(str) + 1;
SyncPing* pMsg = syncPingBuild(dataLen);
pMsg->vgId = vgId;
pMsg->srcId = *srcId;
pMsg->destId = *destId;
snprintf(pMsg->data, pMsg->dataLen, "%s", str);
return pMsg;
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping");
return pMsg;
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
ASSERT(buf != NULL);
syncPingSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
return buf;
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) {
return NULL;
SyncPing* pMsg = NULL;
uint32_t bytes;
if (tDecodeU32(&decoder, &bytes) < 0) {
return NULL;
pMsg = taosMemoryMalloc(bytes);
pMsg->bytes = bytes;
if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
return NULL;
if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
return NULL;
if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
return NULL;
if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
return NULL;
if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
return NULL;
if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
return NULL;
if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
return NULL;
uint32_t len;
char* data = NULL;
if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
return NULL;
ASSERT(len == pMsg->dataLen);
memcpy(pMsg->data, data, len);
return pMsg;
int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) {
return -1;
if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
return -1;
if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
return -1;
if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
return -1;
if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
return -1;
if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
return -1;
if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
return -1;
if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
return -1;
if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
return -1;
if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
return -1;
int32_t tlen = encoder.pos;
return tlen;
cJSON* syncPing2Json(const SyncPing* pMsg) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pSrcId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
uint64_t u64 = pMsg->srcId.addr;
cJSON* pTmp = pSrcId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
cJSON* pDestId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
cJSON_AddStringToObject(pDestId, "addr", u64buf);
uint64_t u64 = pMsg->destId.addr;
cJSON* pTmp = pDestId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
cJSON_AddItemToObject(pRoot, "destId", pDestId);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s;
s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data", s);
s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
cJSON_AddStringToObject(pRoot, "data2", s);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
char* syncPing2Str(const SyncPing* pMsg) {
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
return serialized;
// for debug ----------------------
void syncPingPrint(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
void syncPingPrint2(char* s, const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
void syncPingLog(const SyncPing* pMsg) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
void syncPingLog2(char* s, const SyncPing* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncPing2Str(pMsg);
sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
......@@ -521,7 +521,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TIMELINE_FUNC, "Invalid timeline fu
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY, "Primary timestamp column cannot be dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified, and the length can only be increased, not decreased")
......@@ -544,4 +544,192 @@ if $rows != 10 then
sql drop stream if exists streams4;
sql drop database if exists test4;
sql drop stable if exists streamt4;
sql create database if not exists test4 vgroups 10 precision "ms" ;
sql use test4;
sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ;
sql create table t1 using st tags (-81) ;
sql create table t2 using st tags (-81) ;
sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1);
sql insert into t1 (ts, c1) values (1668073288209, 11);
sql insert into t1 (ts, c1) values (1668073288210, 11);
sql insert into t1 (ts, c1) values (1668073288211, 11);
sql insert into t1 (ts, c1) values (1668073288212, 11);
sql insert into t1 (ts, c1) values (1668073288213, 11);
sql insert into t1 (ts, c1) values (1668073288214, 11);
sql insert into t1 (ts, c1) values (1668073288215, 29);
$loop_count = 0
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
if $rows != 1 then
print =====rows=$rows
goto loop7
if $data01 != 11 then
print =====data01=$data01
goto loop7
if $data02 != 6 then
print =====data02=$data02
goto loop7
sql delete from t1 where ts = cast(1668073288214 as timestamp);
sql insert into t1 (ts, c1) values (1668073288216, 29);
sql delete from t1 where ts = cast(1668073288215 as timestamp);
sql insert into t1 (ts, c1) values (1668073288217, 29);
sql delete from t1 where ts = cast(1668073288216 as timestamp);
sql insert into t1 (ts, c1) values (1668073288218, 29);
sql delete from t1 where ts = cast(1668073288217 as timestamp);
sql insert into t1 (ts, c1) values (1668073288219, 29);
sql delete from t1 where ts = cast(1668073288218 as timestamp);
sql insert into t1 (ts, c1) values (1668073288220, 29);
sql delete from t1 where ts = cast(1668073288219 as timestamp);
$loop_count = 0
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
if $rows != 1 then
print =====rows=$rows
goto loop8
if $data01 != 11 then
print =====data01=$data01
goto loop8
if $data02 != 5 then
print =====data02=$data02
goto loop8
sql insert into t1 (ts, c1) values (1668073288221, 65);
sql insert into t1 (ts, c1) values (1668073288222, 65);
sql insert into t1 (ts, c1) values (1668073288223, 65);
sql insert into t1 (ts, c1) values (1668073288224, 65);
sql insert into t1 (ts, c1) values (1668073288225, 65);
sql insert into t1 (ts, c1) values (1668073288226, 65);
$loop_count = 0
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
if $rows != 2 then
print =====rows=$rows
goto loop8
if $data01 != 11 then
print =====data01=$data01
goto loop8
if $data02 != 5 then
print =====data02=$data02
goto loop8
if $data11 != 29 then
print =====data11=$data11
goto loop8
if $data12 != 1 then
print =====data12=$data12
goto loop8
sql insert into t1 (ts, c1) values (1668073288224, 64);
$loop_count = 0
sleep 200
sql select * from streamt4 order by start;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
if $rows != 4 then
print =====rows=$rows
goto loop9
if $data01 != 11 then
print =====data01=$data01
goto loop9
if $data02 != 5 then
print =====data02=$data02
goto loop9
if $data11 != 29 then
print =====data11=$data11
goto loop9
if $data12 != 1 then
print =====data12=$data12
goto loop9
if $data21 != 65 then
print =====data21=$data21
goto loop9
if $data22 != 3 then
print =====data22=$data22
goto loop9
if $data31 != 64 then
print =====data31=$data31
goto loop9
if $data32 != 1 then
print =====data32=$data32
goto loop9
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册