提交 57d14060 编写于 作者: L Liu Jicong

add tbname column into stream special block

上级 2b3ff125
...@@ -340,6 +340,7 @@ typedef struct SSortExecInfo { ...@@ -340,6 +340,7 @@ typedef struct SSortExecInfo {
#define GROUPID_COLUMN_INDEX 3 #define GROUPID_COLUMN_INDEX 3
#define CALCULATE_START_TS_COLUMN_INDEX 4 #define CALCULATE_START_TS_COLUMN_INDEX 4
#define CALCULATE_END_TS_COLUMN_INDEX 5 #define CALCULATE_END_TS_COLUMN_INDEX 5
#define TABLE_NAME_COLUMN_INDEX 6
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1316,8 +1316,8 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { ...@@ -1316,8 +1316,8 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
pBlock->info.groupId = 0; pBlock->info.groupId = 0;
pBlock->info.rows = 0; pBlock->info.rows = 0;
pBlock->info.type = type; pBlock->info.type = type;
pBlock->info.rowSize = pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); sizeof(TSKEY) + TSDB_TABLE_NAME_LEN;
pBlock->info.watermark = INT64_MIN; pBlock->info.watermark = INT64_MIN;
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
...@@ -1343,6 +1343,11 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { ...@@ -1343,6 +1343,11 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
// calculate end ts // calculate end ts
taosArrayPush(pBlock->pDataBlock, &infoData); taosArrayPush(pBlock->pDataBlock, &infoData);
// table name
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = TSDB_TABLE_NAME_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock; return pBlock;
} }
......
...@@ -23,10 +23,19 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -23,10 +23,19 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
int32_t totRow = pDataBlock->info.rows; int32_t totRow = pDataBlock->info.rows;
SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) { for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row); int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
char* name = buildCtbNameByGroupId(stbFullName, groupId); char* name;
void* varTbName = colDataGetVarData(pTbNameCol, row);
if (varTbName != NULL && varTbName != (void*)-1) {
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
} else {
name = buildCtbNameByGroupId(stbFullName, groupId);
}
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name); tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0); metaReaderInit(&mr, pVnode->pMeta, 0);
......
...@@ -1067,7 +1067,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); ...@@ -1067,7 +1067,7 @@ bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp); void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName);
void printDataBlock(SSDataBlock* pBlock, const char* flag); void printDataBlock(SSDataBlock* pBlock, const char* flag);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
......
...@@ -885,8 +885,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -885,8 +885,8 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1); ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0); SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR); ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetData(pCol, 0); void* pData = colDataGetVarData(pCol, 0);
// TODO check tbname validation // TODO check tbname validity
if (pData != (void*)-1) { if (pData != (void*)-1) {
memcpy(pDest->info.parTbName, varDataVal(pData), varDataLen(pData)); memcpy(pDest->info.parTbName, varDataVal(pData), varDataLen(pData));
} else { } else {
......
...@@ -1300,19 +1300,22 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, ...@@ -1300,19 +1300,22 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code; return code;
} }
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp) { void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false); colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false); colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false); colDataAppend(pGpCol, pBlock->info.rows, (const char*)pGp, false);
colDataAppend(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false); colDataAppend(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
colDataAppend(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false); colDataAppend(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
colDataAppend(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
pBlock->info.rows++; pBlock->info.rows++;
} }
...@@ -1342,10 +1345,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1342,10 +1345,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
if ((update || closedWin) && out) { if ((update || closedWin) && out) {
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
uint64_t gpId = 0; uint64_t gpId = 0;
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId); appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId,
NULL);
if (closedWin && pInfo->partitionSup.needCalc) { if (closedWin && pInfo->partitionSup.needCalc) {
gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId); gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId); appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid, &gpId,
NULL);
} }
} }
} }
......
...@@ -1687,7 +1687,8 @@ static void freeAllPages(SArray* pageIds, SDiskbasedBuf* pDiskBuf) { ...@@ -1687,7 +1687,8 @@ static void freeAllPages(SArray* pageIds, SDiskbasedBuf* pDiskBuf) {
taosArrayClear(pageIds); taosArrayClear(pageIds);
} }
static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlock) { static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
SSDataBlock* pBlock) {
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
int32_t size = taosArrayGetSize(pWins); int32_t size = taosArrayGetSize(pWins);
if (*index == size) { if (*index == size) {
...@@ -1699,7 +1700,14 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo ...@@ -1699,7 +1700,14 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo
uint64_t uid = 0; uint64_t uid = 0;
for (int32_t i = *index; i < size; i++) { for (int32_t i = *index; i < size; i++) {
SWinKey* pWin = taosArrayGet(pWins, i); SWinKey* pWin = taosArrayGet(pWins, i);
appendOneRow(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId); char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pWin->groupId, sizeof(int64_t));
if (tbname == NULL) {
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
}
(*index)++; (*index)++;
} }
} }
...@@ -3239,7 +3247,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3239,7 +3247,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pPullDataRes; return pInfo->pPullDataRes;
} }
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
...@@ -3265,7 +3273,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3265,7 +3273,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL; return NULL;
} else { } else {
if (!IS_FINAL_OP(pInfo)) { if (!IS_FINAL_OP(pInfo)) {
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
...@@ -3392,7 +3400,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -3392,7 +3400,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pPullDataRes; return pInfo->pPullDataRes;
} }
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) { if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data // process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
...@@ -4849,7 +4857,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4849,7 +4857,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
i, &allEqual, pStDeleted); i, &allEqual, pStDeleted);
if (!allEqual) { if (!allEqual) {
uint64_t uid = 0; uint64_t uid = 0;
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, &uid, &groupId); appendOneRowToStreamSpecialBlock(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
&uid, &groupId, NULL);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue; continue;
...@@ -5645,7 +5654,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5645,7 +5654,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single interval delete"); printDataBlock(pInfo->pDelRes, "single interval delete");
return pInfo->pDelRes; return pInfo->pDelRes;
...@@ -5729,7 +5738,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -5729,7 +5738,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
taosHashCleanup(pUpdatedMap); taosHashCleanup(pUpdatedMap);
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single interval delete"); printDataBlock(pInfo->pDelRes, "single interval delete");
return pInfo->pDelRes; return pInfo->pDelRes;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册