提交 c191f44b 编写于 作者: L Liu Jicong

feat(stream): session window trigger delete

上级 7f4b46e0
...@@ -108,7 +108,7 @@ typedef struct SDataBlockInfo { ...@@ -108,7 +108,7 @@ typedef struct SDataBlockInfo {
int32_t childId; // used for stream, do not serialize int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark;// used for stream TSKEY watermark; // used for stream
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SSDataBlock { typedef struct SSDataBlock {
...@@ -268,6 +268,15 @@ typedef struct SSortExecInfo { ...@@ -268,6 +268,15 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes int32_t readBytes; // read io bytes
} SSortExecInfo; } SSortExecInfo;
// stream special block column
#define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX 3
#define CALCULATE_START_TS_COLUMN_INDEX 4
#define CALCULATE_END_TS_COLUMN_INDEX 5
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -171,7 +171,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); ...@@ -171,7 +171,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list); int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq); const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq);
// sma // sma
......
...@@ -201,7 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -201,7 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, SSubmitReq *pSubmitReq =
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq); pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
if (!pSubmitReq) { if (!pSubmitReq) {
......
...@@ -13,10 +13,44 @@ ...@@ -13,10 +13,44 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tcommon.h"
#include "tmsg.h"
#include "tq.h" #include "tq.h"
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* deleteReq) { SBatchDeleteReq* deleteReq) {
ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
int32_t totRow = pDataBlock->info.rows;
SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/
int64_t groupId = 0;
char* name = buildCtbNameByGroupId(stbFullName, groupId);
tqDebug("delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, name) < 0) {
metaReaderClear(&mr);
taosMemoryFree(name);
return -1;
}
int64_t uid = mr.me.uid;
metaReaderClear(&mr);
taosMemoryFree(name);
SSingleDeleteReq req = {
.ts = ts,
.uid = uid,
};
taosArrayPush(deleteReq->deleteReqs, &req);
}
return 0;
}
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL; SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL; SArray* schemaReqSz = NULL;
...@@ -33,9 +67,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -33,9 +67,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
schemaReqSz = taosArrayInit(sz, sizeof(int32_t)); schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_DATA) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
// int32_t padding1 = 0;
void* padding2 = taosMemoryMalloc(1);
taosArrayPush(schemaReqSz, &padding1);
taosArrayPush(schemaReqs, &padding2);
} }
STagVal tagVal = { STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT, .type = TSDB_DATA_TYPE_UBIGINT,
...@@ -97,6 +135,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -97,6 +135,9 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t cap = sizeof(SSubmitReq); int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
continue;
}
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
// TODO min // TODO min
int32_t rowSize = pDataBlock->info.rowSize; int32_t rowSize = pDataBlock->info.rowSize;
...@@ -119,6 +160,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -119,6 +160,11 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
continue;
}
blkHead->numOfRows = htonl(pDataBlock->info.rows); blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
...@@ -187,7 +233,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -187,7 +233,7 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq); pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
...@@ -200,12 +246,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -200,12 +246,14 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(0); ASSERT(0);
} }
SEncoder encoder; SEncoder encoder;
void* buf = taosMemoryCalloc(1, len + sizeof(SMsgHead)); void* buf = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq); tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
((SMsgHead*)buf)->vgId = pVnode->config.vgId;
if (taosArrayGetSize(deleteReq.deleteReqs) != 0) { if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
SRpcMsg msg = { SRpcMsg msg = {
.msgType = TDMT_VND_BATCH_DEL, .msgType = TDMT_VND_BATCH_DEL,
......
...@@ -145,7 +145,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -145,7 +145,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t len; int32_t len;
int32_t ret; int32_t ret;
vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version); version);
pVnode->state.applied = version; pVnode->state.applied = version;
...@@ -1071,6 +1071,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void ...@@ -1071,6 +1071,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
// TODO // TODO
} }
} }
taosArrayDestroy(deleteReq.deleteReqs);
return 0; return 0;
} }
......
...@@ -52,13 +52,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int ...@@ -52,13 +52,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX 3
#define CALCULATE_START_TS_COLUMN_INDEX 4
#define CALCULATE_END_TS_COLUMN_INDEX 5
enum { enum {
// when this task starts to execute, this status will set // when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u, TASK_NOT_COMPLETED = 0x1u,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册