提交 8b02719e 编写于 作者: wmmhello's avatar wmmhello

fix:[TD-20612] error if write raw with some cols

上级 829a7631
......@@ -1262,22 +1262,19 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
}
uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid);
uint64_t uid = pTableMeta->uid;
int32_t numOfCols = numFields;
int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for(int k = 0; k < numOfCols; k++){
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema* schema = pTableMeta->schema + i;
if(strcmp(schema->name, fields[k].name) != 0) continue;
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) {
nVar++;
}
}
}
fLen -= sizeof(TSKEY);
......@@ -1298,14 +1295,14 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
int32_t dataLen = 0;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
char* pStart = pData + getVersion1BlockMetaSize(pData, numOfCols);
char* pStart = pData + getVersion1BlockMetaSize(pData, numFields);
int32_t* colLength = (int32_t*)pStart;
pStart += sizeof(int32_t) * numOfCols;
pStart += sizeof(int32_t) * numFields;
SResultColumn* pCol = taosMemoryCalloc(numOfCols, sizeof(SResultColumn));
SResultColumn* pCol = taosMemoryCalloc(numFields, sizeof(SResultColumn));
for (int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(pTableMeta->schema[i].type)) {
for (int32_t i = 0; i < numFields; ++i) {
if (IS_VAR_DATA_TYPE(fields[i].type)) {
pCol[i].offset = (int32_t*)pStart;
pStart += rows * sizeof(int32_t);
} else {
......@@ -1317,36 +1314,35 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
pStart += colLength[i];
}
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int i = 0; i < numFields; i++) {
TAOS_FIELD* schema = &fields[i];
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
int32_t offset = 0;
for (int32_t k = 0; k < numOfCols; k++) {
const SSchema* pColumn = NULL;
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
if (strcmp((pTableMeta->schema + i)->name, fields[k].name) == 0) {
pColumn = pTableMeta->schema + i;
break;
}
}
if(pColumn == NULL){
uError("column not exist:%s", fields[k].name);
code = TSDB_CODE_INVALID_PARA;
goto end;
}
if (IS_VAR_DATA_TYPE(pColumn->type)) {
if (pCol[k].offset[j] != -1) {
char* data = pCol[k].pData + pCol[k].offset[j];
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
} else {
if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
char* data = pCol[k].pData + pColumn->bytes * j;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) { // add none
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
}else{
if (IS_VAR_DATA_TYPE(pColumn->type)) {
if (pCol[k].offset[j] != -1) {
char* data = pCol[k].pData + pCol[k].offset[j];
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
if (!colDataIsNull_f(pCol[k].nullbitmap, j)) {
char* data = pCol[k].pData + pColumn->bytes * j;
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k);
} else {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
}
}
}
......@@ -1360,6 +1356,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
dataLen += rowLen;
}
taosHashCleanup(schemaHash);
taosMemoryFree(pCol);
blk->uid = htobe64(uid);
......@@ -1704,8 +1701,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
uint16_t fLen = 0;
int32_t rowSize = 0;
int16_t nVar = 0;
for (int i = 0; i < pSW->nCols; i++) {
SSchema* schema = &pSW->pSchema[i];
for (int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
SSchema* schema = &pTableMeta->schema[i];
fLen += TYPE_BYTES[schema->type];
rowSize += schema->bytes;
if (IS_VAR_DATA_TYPE(schema->type)) {
......@@ -1716,7 +1713,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t rows = rspObj.resInfo.numOfRows;
int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) +
(int32_t)TD_BITMAP_BYTES(pSW->nCols - 1);
(int32_t)TD_BITMAP_BYTES(pTableMeta->tableInfo.numOfColumns - 1);
int32_t schemaLen = 0;
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
......@@ -1763,14 +1760,14 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
SRowBuilder rb = {0};
tdSRowInit(&rb, sver);
tdSRowSetTpInfo(&rb, pSW->nCols, fLen);
tdSRowSetTpInfo(&rb, pTableMeta->tableInfo.numOfColumns, fLen);
int32_t totalLen = 0;
// SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
// for (int i = 0; i < pSW->nCols; i++) {
// SSchema* schema = &pSW->pSchema[i];
// taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
// }
SHashObj* schemaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int i = 0; i < pSW->nCols; i++) {
SSchema* schema = &pSW->pSchema[i];
taosHashPut(schemaHash, schema->name, strlen(schema->name), &i, sizeof(int32_t));
}
for (int32_t j = 0; j < rows; j++) {
tdSRowResetBuf(&rb, rowData);
......@@ -1779,29 +1776,22 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
rspObj.resInfo.current += 1;
int32_t offset = 0;
for (int32_t i = 0; i < pSW->nCols; i++) {
const SSchema* pColumn = NULL;
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
if (strcmp(pTableMeta->schema[k].name, pSW->pSchema[i].name) == 0) {
pColumn = &pTableMeta->schema[k];
break;
}
}
if (pColumn == NULL){
uError("column not exist:%s", pSW->pSchema[i].name);
code = TSDB_CODE_INVALID_PARA;
goto end;
}
char* colData = rspObj.resInfo.row[i];
if (!colData) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, i);
for (int32_t k = 0; k < pTableMeta->tableInfo.numOfColumns; k++) {
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
} else {
if (IS_VAR_DATA_TYPE(pColumn->type)) {
colData -= VARSTR_HEADER_SIZE;
char* colData = rspObj.resInfo.row[*index];
if (!colData) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
} else {
if (IS_VAR_DATA_TYPE(pColumn->type)) {
colData -= VARSTR_HEADER_SIZE;
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, k);
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, offset, i);
}
if (pColumn->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
offset += TYPE_BYTES[pColumn->type];
}
......@@ -1812,6 +1802,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
totalLen += rowLen;
}
taosHashCleanup(schemaHash);
blk->uid = htobe64(uid);
blk->suid = htobe64(suid);
blk->sversion = htonl(sver);
......@@ -1882,6 +1873,7 @@ end:
return code;
}
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pVgHash = NULL;
......@@ -2094,7 +2086,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
const SSchema* pColumn = &pTableMeta->schema[k];
int32_t* index = taosHashGet(schemaHash, pColumn->name, strlen(pColumn->name));
if (!index) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, offset, k);
} else {
char* colData = rspObj.resInfo.row[*index];
if (!colData) {
......@@ -2175,7 +2167,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code;
end:
end:
tDeleteSTaosxRsp(&rspObj.rsp);
rspObj.resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&rspObj.resInfo);
......
......@@ -76,6 +76,32 @@ static void msg_process(TAOS_RES* msg) {
}
int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
/* test for TD-20612 start*/
// pRes = taos_query(pConn,"create table tb1 (ts timestamp, c1 int, c2 int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn,"insert into tb1 (ts, c1) values(1669092069069, 0)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn,"insert into tb1 (ts, c2) values(1669092069069, 1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// return 0;
/* test for TD-20612 end*/
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册