未验证 提交 a596b548 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #17295 from taosdata/feature/stream

chore: add demo for stream user defined table name
...@@ -49,28 +49,28 @@ int32_t init_env() { ...@@ -49,28 +49,28 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int, j varchar(20)) tags(a varchar(20))");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)"); pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags('c1')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags('c2')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags(3)"); pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags('c3')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -96,7 +96,8 @@ int32_t create_stream() { ...@@ -96,7 +96,8 @@ int32_t create_stream() {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, pRes = taos_query(pConn,
"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)"); /*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/
"create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix_'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -117,6 +117,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { ...@@ -117,6 +117,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
mndTransDrop(pTrans); mndTransDrop(pTrans);
tDeleteSMqConsumerObj(pConsumerNew);
return 0; return 0;
FAIL: FAIL:
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
......
...@@ -182,6 +182,11 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { ...@@ -182,6 +182,11 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "commit-offset"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "commit-offset");
if (pTrans == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDecoderClear(&decoder);
return -1;
}
for (int32_t i = 0; i < commitOffsetReq.num; i++) { for (int32_t i = 0; i < commitOffsetReq.num; i++) {
SMqOffset *pOffset = &commitOffsetReq.offsets[i]; SMqOffset *pOffset = &commitOffsetReq.offsets[i];
......
...@@ -222,6 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -222,6 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
} }
SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) { if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
taosMemoryFree(buf);
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
taosMemoryFree(buf); taosMemoryFree(buf);
......
...@@ -170,6 +170,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { ...@@ -170,6 +170,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdbFree(pKey);
tdbTbcClose(pCur); tdbTbcClose(pCur);
return -1; return -1;
} }
......
...@@ -1341,7 +1341,8 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { ...@@ -1341,7 +1341,8 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
void* pData = colDataGetData(pCol, 0); void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation // TODO check tbname validation
if (pData != (void*)-1 && pData != NULL) { if (pData != (void*)-1 && pData != NULL) {
memcpy(pBlock->info.parTbName, varDataVal(pData), varDataLen(pData)); memcpy(pBlock->info.parTbName, varDataVal(pData), TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN));
pBlock->info.parTbName[TSDB_TABLE_NAME_LEN - 1] = 0;
} else { } else {
pBlock->info.parTbName[0] = 0; pBlock->info.parTbName[0] = 0;
} }
......
...@@ -3611,6 +3611,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { ...@@ -3611,6 +3611,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pUpdateRes); blockDataDestroy(pInfo->pUpdateRes);
destroySqlFunctionCtx(pInfo->pDummyCtx, 0); destroySqlFunctionCtx(pInfo->pDummyCtx, 0);
taosHashCleanup(pInfo->pStDeleted); taosHashCleanup(pInfo->pStDeleted);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
...@@ -4670,6 +4671,7 @@ void destroyStreamStateOperatorInfo(void* param) { ...@@ -4670,6 +4671,7 @@ void destroyStreamStateOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
taosHashCleanup(pInfo->pSeDeleted); taosHashCleanup(pInfo->pSeDeleted);
taosHashCleanup(pInfo->pGroupIdTbNameMap);
destroySqlFunctionCtx(pInfo->pDummyCtx, 0); destroySqlFunctionCtx(pInfo->pDummyCtx, 0);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
......
...@@ -35,8 +35,8 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe ...@@ -35,8 +35,8 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe
int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; } int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { static FORCE_INLINE void walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
} }
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
...@@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -150,6 +150,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
const char* idxPattern = "^[0-9]+.idx$"; const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern; regex_t logRegPattern;
regex_t idxRegPattern; regex_t idxRegPattern;
bool fixed = false;
regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
...@@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -206,6 +207,77 @@ int walCheckAndRepairMeta(SWal* pWal) {
actualFileNum = taosArrayGetSize(pLogInfoArray); actualFileNum = taosArrayGetSize(pLogInfoArray);
#endif #endif
{
int32_t i = 0, j = 0;
while (i < actualFileNum && j < metaFileNum) {
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
SWalFileInfo* pMetaFile = taosArrayGet(pWal->fileInfoSet, j);
if (pActualFile->firstVer < pMetaFile->firstVer) {
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
i++;
} else if (pActualFile->firstVer > pMetaFile->firstVer) {
taosArrayRemove(pWal->fileInfoSet, j);
metaFileNum--;
} else {
i++;
j++;
}
}
if (i == actualFileNum && j == metaFileNum) {
if (j > 0) {
SWalFileInfo* pLastInfo = taosArrayGet(pWal->fileInfoSet, j - 1);
int64_t fsize = 0;
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastInfo->firstVer, fNameStr);
taosStatFile(fNameStr, &fsize, NULL);
if (pLastInfo->fileSize != fsize) {
fixed = true;
pLastInfo->fileSize = fsize;
pLastInfo->lastVer = walScanLogGetLastVer(pWal);
}
}
} else {
fixed = true;
while (i < actualFileNum) {
SWalFileInfo* pActualFile = taosArrayGet(actualLog, i);
char fNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pActualFile->firstVer, fNameStr);
taosStatFile(fNameStr, &pActualFile->fileSize, NULL);
if (pActualFile->fileSize == 0) {
ASSERT(i == actualFileNum - 1);
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
break;
}
if (i < actualFileNum - 1) {
pActualFile->lastVer = ((SWalFileInfo*)taosArrayGet(actualLog, i + 1))->firstVer - 1;
taosArrayPush(pWal->fileInfoSet, pActualFile);
i++;
} else {
pActualFile = taosArrayPush(pWal->fileInfoSet, pActualFile);
pActualFile->lastVer = walScanLogGetLastVer(pWal);
if (pActualFile->lastVer == -1) {
taosRemoveFile(fNameStr);
walBuildIdxName(pWal, pActualFile->firstVer, fNameStr);
taosRemoveFile(fNameStr);
taosArrayPop(pWal->fileInfoSet);
}
break;
}
}
}
}
#if 0
if (metaFileNum > actualFileNum) { if (metaFileNum > actualFileNum) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) { } else if (metaFileNum < actualFileNum) {
...@@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -214,32 +286,30 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayPush(pWal->fileInfoSet, pFileInfo); taosArrayPush(pWal->fileInfoSet, pFileInfo);
} }
} }
#endif
taosArrayDestroy(actualLog); taosArrayDestroy(actualLog);
actualFileNum = taosArrayGetSize(pWal->fileInfoSet);
pWal->writeCur = actualFileNum - 1; pWal->writeCur = actualFileNum - 1;
if (actualFileNum > 0) {
pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
char fnameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL);
/*ASSERT(fileSize != 0);*/
if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
pLastFileInfo->fileSize = fileSize;
pWal->vers.lastVer = walScanLogGetLastVer(pWal);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
ASSERT(pWal->vers.lastVer != -1);
int code = walSaveMeta(pWal); if (actualFileNum > 0) {
if (code < 0) { int64_t fLastVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur))->lastVer;
return -1; if (fLastVer != -1 && pWal->vers.lastVer != fLastVer) {
fixed = true;
pWal->vers.lastVer = fLastVer;
} }
int64_t fFirstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
if (fFirstVer != pWal->vers.firstVer) {
fixed = true;
pWal->vers.firstVer = fFirstVer;
} }
} }
if (fixed) {
walSaveMeta(pWal);
}
return 0; return 0;
} }
...@@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) { ...@@ -530,6 +600,11 @@ int walLoadMeta(SWal* pWal) {
// read metafile // read metafile
int64_t fileSize = 0; int64_t fileSize = 0;
taosStatFile(fnameStr, &fileSize, NULL); taosStatFile(fnameStr, &fileSize, NULL);
if (fileSize == 0) {
taosRemoveFile(fnameStr);
wDebug("vgId:%d wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
return -1;
}
int size = (int)fileSize; int size = (int)fileSize;
char* buf = taosMemoryMalloc(size + 5); char* buf = taosMemoryMalloc(size + 5);
if (buf == NULL) { if (buf == NULL) {
...@@ -540,6 +615,7 @@ int walLoadMeta(SWal* pWal) { ...@@ -540,6 +615,7 @@ int walLoadMeta(SWal* pWal) {
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ); TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosMemoryFree(buf);
return -1; return -1;
} }
if (taosReadFile(pFile, buf, size) != size) { if (taosReadFile(pFile, buf, size) != size) {
......
...@@ -399,7 +399,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -399,7 +399,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
*ppHead = ptr; *ppHead = (SWalCkHead *)ptr;
pReadHead = &((*ppHead)->head); pReadHead = &((*ppHead)->head);
pRead->capacity = pReadHead->bodyLen; pRead->capacity = pReadHead->bodyLen;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册