未验证 提交 c8cca356 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14246 from taosdata/feature/stream

enh(stream): generate schema only once
......@@ -18,57 +18,87 @@
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// cal size
int32_t cap = sizeof(SSubmitReq);
int32_t sz = taosArrayGetSize(pBlocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
if (createTb) {
schemaReqs = taosArrayInit(sz, sizeof(void*));
schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
STag* pTag = NULL;
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(schemaReqs);
taosArrayDestroy(tagArray);
return NULL;
}
SVCreateTbReq createTbReq = {0};
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
int32_t schemaLen;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
void* schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
taosArrayPush(schemaReqs, &schemaStr);
taosArrayPush(schemaReqSz, &schemaLen);
SEncoder encoder = {0};
tEncoderInit(&encoder, schemaStr, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
}
}
taosArrayDestroy(tagArray);
// cal size
int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
int32_t schemaLen = 0;
if (createTb) {
schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
}
......@@ -99,55 +129,13 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, blkSchema, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
void* schemaStr = taosArrayGetP(schemaReqs, i);
memcpy(blkSchema, schemaStr, schemaLen);
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
......@@ -175,7 +163,10 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
}
ret->length = htonl(ret->length);
taosArrayDestroy(tagArray);
if (schemaReqs) taosArrayDestroyP(schemaReqs, taosMemoryFree);
taosArrayDestroy(schemaReqSz);
return ret;
}
......
......@@ -671,7 +671,7 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
fflush(pFile->fp);
}
bool taosValidFile(TdFilePtr pFile) { return pFile != NULL; }
bool taosValidFile(TdFilePtr pFile) { return pFile != NULL && pFile->fd > 0; }
int32_t taosUmaskFile(int32_t maskVal) {
#ifdef WINDOWS
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册