提交 f24d8433 编写于 作者: S Shengliang Guan

Merge branch 'main' into fix/td-21029

......@@ -10,7 +10,7 @@ Because stream processing is built in to TDengine, you are no longer reliant on
## Create a Stream
```sql
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name SUBTABLE(expression) AS subquery
stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time
......@@ -30,6 +30,8 @@ subquery: SELECT [DISTINCT] select_list
Session windows, state windows, and sliding windows are supported. When you configure a session or state window for a supertable, you must use PARTITION BY TBNAME.
Subtable Clause defines the naming rules of auto-created subtable, you can see more details in below part: Partitions of Stream.
```sql
window_clause: {
SESSION(ts_col, tol_val)
......@@ -47,6 +49,47 @@ CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
```
## Partitions of Stream
A Stream can process data in multiple partitions. Partition rules can be defined by PARTITION BY clause in stream processing. Each partition will have different timelines and windows, and will be processed separately and be written into different subtables of target supertable.
If a stream is created without PARTITION BY clause, all data will be written into one subtable.
If a stream is created with PARTITION BY clause without SUBTABLE clause, each partition will be given a random name.
If a stream is created with PARTITION BY clause and SUBTABLE clause, the name of each partition will be calculated according to SUBTABLE clause. For example:
```sql
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
```
IN PARTITION clause, 'tbname', representing each subtable name of source supertable, is given alias 'tname'. And 'tname' is used in SUBTABLE clause. In SUBTABLE clause, each auto created subtable will concat 'new-' and source subtable name as their name. Other expressions are also allowed in SUBTABLE clause, but the output type must be varchar.
If the output length exceeds the limitation of TDengine(192), the name will be truncated. If the generated name is occupied by some other table, the creation and writing of the new subtable will be failed.
## Filling history data
Normally a stream does not process data already or being written into source table when it's being creating. But adding FILL_HISTORY 1 as a stream option when creating the stream will allow it to process data written before and while creating the stream. For example:
```sql
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
```
Combining fill_history option and where clause, stream can processing data of specific time range. For example, only process data after a past time. (In this case, 2020-01-30)
```sql
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' interval(10s)
```
As another example, only processing data starting from some past time, and ending at some future time.
```sql
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' and ts < '2023-01-01' interval(10s)
```
If some streams are totally outdated, and you do not want it to monitor or process anymore, those streams can be manually dropped and output data will be still kept.
## Delete a Stream
```sql
......
......@@ -8,7 +8,7 @@ description: 流式计算的相关 SQL 的详细语法
## 创建流式计算
```sql
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name SUBTABLE(expression) AS subquery
stream_options: {
TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time
......@@ -28,6 +28,9 @@ subquery: SELECT select_list
支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与partition by tbname一起使用
subtable 子句定义了流式计算中创建的子表的命名规则,详见 流式计算的 partition 部分。
```sql
window_clause: {
SESSION(ts_col, tol_val)
......@@ -49,11 +52,43 @@ SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(
## 流式计算的 partition
可以使用 PARTITION BY TBNAME 或 PARTITION BY tag,对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。
可以使用 PARTITION BY TBNAME,tag,普通列或者表达式,对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。
不带 PARTITION BY 子句时,所有的数据将写入到一张子表。
在创建流时不使用 SUBTABLE 子句时,流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它。
若创建流的语句中包含 SUBTABLE 子句,用户可以为每个 partition 对应的子表生成自定义的表名,例如:
```sql
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
```
PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名。
注意,子表名的长度若超过 TDengine 的限制,将被截断。若要生成的子表名已经存在于另一超级表,由于 TDengine 的子表名是唯一的,因此对应新子表的创建以及数据的写入将会失败。
## 流式计算读取历史数据
不带 PARTITION BY 选项时,所有的数据将写入到一张子表。
正常情况下,流式计算不会处理创建前已经写入源表中的数据,若要处理已经写入的数据,可以在创建流时设置 fill_history 1 选项,这样创建的流式计算会自动处理创建前、创建中、创建后写入的数据。例如:
```sql
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
```
结合 fill_history 1 选项,可以实现只处理特定历史时间范围的数据,例如:只处理某历史时刻(2020年1月30日)之后的数据
```sql
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' interval(10s)
```
再如,仅处理某时间段内的数据,结束时间可以是未来时间
```sql
create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' and ts < '2023-01-01' interval(10s)
```
流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它
如果该流任务已经彻底过期,并且您不再想让它检测或处理数据,您可以手动删除它,被计算出的数据仍会被保留
## 删除流式计算
......
......@@ -68,7 +68,7 @@ typedef uint16_t tmsg_t;
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL);
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
}
static inline bool syncUtilUserCommit(tmsg_t msgType) {
......
......@@ -58,8 +58,7 @@ typedef enum {
#define QUERY_RSP_POLICY_QUICK 1
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
#define TEST_SHOW_REWRITE_MASK(m) (((m)&QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
......@@ -128,7 +127,7 @@ typedef struct SDBVgInfo {
int8_t hashMethod;
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs;
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SArray* vgArray;
} SDBVgInfo;
......@@ -262,23 +261,26 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND)
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || \
(_code) == TSDB_CODE_VND_STOPPED || (_code) == TSDB_CODE_APP_IS_STARTING || (_code) == TSDB_CODE_APP_IS_STOPPING)
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define NEED_REDIRECT_ERROR(_code) \
#define NEED_REDIRECT_ERROR(_code) \
(NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code))
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
((_type) == TDMT_VND_CREATE_TABLE || (_type) == TDMT_MND_CREATE_STB || (_type) == TDMT_VND_DROP_TABLE || \
(_type) == TDMT_MND_DROP_STB)
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
(SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code))
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
(SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code))
#define REQUEST_TOTAL_EXEC_TIMES 2
......
......@@ -342,6 +342,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
#define TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB TAOS_DEF_ERROR_CODE(0, 0x03F5)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
......
......@@ -1035,7 +1035,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SCMSubscribeReq req = {0};
int32_t code = -1;
tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256);
......@@ -1043,7 +1043,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) goto FAIL;
tscDebug("call tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i);
......@@ -1570,7 +1570,6 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*tscDebug("call poll");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
......@@ -1794,7 +1793,6 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
/*tscDebug("call poll1");*/
void* rspObj;
int64_t startTime = taosGetTimestampMs();
......
......@@ -214,6 +214,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
case SNODE:
terrno = TSDB_CODE_SNODE_NOT_FOUND;
break;
case VNODE:
terrno = TSDB_CODE_VND_STOPPED;
break;
default:
terrno = TSDB_CODE_APP_IS_STOPPING;
break;
......
......@@ -672,6 +672,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
pDnode = mndAcquireDnodeByEp(pMnode, ep);
if (pDnode != NULL) {
terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
goto _OVER;
}
......
......@@ -663,7 +663,9 @@ _OVER:
const STraceId *trace = &pMsg->info.traceId;
SEpSet epSet = {0};
int32_t tmpCode = terrno;
mndGetMnodeEpSet(pMnode, &epSet);
terrno = tmpCode;
mGDebug(
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
......
......@@ -164,7 +164,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
STREAM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, terrstr());
mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw,
terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......@@ -624,6 +625,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
if (pDb->cfg.replications != 1) {
mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
mndReleaseDb(pMnode, pDb);
pDb = NULL;
goto _OVER;
}
mndReleaseDb(pMnode, pDb);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
......@@ -680,7 +691,6 @@ _OVER:
}
mndReleaseStream(pMnode, pStream);
mndReleaseDb(pMnode, pDb);
tFreeSCMCreateStreamReq(&createStreamReq);
tFreeStreamObj(&streamObj);
......
......@@ -927,7 +927,8 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
} else {
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING) {
if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY) {
if (pTrans->failedTimes > 60) sendRsp = true;
} else {
if (pTrans->failedTimes > 6) sendRsp = true;
......@@ -1336,6 +1337,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
}
if (mndCannotExecuteTransAction(pMnode)) return false;
terrno = code;
if (code == 0) {
pTrans->code = 0;
......
......@@ -1126,8 +1126,12 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
}
if (!force) {
#if 1
{
#else
if (newVg.replica == 1) {
mInfo("vgId:%d, will add 1 vnode, replca:1", pVgroup->vgId);
#endif
mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica);
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
for (int32_t i = 0; i < newVg.replica - 1; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
......@@ -1155,6 +1159,9 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
#if 1
}
#else
} else { // new replica == 3
mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
......@@ -1181,6 +1188,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
}
#endif
} else {
mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
......
......@@ -738,7 +738,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
if (code != 0) {
tqError("cannot process tq delete req %s, since no such offset", pReq->subKey);
tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
}
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
......
......@@ -115,7 +115,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
......@@ -253,7 +252,9 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
}
tjsonGetNumberValue(pJson, "tsdbPageSize", pCfg->tsdbPageSize, code);
if (code < 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE * 1024;
if (code < 0 || pCfg->tsdbPageSize < TSDB_MIN_PAGESIZE_PER_VNODE * 1024) {
pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE * 1024;
}
return 0;
}
......
......@@ -233,7 +233,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
rpcSendResponse(&rpcMsg);
return 0;
} else {
sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq);
sError("no message handle to send timeout response, seq:%" PRId64, seq);
return -1;
}
}
......
......@@ -35,11 +35,16 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
pObj->seqNum = 0;
taosThreadMutexInit(&(pObj->mutex), NULL);
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, create resp manager", pNode->vgId);
return pObj;
}
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
if (pObj != NULL) {
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, destroy resp manager", pNode->vgId);
taosThreadMutexLock(&pObj->mutex);
taosHashCleanup(pObj->pRespHash);
taosThreadMutexUnlock(&pObj->mutex);
......@@ -81,6 +86,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
taosThreadMutexUnlock(&pObj->mutex);
return 1; // get one object
} else {
sNError(pObj->data, "get message handle, no object of seq:%" PRIu64, seq);
}
taosThreadMutexUnlock(&pObj->mutex);
......@@ -99,6 +106,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p
taosThreadMutexUnlock(&pObj->mutex);
return 1; // get one object
} else {
sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq);
}
taosThreadMutexUnlock(&pObj->mutex);
......@@ -114,7 +123,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
if (delIndexArray == NULL) return;
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
sDebug("vgId:%d, resp manager begin clean by ttl", pSyncNode->vgId);
while (pStub) {
size_t len;
void *key = taosHashGetKey(pStub, &len);
......@@ -143,34 +152,39 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
// TODO: and make rpcMsg body, call commit cb
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
if (pStub->rpcMsg.info.handle != NULL) {
tmsgSendRsp(&pStub->rpcMsg);
}
SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pSyncNode->vgId, rpcMsg.info.handle,
TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
}
pStub = taosHashIterate(pObj->pRespHash, pStub);
}
int32_t arraySize = taosArrayGetSize(delIndexArray);
sDebug("vgId:%d, resp mgr end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
for (int32_t i = 0; i < arraySize; ++i) {
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
sDebug("vgId:%d, resp mgr clean by ttl, seq:%" PRId64 "", pSyncNode->vgId, *pSeqNum);
sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pSyncNode->vgId, *pSeqNum);
}
taosArrayDestroy(delIndexArray);
}
void syncRespCleanRsp(SSyncRespMgr *pObj) {
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean all rsp", pNode->vgId);
taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, -1, true);
taosThreadMutexUnlock(&pObj->mutex);
}
void syncRespClean(SSyncRespMgr *pObj) {
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean rsp by ttl", pNode->vgId);
taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, pObj->ttl, false);
taosThreadMutexUnlock(&pObj->mutex);
......
......@@ -1598,7 +1598,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED ||
code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
code == TSDB_CODE_APP_IS_STOPPING || code == TSDB_CODE_VND_STOPPED) {
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
noDelay = cliResetEpset(pCtx, pResp, true);
transFreeMsg(pResp->pCont);
......
......@@ -464,7 +464,9 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
#if FILE_WITH_LOCK
taosThreadRwlockWrlock(&(pFile->rwlock));
#endif
assert(pFile->fd >= 0); // Please check if you have closed the file.
if (pFile->fd < 0) {
return 0;
}
int64_t nleft = count;
int64_t nwritten = 0;
......
......@@ -281,6 +281,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily does not support source db having replica > 1")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
......
......@@ -445,6 +445,9 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) {
static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *buffer, int32_t len) {
if ((dflag & DEBUG_FILE) && tsLogObj.logHandle && tsLogObj.logHandle->pFile != NULL && osLogSpaceAvailable()) {
taosUpdateLogNums(level);
#if 0
// DEBUG_FATAL and DEBUG_ERROR are duplicated
// fsync will cause thread blocking and may also generate log misalignment in case of asyncLog
if (tsAsyncLog && level != DEBUG_FATAL) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
......@@ -453,6 +456,13 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
taosFsyncFile(tsLogObj.logHandle->pFile);
}
}
#else
if (tsAsyncLog) {
taosPushLogBuffer(tsLogObj.logHandle, buffer, len);
} else {
taosWriteFile(tsLogObj.logHandle->pFile, buffer, len);
}
#endif
if (tsLogObj.maxLines > 0) {
atomic_add_fetch_32(&tsLogObj.lines, 1);
......
......@@ -132,7 +132,7 @@ class TDTestCase:
tdSql.execute(f'drop database {self.dbname}')
def drop_stream_check(self):
tdSql.execute(f'create database {self.dbname} replica {self.replicaVar}')
tdSql.execute(f'create database {self.dbname} replica 1')
tdSql.execute(f'use {self.dbname}')
stbname = tdCom.getLongName(5,"letters")
stream_name = tdCom.getLongName(5,"letters")
......@@ -158,4 +158,4 @@ class TDTestCase:
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册