提交 2bf0916b 编写于 作者: wmmhello's avatar wmmhello

fix:split block data if there are none value in block for taosx

上级 936b1562
......@@ -1505,7 +1505,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, NULL, 0);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if(fields == NULL){
goto end;
}
for(int i = 0; i < pSW->nCols; i++){
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
......@@ -1665,7 +1676,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, NULL, 0);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter);
TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD));
if(fields == NULL){
goto end;
}
for(int i = 0; i < pSW->nCols; i++){
fields[i].type = pSW->pSchema[i].type;
fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
}
code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols);
taosMemoryFree(fields);
if (code != TSDB_CODE_SUCCESS) {
uError("WriteRaw:rawBlockBindData failed");
goto end;
......
......@@ -39,7 +39,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed since no handler", pMsg);
dGError("msg:%p,info:%s not processed since no handler", pMsg, TMSG_INFO(pMsg->msgType));
return -1;
}
......
......@@ -1006,13 +1006,257 @@ FAIL:
#endif
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock2(&block, pReader, pSubmitTbDataRet) == 0) {
taosArrayPush(blocks, &block);
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper);
taosArrayPush(schemas, &pSW);
return 0;
tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
pReader->lastBlkUid = uid;
taosMemoryFree(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchemaWrapper == NULL) {
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t numOfRows = 0;
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
SColData* pCol = taosArrayGet(pCols, 0);
numOfRows = pCol->nVal;
} else {
SArray* pRows = pSubmitTbData->aRowP;
numOfRows = taosArrayGetSize(pRows);
}
int32_t curRow = 0;
int32_t lastRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned == NULL) return -1;
// convert and scan one block
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol;
int32_t numOfCols = taosArrayGetSize(pCols);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
for (int32_t j = 0; j < numOfCols; j++){
SColData* pCol = taosArrayGet(pCols, j);
SColVal colVal;
tColDataGetValue(pCol, i, &colVal);
if (curRow == 0) {
assigned[j] = !COL_VAL_IS_NONE(&colVal);
buildNew = true;
} else {
bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
if (currentRowAssigned != assigned[j]) {
assigned[j] = currentRowAssigned;
buildNew = true;
}
}
}
if (buildNew) {
if (taosArrayGetSize(blocks) > 0) {
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
lastRow = curRow;
}
SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
block.info.id.uid = uid;
block.info.version = pReader->msg2.ver;
if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
taosArrayPush(blocks, &block);
taosArrayPush(schemas, &pSW);
}
SSDataBlock* pBlock = taosArrayGetLast(blocks);
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
int32_t colActual = blockDataGetNumOfCols(pBlock);
while (targetIdx < colActual) {
SColData* pCol = taosArrayGet(pCols, sourceIdx);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
if (pCol->cid < pColData->info.colId) {
sourceIdx++;
} else if (pCol->cid == pColData->info.colId) {
tColDataGetValue(pCol, i, &colVal);
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2];
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataAppendNULL(pColData, curRow - lastRow);
}
} else {
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
sourceIdx++;
targetIdx++;
}
}
curRow++;
}
} else {
SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i);
bool buildNew = false;
for (int32_t j = 0; j < pTschema->numOfCols; j++){
SColVal colVal;
tRowGet(pRow, pTschema, j, &colVal);
if (curRow == 0) {
assigned[j] = !COL_VAL_IS_NONE(&colVal);
buildNew = true;
} else {
bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);
if (currentRowAssigned != assigned[j]) {
assigned[j] = currentRowAssigned;
buildNew = true;
}
}
}
if (buildNew) {
if (taosArrayGetSize(blocks) > 0) {
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
lastRow = curRow;
}
SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) {
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(block.pDataBlock));
block.info.id.uid = uid;
block.info.version = pReader->msg2.ver;
if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
blockDataFreeRes(&block);
tDeleteSSchemaWrapper(pSW);
goto FAIL;
}
taosArrayPush(blocks, &block);
taosArrayPush(schemas, &pSW);
}
SSDataBlock* pBlock = taosArrayGetLast(blocks);
tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
(int32_t)taosArrayGetSize(blocks));
int32_t targetIdx = 0;
int32_t sourceIdx = 0;
int32_t colActual = blockDataGetNumOfCols(pBlock);
while (targetIdx < colActual) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) {
sourceIdx++;
} else if (colVal.cid == pColData->info.colId) {
if (IS_STR_DATA_TYPE(colVal.type)) {
if (colVal.value.pData != NULL) {
char val[65535 + 2];
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
} else {
colDataAppendNULL(pColData, curRow - lastRow);
}
} else {
if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
}
}
sourceIdx++;
targetIdx++;
}
}
curRow++;
}
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
taosMemoryFree(assigned);
return 0;
FAIL:
taosMemoryFree(assigned);
return -1;
}
......
......@@ -45,7 +45,7 @@ class TDTestCase:
if drop:
tdSql.checkRows(10)
else:
tdSql.checkRows(15)
tdSql.checkRows(16)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
......@@ -63,20 +63,20 @@ class TDTestCase:
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(2, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
tdSql.checkData(2, 5, "stt4")
tdSql.execute('use abc1')
tdSql.query("show tables")
if drop:
tdSql.checkRows(10)
else:
tdSql.checkRows(15)
tdSql.checkRows(16)
tdSql.query("select * from jt order by i")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 1)
......@@ -94,13 +94,13 @@ class TDTestCase:
tdSql.checkData(1, 5, "sttb4")
tdSql.query("select * from stt order by ts")
tdSql.checkRows(2)
tdSql.checkRows(3)
tdSql.checkData(0, 1, 1)
tdSql.checkData(1, 1, 21)
tdSql.checkData(2, 1, 21)
tdSql.checkData(0, 2, 2)
tdSql.checkData(1, 2, 21)
tdSql.checkData(2, 2, 21)
tdSql.checkData(0, 5, "stt3")
tdSql.checkData(1, 5, "stt4")
tdSql.checkData(2, 5, "stt4")
return
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册