提交 8326bc73 编写于 作者: K kailixu

chore: rsma delete req process

上级 78bdb043
...@@ -1897,6 +1897,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ...@@ -1897,6 +1897,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
} }
uInfo("prop:%s:%d pDataBlock->info.type is %d", __func__, __LINE__, pDataBlock->info.type);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
......
...@@ -136,8 +136,8 @@ struct SSmaStat { ...@@ -136,8 +136,8 @@ struct SSmaStat {
#define RSMA_FS_LOCK(r) (&(r)->lock) #define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem { struct SRSmaInfoItem {
int8_t level : 4; int8_t level;
int8_t fetchLevel : 4; int8_t fetchLevel;
int8_t triggerStat; int8_t triggerStat;
uint16_t nScanned; uint16_t nScanned;
int32_t maxDelay; // ms int32_t maxDelay; // ms
......
...@@ -271,6 +271,7 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); ...@@ -271,6 +271,7 @@ int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType); int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
int32_t tdProcessRSmaDelete(SSma* pSma, int64_t ver, void* pReq, int32_t len, tb_uid_t suid);
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
......
...@@ -256,7 +256,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -256,7 +256,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
taosMemoryFree(s); taosMemoryFree(s);
} }
SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value SStreamTask task = {.id.taskId = idx + 1, .id.streamId = pRSmaInfo->suid};
task.pMeta = pVnode->pTq->pStreamMeta; task.pMeta = pVnode->pTq->pStreamMeta;
pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1); pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1);
if (!pStreamState) { if (!pStreamState) {
...@@ -619,6 +619,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma ...@@ -619,6 +619,10 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
SSubmitReq2 *pReq = NULL; SSubmitReq2 *pReq = NULL;
char *pBuf = NULL;
smaDebug("%s", dumpBlockData(output, "prop:", &pBuf));
taosMemoryFree(pBuf);
// TODO: the schema update should be handled later(TD-17965) // TODO: the schema update should be handled later(TD-17965)
if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) {
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
...@@ -917,6 +921,34 @@ _err: ...@@ -917,6 +921,34 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t tdProcessRSmaDelete(SSma *pSma, int64_t ver, void *pReq, int32_t len, tb_uid_t suid) {
SRSmaInfo *pInfo = NULL;
if (!(pInfo = tdAcquireRSmaInfoBySuid(pSma, suid))) {
return TSDB_CODE_SUCCESS;
}
// for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
// SRSmaInfoItem *pItem = &pInfo->items[i];
// if (pItem) {
// atomic_store_8(&pItem->fetchLevel, pItem->level);
// }
// }
SStreamRefDataBlock *pItem = NULL;
extractDelDataBlock(pReq, len, ver, (SStreamRefDataBlock **)pItem);
// if (atomic_load_8(&pInfo->assigned) == 0) {
// SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
// SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
// tsem_post(&(pStat->notEmpty));
// }
tdReleaseRSmaInfo(pSma, pInfo);
return TSDB_CODE_SUCCESS;
}
/** /**
* @brief retrieve rsma meta and init * @brief retrieve rsma meta and init
* *
...@@ -1166,7 +1198,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1166,7 +1198,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process // async process
pItem->fetchLevel = pItem->level; atomic_store_8(&pItem->fetchLevel,pItem->level);
#if 0 #if 0
// debugging codes // debugging codes
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid); SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
...@@ -1218,8 +1250,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { ...@@ -1218,8 +1250,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
if (pItem->fetchLevel) { if (atomic_load_8(&pItem->fetchLevel)) {
pItem->fetchLevel = 0; atomic_store_8(&pItem->fetchLevel, 0);
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
if (!taskInfo) { if (!taskInfo) {
continue; continue;
...@@ -1410,7 +1442,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ...@@ -1410,7 +1442,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if (pEnv->flag & SMA_ENV_FLG_CLOSE) { if (pEnv->flag & SMA_ENV_FLG_CLOSE) {
break; break;
} }
smaInfo("prop:%s:%d tsem_wait(&pRSmaStat->notEmpty)", __func__, __LINE__);
tsem_wait(&pRSmaStat->notEmpty); tsem_wait(&pRSmaStat->notEmpty);
if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) {
......
...@@ -1768,6 +1768,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in ...@@ -1768,6 +1768,8 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
tDecoderClear(pCoder); tDecoderClear(pCoder);
taosArrayDestroy(pRes->uidList); taosArrayDestroy(pRes->uidList);
tdProcessRSmaDelete(pVnode->pSma, ver, pReq, len, pRes->suid);
SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows}; SVDeleteRsp rsp = {.affectedRows = pRes->affectedRows};
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVDeleteRsp, &rsp, pRsp->contLen, ret);
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database with retentions
sql create database d0 retentions 5s:7d,10s:21d,15s:365d;
sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 int) tags (city binary(20),district binary(20)) rollup(min) max_delay 3s,3s watermark 2s,3s;;
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang");
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data and trigger rollup
sql insert into ct1 values(now, 10);
sql insert into ct1 values(now+1s, 1);
sql insert into ct1 values(now+2s, 100);
print =============== wait maxdelay 3+2 seconds for results
sleep 5000
print =============== select * from retention level 2 from memory
sql select * from ct1;
print $data00 $data01
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print =============> $data01
print retention level 2 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 1 from memory
sql select * from ct1 where ts > now-8d;
print $data00 $data01
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 1 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 0 from memory
sql select * from ct1 where ts > now-3d order by ts;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
print =============== delete 1 item from table ct1
sql delete from ct1 where ts > now - 1d;
print =============== select * from retention level 0 from memory after delete
sql select * from ct1 where ts > now-3d order by ts;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows != 0 then
print retention level 0 file rows $rows != 0
return -1
endi
print =============== wait maxdelay 3+2 seconds for results
sleep 5000
print =============== select * from retention level 1 from memory after delete
sql select * from ct1 where ts > now-8d order by ts;
print $data00 $data01
print $data10 $data11
if $rows != 0 then
print retention level 1 file rows $rows != 0
return -1
endi
print =============== select * from retention level 2 from memory after delete
sql select * from ct1;
print $data00 $data01
print $data10 $data11
if $rows != 0 then
print retention level 1 file rows $rows != 0
return -1
endi
return 1
#===================================================================
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== wait maxdelay 3+2 seconds for results after reboot
sleep 5000
print =============== select * from retention level 2 from memory after reboot
sql select * from ct1;
print $data00 $data01
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print =============> $data01
print retention level 2 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 1 from memory after reboot
sql select * from ct1 where ts > now-8d;
print $data00 $data01
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 1 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 0 from memory after reboot
sql select * from ct1 where ts > now-3d;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
#==================== flush database to trigger commit data to file
sql flush database d0;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== select * from retention level 2 from file
sql select * from ct1;
print $data00 $data01
if $rows > 2 then
print retention level 2 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 2 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 1 from file
sql select * from ct1 where ts > now-8d;
print $data00 $data01
if $rows > 2 then
print retention level 1 file rows $rows > 2
return -1
endi
if $data01 != 1 then
if $data01 != 10 then
print retention level 1 file result $data01 != 1 or 10
return -1
endi
endi
print =============== select * from retention level 0 from file
sql select * from ct1 where ts > now-3d;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows < 1 then
print retention level 0 file rows $rows < 1
return -1
endi
if $data01 != 10 then
print retention level 0 file result $data01 != 10
return -1
endi
print =============== delete raw data
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册