未验证 提交 586bea29 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19465 from taosdata/feat/ly_stream

feat(stream):user define tag
......@@ -162,6 +162,7 @@ typedef enum EStreamType {
STREAM_PULL_DATA,
STREAM_PULL_OVER,
STREAM_FILL_OVER,
STREAM_CREATE_CHILD_TABLE,
} EStreamType;
#pragma pack(push, 1)
......@@ -205,8 +206,6 @@ typedef struct SDataBlockInfo {
TSKEY watermark; // used for stream
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
int32_t tagLen;
void* pTag; // used for stream partition
} SDataBlockInfo;
typedef struct SSDataBlock {
......@@ -379,6 +378,11 @@ typedef struct SSortExecInfo {
#define CALCULATE_END_TS_COLUMN_INDEX 5
#define TABLE_NAME_COLUMN_INDEX 6
// stream create table block column
#define UD_TABLE_NAME_COLUMN_INDEX 0
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2
#ifdef __cplusplus
}
#endif
......
......@@ -1293,7 +1293,6 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
taosArrayDestroy(pBlock->pDataBlock);
pBlock->pDataBlock = NULL;
taosMemoryFreeClear(pBlock->pBlockAgg);
taosMemoryFree(pBlock->info.pTag);
memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
}
......@@ -1961,10 +1960,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "\n",
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey);
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) {
......
......@@ -1331,9 +1331,9 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
}
n += tPutTagVal(p + n, (STagVal *)taosArrayGet(pArray, iTag), isJson);
}
#ifdef TD_DEBUG_PRINT_TAG
// #ifdef TD_DEBUG_PRINT_TAG
debugPrintSTag(*ppTag, __func__, __LINE__);
#endif
// #endif
return code;
......
......@@ -167,6 +167,10 @@ void tFreeStreamObj(SStreamObj *pStream) {
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
// tagSchema.pSchema
if (pStream->tagSchema.nCols > 0) {
taosMemoryFree(pStream->tagSchema.pSchema);
}
}
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
......
......@@ -465,7 +465,6 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
createReq.numOfColumns = pStream->outputSchema.nCols;
createReq.numOfTags = 1; // group id
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
// build fields
taosArraySetSize(createReq.pColumns, createReq.numOfColumns);
......@@ -476,14 +475,29 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
pField->type = pStream->outputSchema.pSchema[i].type;
pField->bytes = pStream->outputSchema.pSchema[i].bytes;
}
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
taosArraySetSize(createReq.pTags, 1);
// build tags
SField *pField = taosArrayGet(createReq.pTags, 0);
strcpy(pField->name, "group_id");
pField->type = TSDB_DATA_TYPE_UBIGINT;
pField->flags = 0;
pField->bytes = 8;
if (pStream->tagSchema.nCols == 0) {
createReq.numOfTags = 1;
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
taosArraySetSize(createReq.pTags, createReq.numOfTags);
// build tags
SField *pField = taosArrayGet(createReq.pTags, 0);
strcpy(pField->name, "group_id");
pField->type = TSDB_DATA_TYPE_UBIGINT;
pField->flags = 0;
pField->bytes = 8;
} else {
createReq.numOfTags = pStream->tagSchema.nCols;
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
taosArraySetSize(createReq.pTags, createReq.numOfTags);
for (int32_t i = 0; i < createReq.numOfTags; i++) {
SField *pField = taosArrayGet(createReq.pTags, i);
pField->bytes = pStream->tagSchema.pSchema[i].bytes;
pField->flags = pStream->tagSchema.pSchema[i].flags;
pField->type = pStream->tagSchema.pSchema[i].type;
tstrncpy(pField->name, pStream->tagSchema.pSchema[i].name, TSDB_COL_NAME_LEN);
}
}
if (mndCheckCreateStbReq(&createReq) != 0) {
goto _OVER;
......
......@@ -323,6 +323,70 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
taosArrayDestroy(tagArray);
}
static int32_t encodeCreateChildTableForRPC(SVCreateTbReq* req, int32_t vgId, void** pBuf, int32_t* contLen) {
int32_t ret = 0;
SVCreateTbBatchReq reqs = {0};
reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
if (NULL == reqs.pArray) {
ret = -1;
goto end;
}
taosArrayPush(reqs.pArray, req);
reqs.nReqs = 1;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqs, *contLen, ret);
if (ret < 0) {
ret = -1;
goto end;
}
*contLen += sizeof(SMsgHead);
*pBuf = rpcMallocCont(*contLen);
if (NULL == *pBuf) {
ret = -1;
goto end;
}
((SMsgHead*)(*pBuf))->vgId = vgId;
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) );
if (tEncodeSVCreateTbBatchReq(&coder, &reqs) < 0) {
rpcFreeCont(*pBuf);
*pBuf = NULL;
*contLen = 0;
tEncoderClear(&coder);
ret = -1;
goto end;
}
tEncoderClear(&coder);
end:
taosArrayDestroy(reqs.pArray);
return ret;
}
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbReq* pReq) {
void* buf = NULL;
int32_t tlen = 0;
encodeCreateChildTableForRPC(pReq, TD_VID(pVnode), &buf, &tlen);
SRpcMsg msg = {
.msgType = TDMT_VND_CREATE_TABLE,
.pCont = buf,
.contLen = tlen,
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqError("failed to put into write-queue since %s", terrstr());
}
return TSDB_CODE_SUCCESS;
_error:
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
return TSDB_CODE_OUT_OF_MEMORY;
}
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
......@@ -339,12 +403,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SArray* tagArray = NULL;
SArray* pVals = NULL;
if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) {
goto _end;
}
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq deleteReq = {0};
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
......@@ -380,9 +441,98 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
} else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq));
if (!pCreateTbReq) {
goto _end;
}
// set const
pCreateTbReq->flags = 0;
pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
// set tag content
int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
if (size == 2) {
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
goto _end;
}
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
} else {
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
if (!tagArray) {
goto _end;
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
STagVal tagVal = {
.cid = pTSchema->numOfCols + step,
.type = pTagData->info.type,
};
void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) {
tagVal.type = TSDB_DATA_TYPE_NULL;
tagVal.pData = NULL;
tagVal.nData = 0;
} else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
tagVal.nData = varDataLen(pData);
tagVal.pData = varDataVal(pData);
} else {
memcpy(&tagVal.i64, pData, pTagData->info.bytes);
}
taosArrayPush(tagArray, &tagVal);
}
}
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name
SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
if (colDataIsNull_s(pTbColInfo, rowId)) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
} else {
void* pTbData = colDataGetData(pTbColInfo, rowId);
pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1);
memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData));
}
if (tqPutReqToQueue(pVnode, pCreateTbReq) != TSDB_CODE_SUCCESS) {
goto _end;
}
tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
}
} else {
SSubmitTbData tbData = {0};
int32_t rows = pDataBlock->info.rows;
tqDebug("tq sink pipe2, convert block1 %d, rows: %d", i, rows);
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
......@@ -394,6 +544,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tbData.sver = pTSchema->version;
char* ctbName = NULL;
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName);
if (pDataBlock->info.parTbName[0]) {
ctbName = strdup(pDataBlock->info.parTbName);
} else {
......@@ -423,7 +574,10 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
pCreateTbReq->ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
// set tag content
taosArrayClear(tagArray);
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
goto _end;
}
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
......@@ -434,6 +588,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
......
......@@ -474,9 +474,11 @@ typedef struct SStreamScanInfo {
SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes;
SSDataBlock* pCreateTbRes;
} SStreamScanInfo;
typedef struct {
......@@ -567,6 +569,8 @@ typedef struct SStreamIntervalOperatorInfo {
SStreamState* pState;
SWinKey delKey;
uint64_t numOfDatapack;
SArray* pUpdated;
SHashObj* pUpdatedMap;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
......@@ -613,6 +617,8 @@ typedef struct SStreamSessionAggOperatorInfo {
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
SArray* pUpdated;
SSHashObj* pStUpdated;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
......@@ -628,6 +634,8 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
SArray* pUpdated;
SSHashObj* pSeUpdated;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
......@@ -638,9 +646,11 @@ typedef struct SStreamPartitionOperatorInfo {
SExprSupp tagCalSup;
SHashObj* pPartitions;
void* parIte;
void* pTbNameIte;
SSDataBlock* pInputDataBlock;
int32_t tsColIndex;
SSDataBlock* pDelRes;
SSDataBlock* pCreateTbRes;
} SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter {
......@@ -762,7 +772,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo);
......@@ -838,7 +848,6 @@ bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pS
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
......@@ -857,6 +866,11 @@ void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t ord
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
int32_t order, int64_t* pData);
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
#ifdef __cplusplus
}
......
......@@ -1424,6 +1424,18 @@ void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
createExprFromOneNode(pExp, pTargetNode->pExpr, pTargetNode->slotId);
}
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs) {
*numOfExprs = LIST_LENGTH(pNodeList);
SExprInfo* pExprs = taosMemoryCalloc(*numOfExprs, sizeof(SExprInfo));
for (int32_t i = 0; i < (*numOfExprs); ++i) {
SExprInfo* pExp = &pExprs[i];
createExprFromOneNode(pExp, nodesListGetNode(pNodeList, i), i + UD_TAG_COLUMN_INDEX);
}
return pExprs;
}
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
int32_t numOfGroupKeys = 0;
......
......@@ -2260,9 +2260,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type);
pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo, isStream);
pOptr = createIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
pOptr = createStreamIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) {
......
......@@ -917,6 +917,7 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S
}
static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo->parIte != NULL; }
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; }
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
......@@ -937,40 +938,13 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
colDataAppend(pDestCol, pDest->info.rows, pSrcData, isNull);
}
pDest->info.rows++;
if (pInfo->tbnameCalSup.numOfExprs > 0 && i == 0) {
void* tbname = NULL;
if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
tdbFree(tbname);
} else {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrc, rowIndex);
SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetVarData(pCol, 0);
// TODO check tbname validity
if (pData != (void*)-1) {
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pDest->info.parTbName, varDataVal(pData), len);
/*pDest->info.parTbName[len + 1] = 0;*/
} else {
pDest->info.parTbName[0] = 0;
}
if (pParInfo->groupId && pDest->info.parTbName[0]) {
streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName);
}
/*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/
blockDataDestroy(pTmpBlock);
blockDataDestroy(pResBlock);
}
}
pDest->info.parTbName[0] = 0;
if (pInfo->tbnameCalSup.numOfExprs > 0) {
void* tbname = NULL;
if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
tdbFree(tbname);
}
}
taosArrayDestroy(pParInfo->rowIds);
......@@ -986,6 +960,60 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
return pDest;
}
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) {
void* pValue = NULL;
if (groupId != 0 && streamStateGetParName(pState, groupId, &pValue) != 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
if (pTableSup->numOfExprs > 0) {
projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL);
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1);
char* tbName = pSrcBlock->info.parTbName;
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(tbName, varDataVal(pData), len);
streamStatePutParName(pState, groupId, tbName);
pDestBlock->info.rows--;
} else {
void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
colDataAppendNULL(pTbNameCol, pDestBlock->info.rows);
pSrcBlock->info.parTbName[0] = 0;
}
if (pTagSup->numOfExprs > 0) {
projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL);
pDestBlock->info.rows--;
}
void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
colDataAppend(pGpIdCol, pDestBlock->info.rows, (const char*)&groupId, false);
pDestBlock->info.rows++;
blockDataDestroy(pTmpBlock);
}
streamStateReleaseBuf(pState, NULL, pValue);
}
static SSDataBlock* buildStreamCreateTableResult(SOperatorInfo* pOperator) {
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
if ( (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) || taosHashGetSize(pInfo->pPartitions) == 0) {
return NULL;
}
blockDataCleanup(pInfo->pCreateTbRes);
blockDataEnsureCapacity(pInfo->pCreateTbRes, taosHashGetSize(pInfo->pPartitions));
SSDataBlock* pSrc = pInfo->pInputDataBlock;
while (pInfo->pTbNameIte != NULL) {
SPartitionDataInfo* pParInfo = (SPartitionDataInfo*)pInfo->pTbNameIte;
int32_t rowId = *(int32_t*) taosArrayGet(pParInfo->rowIds, 0);
appendCreateTableRow(pOperator->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pParInfo->groupId, pSrc, rowId, pInfo->pCreateTbRes);
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, pInfo->pTbNameIte);
}
return pInfo->pCreateTbRes->info.rows > 0 ? pInfo->pCreateTbRes : NULL;
}
static void doStreamHashPartitionImpl(SStreamPartitionOperatorInfo* pInfo, SSDataBlock* pBlock) {
pInfo->pInputDataBlock = pBlock;
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
......@@ -1012,6 +1040,15 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pCtRes = NULL;
if (hasRemainTbName(pInfo)) {
pCtRes = buildStreamCreateTableResult(pOperator);
if (pCtRes != NULL) {
return pCtRes;
}
}
if (hasRemainPartion(pInfo)) {
return buildStreamPartitionResult(pOperator);
}
......@@ -1039,6 +1076,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
return pInfo->pDelRes;
} break;
default:
ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE, "invalid SSDataBlock type");
return pBlock;
}
......@@ -1056,6 +1094,11 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, NULL);
pInfo->pTbNameIte = taosHashIterate(pInfo->pPartitions, NULL);
pCtRes = buildStreamCreateTableResult(pOperator);
if (pCtRes != NULL) {
return pCtRes;
}
return buildStreamPartitionResult(pOperator);
}
......@@ -1076,6 +1119,7 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
cleanupExprSupp(&pInfo->tagCalSup);
blockDataDestroy(pInfo->pDelRes);
taosHashCleanup(pInfo->pPartitions);
blockDataDestroy(pInfo->pCreateTbRes);
taosMemoryFreeClear(param);
}
......@@ -1091,6 +1135,46 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
}
}
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
pBlock->info.hasVarCol = false;
pBlock->info.id.groupId = 0;
pBlock->info.rows = 0;
pBlock->info.type = STREAM_CREATE_CHILD_TABLE;
pBlock->info.watermark = INT64_MIN;
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
if (tbName->numOfExprs > 0) {
infoData.info.bytes = tbName->pExprInfo->base.resSchema.bytes;
} else {
infoData.info.bytes = 1;
}
pBlock->info.rowSize += infoData.info.bytes;
// sub table name
taosArrayPush(pBlock->pDataBlock, &infoData);
SColumnInfoData gpIdData = {0};
gpIdData.info.type = TSDB_DATA_TYPE_UBIGINT;
gpIdData.info.bytes = 8;
pBlock->info.rowSize += gpIdData.info.bytes;
// group id
taosArrayPush(pBlock->pDataBlock, &gpIdData);
for (int32_t i = 0; i < tag->numOfExprs; i++) {
SColumnInfoData tagCol = {0};
tagCol.info.type = tag->pExprInfo[i].base.resSchema.type;
tagCol.info.bytes = tag->pExprInfo[i].base.resSchema.bytes;
tagCol.info.precision = tag->pExprInfo[i].base.resSchema.precision;
// tag info
taosArrayPush(pBlock->pDataBlock, &tagCol);
pBlock->info.rowSize += tagCol.info.bytes;
}
return pBlock;
}
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo) {
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
......@@ -1110,6 +1194,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
}
}
pInfo->tbnameCalSup.numOfExprs = 0;
if (pPartNode->pSubtable != NULL) {
SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
if (pSubTableExpr == NULL) {
......@@ -1124,9 +1209,10 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
}
}
pInfo->tagCalSup.numOfExprs = 0;
if (pPartNode->pTags != NULL) {
int32_t numOfTags;
SExprInfo* pTagExpr = createExprInfo(pPartNode->pTags, NULL, &numOfTags);
SExprInfo* pTagExpr = createExpr(pPartNode->pTags, &numOfTags);
if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
......@@ -1137,6 +1223,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
}
}
if (pInfo->tbnameCalSup.numOfExprs != 0 || pInfo->tagCalSup.numOfExprs != 0) {
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
} else {
pInfo->pCreateTbRes = NULL;
}
int32_t keyLen = 0;
code = initGroupOptrInfo(&pInfo->partitionSup.pGroupColVals, &keyLen, &pInfo->partitionSup.keyBuf,
pInfo->partitionSup.pGroupCols);
......@@ -1153,6 +1245,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
blockDataEnsureCapacity(pInfo->binfo.pRes, 4096);
pInfo->parIte = NULL;
pInfo->pTbNameIte = NULL;
pInfo->pInputDataBlock = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......
......@@ -278,7 +278,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// for stream interval
if (pBlock->info.type == STREAM_RETRIEVE || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_DELETE_DATA) {
pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
// printDataBlock1(pBlock, "project1");
return pBlock;
}
......
......@@ -1361,54 +1361,16 @@ void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
}
#endif
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return;
void* tbname = NULL;
if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) == 0) {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
tdbFree(tbname);
return;
} else {
blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
pBlock->info.parTbName[0] = 0;
}
tdbFree(tbname);
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
ASSERT(pSrcBlock->info.rows == 1);
SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation
if (pData != (void*)-1 && pData != NULL) {
memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pBlock->info.parTbName, varDataVal(pData), len);
/*pBlock->info.parTbName[len + 1] = 0;*/
} else {
pBlock->info.parTbName[0] = 0;
appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
}
if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName);
}
blockDataDestroy(pSrcBlock);
blockDataDestroy(pResBlock);
}
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
......@@ -1710,47 +1672,30 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
}
}
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey) {
if (pInfo->pUpdateInfo) {
checkUpdateData(pInfo, true, pInfo->pRes, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
if (pInfo->pUpdateDataRes->info.rows > 0) {
pInfo->updateResIndex = 0;
if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
// return pInfo->pUpdateDataRes;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
}
}
}
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan called");
#if 0
SStreamState* pState = pTaskInfo->streamInfo.pState;
if (pState) {
printf(">>>>>>>> stream write backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char tmp[100] = "abcdefg1";
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
ASSERT(0);
}
key.ts = 2;
char tmp2[100] = "abcdefg2";
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
ASSERT(0);
}
key.groupId = 5;
key.ts = 1;
char tmp3[100] = "abcdefg3";
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
ASSERT(0);
}
char* val2 = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val2, sz);
streamFreeVal(val2);
}
#endif
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
......@@ -1784,17 +1729,32 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->blockRecoverContiCnt = 0;
return NULL;
}
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
if (pBlock != NULL) {
switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes;
} break;
default:
break;
}
pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
if (pInfo->pRecoverRes != NULL) {
pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pBlock);
calBlockTbName(pInfo, pInfo->pRecoverRes);
if (pInfo->pUpdateInfo) {
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
}
qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
printDataBlock(pBlock, "scan recover");
return pBlock;
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pCreateTbRes;
}
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes;
}
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
......@@ -1896,8 +1856,11 @@ FETCH_NEXT_BLOCK:
qDebug("scan mode %d", pInfo->scanMode);
switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: {
blockDataDestroy(pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
return pInfo->pRes;
} break;
case STREAM_SCAN_FROM_DELETE_DATA: {
......@@ -1988,22 +1951,12 @@ FETCH_NEXT_BLOCK:
continue;
}
if (pInfo->pUpdateInfo) {
checkUpdateData(pInfo, true, pInfo->pRes, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
if (pInfo->pUpdateDataRes->info.rows > 0) {
pInfo->updateResIndex = 0;
if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pUpdateDataRes;
} else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
}
}
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pCreateTbRes;
}
doCheckUpdate(pInfo, pBlockInfo->window.ekey);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
......@@ -2017,7 +1970,6 @@ FETCH_NEXT_BLOCK:
} else {
continue;
}
/*blockDataCleanup(pInfo->pRes);*/
}
// record the scan action.
......@@ -2223,6 +2175,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
}
cleanupExprSupp(&pStreamScan->tbnameCalSup);
cleanupExprSupp(&pStreamScan->tagCalSup);
updateInfoDestroy(pStreamScan->pUpdateInfo);
blockDataDestroy(pStreamScan->pRes);
......@@ -2230,6 +2183,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
blockDataDestroy(pStreamScan->pPullDataRes);
blockDataDestroy(pStreamScan->pDeleteDataRes);
blockDataDestroy(pStreamScan->pUpdateDataRes);
blockDataDestroy(pStreamScan->pCreateTbRes);
taosArrayDestroy(pStreamScan->pBlockLists);
taosMemoryFree(pStreamScan);
}
......@@ -2285,7 +2239,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pTableScanNode->pTags != NULL) {
int32_t numOfTags;
SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
if (pTagExpr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
......@@ -2343,6 +2297,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid;
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
// set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds);
......
......@@ -860,13 +860,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
return TSDB_CODE_SUCCESS;
}
void releaseSource(STuplePos* pPos) {
if (pPos->pageId == -1) {
return;
}
// Todo(liuyao) relase row
}
// This function append the selectivity to subsidiaries function context directly, without fetching data
// from intermediate disk based buf page
void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos) {
......@@ -899,7 +892,6 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
}
void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) {
releaseSource(pDestPos);
*pDestPos = *pSourcePos;
}
......
......@@ -85,9 +85,7 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
watermark = MIN_NUM_SCALABLE_BF * adjInterval;
}*/ // Todo(liuyao) save window info to tdb
}
return watermark;
}
......
......@@ -247,6 +247,8 @@
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
,,y,script,./test.sh -f tsim/stream/tableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/tableAndTag1.sim
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
,,y,script,./test.sh -f tsim/trans/create_db.sim
,,y,script,./test.sh -f tsim/tmq/basic1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ===== step1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step2
print ===== table name
sql create database result vgroups 1;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
sql create stream streams1 trigger at_once into result.streamt SUBTABLE(concat("aaa-", tbname)) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,1,2,3);
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop0
endi
if $data00 != aaa-t1 then
print =====data00=$data00
goto loop0
endi
if $data10 != aaa-t2 then
print =====data10=$data10
goto loop0
endi
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result.streamt;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop1
endi
print ===== step3
print ===== tag name
sql create database result2 vgroups 1;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as cc interval(10s);
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,1,2,3);
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop2
endi
if $data00 != cc then
print data00 != cc
print =====data00=$data00
goto loop2
endi
if $data10 != cc then
print =====data10=$data10
goto loop2
endi
sql select cc from result2.streamt2 order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop2
endi
if $data00 != tag-t1 then
print data00 != tag-t1
print =====data00=$data00
goto loop2
endi
if $data10 != tag-t2 then
print =====data10=$data10
goto loop2
endi
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result2.streamt2;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop3
endi
print ===== step4
print ===== tag name + table name
sql create database result3 vgroups 1;
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", tbname)) as select _wstart, count(*) c1 from st partition by concat("tag-", tbname) as dd, tbname interval(10s);
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,1,2,3);
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop4
endi
if $data00 != dd then
print =====data00=$data00
goto loop4
endi
if $data10 != dd then
print =====data10=$data10
goto loop4
endi
sql select dd from result3.streamt3 order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop4
endi
if $data00 != tag-t1 then
print =====data00=$data00
goto loop4
endi
if $data10 != tag-t2 then
print =====data10=$data10
goto loop4
endi
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result3.streamt3;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop5
endi
$loop_count = 0
loop6:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result3" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop6
endi
if $data00 != tbn-t1 then
print =====data00=$data00
goto loop6
endi
if $data10 != tbn-t2 then
print =====data10=$data10
goto loop6
endi
print ======over
system sh/stop_dnodes.sh
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ===== step1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step2
print ===== table name
sql create database result vgroups 1;
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
sql create stream streams1 trigger at_once into result.streamt SUBTABLE( concat("aaa-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by a interval(10s);
print ===== insert into 1
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
print $data20 $data30
goto loop0
endi
if $data00 != aaa-1 then
print =====data00=$data00
goto loop0
endi
if $data10 != aaa-2 then
print =====data10=$data10
goto loop0
endi
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result.streamt;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop1
endi
print ===== step3
print ===== column name
sql create database result2 vgroups 1;
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st partition by concat("col-", cast(a as varchar(10) ) ) as cc interval(10s);
print ===== insert into 2
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
loop2:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop2
endi
if $data00 != cc then
print =====data00=$data00
goto loop2
endi
if $data10 != cc then
print =====data10=$data10
goto loop2
endi
sql select cc from result2.streamt2 order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop2
endi
if $data00 != col-1 then
print =====data00=$data00
goto loop2
endi
if $data10 != col-2 then
print =====data10=$data10
goto loop2
endi
$loop_count = 0
loop3:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result2.streamt2;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop3
endi
print ===== step4
print ===== column name + table name
sql create database result3 vgroups 1;
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by concat("col-", cast(a as varchar(10) ) ) as dd, a interval(10s);
print ===== insert into 3
sql insert into t1 values(1648791213000,1,2,3);
sql insert into t2 values(1648791213000,2,2,3);
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop4
endi
if $data00 != dd then
print =====data00=$data00
goto loop4
endi
if $data10 != dd then
print =====data10=$data10
goto loop4
endi
sql select dd from result3.streamt3 order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop4
endi
if $data00 != col-1 then
print =====data00=$data00
goto loop4
endi
if $data10 != col-2 then
print =====data10=$data10
goto loop4
endi
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from result3.streamt3;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop5
endi
$loop_count = 0
loop6:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select table_name from information_schema.ins_tables where db_name="result3" order by 1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data10
goto loop6
endi
if $data00 != tbn-1 then
print =====data00=$data00
goto loop6
endi
if $data10 != tbn-2 then
print =====data10=$data10
goto loop6
endi
print ======over
system sh/stop_dnodes.sh
......@@ -68,7 +68,7 @@ sql select * from streamt2;
if $rows != 1 then
print ======streamt2=$rows
return -1
goto loop1
endi
sql select * from streamt3;
......@@ -80,7 +80,7 @@ endi
sql select * from streamt4;
if $rows != 1 then
print ======streamt4=$rows
return -1
goto loop1
endi
sql select * from streamt5;
......@@ -92,7 +92,7 @@ endi
sql select * from streamt6;
if $rows != 1 then
print ======streamt6=$rows
return -1
goto loop1
endi
sql select * from streamt7;
......@@ -104,7 +104,7 @@ endi
sql select * from streamt8;
if $rows != 1 then
print ======streamt8=$rows
return -1
goto loop1
endi
sql select * from streamt9;
......@@ -116,7 +116,7 @@ endi
sql select * from streamt10;
if $rows != 1 then
print ======streamt10=$rows
return -1
goto loop1
endi
sql select * from streamt11;
......@@ -125,4 +125,6 @@ if $rows != 2 then
goto loop1
endi
print ======over
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册