未验证 提交 64d60e82 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12981 from taosdata/feature/stream

fix(stream): handle null data
...@@ -350,7 +350,7 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) ...@@ -350,7 +350,7 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
return -1; return -1;
} }
int32_t index = (tsColumnIndex == -1)? 0:tsColumnIndex; int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
return 0; return 0;
...@@ -599,8 +599,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -599,8 +599,8 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
} }
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
pBlock->info.rows = *(int32_t*) buf; pBlock->info.rows = *(int32_t*)buf;
pBlock->info.groupId = *(uint64_t*) (buf + sizeof(int32_t)); pBlock->info.groupId = *(uint64_t*)(buf + sizeof(int32_t));
int32_t numOfCols = pBlock->info.numOfCols; int32_t numOfCols = pBlock->info.numOfCols;
const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t); const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
...@@ -669,7 +669,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { ...@@ -669,7 +669,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t); return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
} }
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
double rowSize = 0; double rowSize = 0;
...@@ -1232,7 +1232,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { ...@@ -1232,7 +1232,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
// the true value must be less than the value of nRows // the true value must be less than the value of nRows
int32_t additional = 0; int32_t additional = 0;
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
additional += nRows * sizeof(int32_t); additional += nRows * sizeof(int32_t);
...@@ -1728,8 +1728,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1728,8 +1728,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
for (int32_t k = 0; k < pTSchema->numOfCols; k++) { for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pColumn = &pTSchema->columns[k]; const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
void* data = colDataGetData(pColData, j); if (colDataIsNull_s(pColData, j)) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
} else {
void* data = colDataGetData(pColData, j);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
}
} }
int32_t rowLen = TD_ROW_LEN(rowData); int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen); rowData = POINTER_SHIFT(rowData, rowLen);
......
...@@ -158,7 +158,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -158,7 +158,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(false); ASSERT(false);
} }
if (output == NULL) break; if (output == NULL) break;
taosArrayPush(pRes, output); // TODO: do we need free memory?
SSDataBlock* outputCopy = createOneDataBlock(output, true);
taosArrayPush(pRes, outputCopy);
} }
// destroy // destroy
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册