提交 fbfae317 编写于 作者: L Liu Jicong

refactor

上级 b1594a68
......@@ -171,7 +171,8 @@ typedef struct SDataBlockInfo {
STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark; // used for stream
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream
char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
STag* pTag; // used for stream partition
} SDataBlockInfo;
typedef struct SSDataBlock {
......
......@@ -1235,6 +1235,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
taosArrayDestroy(pBlock->pDataBlock);
pBlock->pDataBlock = NULL;
taosMemoryFreeClear(pBlock->pBlockAgg);
taosMemoryFree(pBlock->info.pTag);
memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
}
......@@ -1317,7 +1318,7 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
pBlock->info.rows = 0;
pBlock->info.type = type;
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) +
sizeof(TSKEY) + TSDB_TABLE_NAME_LEN;
sizeof(TSKEY) + VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
pBlock->info.watermark = INT64_MIN;
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
......@@ -1345,7 +1346,7 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
// table name
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = TSDB_TABLE_NAME_LEN;
infoData.info.bytes = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
return pBlock;
......
......@@ -677,7 +677,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// build stream obj from request
if (mndBuildStreamObjFromCreateReq(pMnode, &streamObj, &createStreamReq) < 0) {
/*ASSERT(0);*/
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
......
......@@ -84,26 +84,6 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
continue;
}
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
STag* pTag = NULL;
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(tagArray);
return NULL;
}
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
// STag* pTag = NULL;
// taosArrayClear(tagArray);
// SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
......@@ -126,42 +106,76 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// }
SVCreateTbReq createTbReq = {0};
// set const
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
// set tag content
taosArrayClear(tagArray);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
taosArrayPush(tagArray, &tagVal);
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(tagArray);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return NULL;
}
createTbReq.ctb.pTag = (uint8_t*)pTag;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
createTbReq.ctb.tagName = tagName;
// set table name
if (pDataBlock->info.parTbName[0]) {
createTbReq.name = strdup(pDataBlock->info.parTbName);
} else {
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
}
createTbReq.ctb.suid = suid;
createTbReq.ctb.pTag = (uint8_t*)pTag;
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
createTbReq.ctb.tagName = tagName;
// save schema len
int32_t code;
int32_t schemaLen;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return NULL;
}
taosArrayPush(schemaReqSz, &schemaLen);
// save schema str
void* schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return NULL;
}
taosArrayPush(schemaReqs, &schemaStr);
taosArrayPush(schemaReqSz, &schemaLen);
SEncoder encoder = {0};
tEncoderInit(&encoder, schemaStr, schemaLen);
......@@ -169,6 +183,10 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
tEncoderClear(&encoder);
return NULL;
}
tEncoderClear(&encoder);
......@@ -221,10 +239,8 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
int32_t dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
int32_t schemaLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
if (createTb) {
schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
void* schemaStr = taosArrayGetP(schemaReqs, i);
......@@ -262,7 +278,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
ret->length = htonl(ret->length);
if (schemaReqs) taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return ret;
......
......@@ -1300,6 +1300,56 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code;
}
static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* pResBlock) {
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return;
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
ASSERT(pSrcBlock->info.rows == 1);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
// build tagArray
// build STag
// set STag
blockDataDestroy(pSrcBlock);
}
static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return;
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
ASSERT(pSrcBlock->info.rows == 1);
SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation
if (pData != (void*)-1 && pData != NULL) {
memcpy(pBlock->info.parTbName, varDataVal(pData), varDataLen(pData));
} else {
pBlock->info.parTbName[0] = 0;
}
blockDataDestroy(pSrcBlock);
blockDataDestroy(pResBlock);
}
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName) {
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
......@@ -1421,28 +1471,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock);
if (pInfo->tbnameCalSup.numOfExprs > 0 && pInfo->pRes->info.rows > 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pInfo->pRes, 0);
SSDataBlock* pResBlock = createDataBlock();
pResBlock->info.rowSize = TSDB_TABLE_NAME_LEN;
SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
taosArrayPush(pResBlock->pDataBlock, &data);
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pInfo->tbnameCalSup.pExprInfo, pResBlock, pTmpBlock, pInfo->tbnameCalSup.pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);
void* pData = colDataGetData(pCol, 0);
// TODO check tbname validation
if (pData != (void*)-1) {
memcpy(pInfo->pRes->info.parTbName, varDataVal(pData), varDataLen(pData));
} else {
pInfo->pRes->info.parTbName[0] = 0;
}
blockDataDestroy(pTmpBlock);
blockDataDestroy(pResBlock);
}
calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes);
return 0;
}
......@@ -1780,6 +1809,7 @@ FETCH_NEXT_BLOCK:
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update");
calBlockTbName(&pInfo->tbnameCalSup, pSDB);
return pSDB;
}
blockDataCleanup(pInfo->pUpdateDataRes);
......
......@@ -492,18 +492,21 @@ int walSaveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer + 1, fnameStr);
TdFilePtr pMataFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE);
if (pMataFile == NULL) {
TdFilePtr pMetaFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE);
if (pMetaFile == NULL) {
return -1;
}
char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized);
if (len != taosWriteFile(pMataFile, serialized, len)) {
if (len != taosWriteFile(pMetaFile, serialized, len)) {
// TODO:clean file
taosCloseFile(&pMetaFile);
taosRemoveFile(fnameStr);
return -1;
}
taosCloseFile(&pMataFile);
taosCloseFile(&pMetaFile);
// delete old file
if (metaVer > -1) {
walBuildMetaName(pWal, metaVer, fnameStr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册