未验证 提交 fe729a47 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #20305 from taosdata/fix/main_bugfix_wxy

fix: taosd crash when modify the schema and query last_row
......@@ -1262,17 +1262,58 @@ _err:
return code;
}
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
int32_t code = 0;
static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol));
if (NULL == pColArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pTSchema->numOfCols; ++i) {
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[i].colId, pTSchema->columns[i].type)};
taosArrayPush(pColArray, &col);
}
*ppColArray = pColArray;
return TSDB_CODE_SUCCESS;
}
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
*ppDst = taosMemoryMalloc(len);
if (NULL == *ppDst) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*ppDst, pSrc, len);
return TSDB_CODE_SUCCESS;
}
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
return cloneTSchema(pReader->pSchema, &pReader->pCurrSchema);
}
if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
return TSDB_CODE_SUCCESS;
}
taosMemoryFreeClear(pReader->pCurrSchema);
return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema);
}
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nCol = pTSchema->numOfCols;
int16_t nLastCol = pTSchema->numOfCols;
int16_t iCol = 0;
int16_t noneCol = 0;
bool setNoneCol = false;
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
bool hasRow = false;
SArray *pColArray = NULL;
SColVal *pColVal = &(SColVal){0};
int32_t code = initLastColArray(pTSchema, &pColArray);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
......@@ -1287,6 +1328,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
break;
}
hasRow = true;
code = updateTSchema(TSDBROW_SVERSION(pRow), pr, uid);
if (TSDB_CODE_SUCCESS != code) {
goto _err;
}
pTSchema = pr->pCurrSchema;
int16_t nCol = pTSchema->numOfCols;
TSKEY rowTs = TSDBROW_TS(pRow);
if (lastRowTs == TSKEY_MAX) {
......@@ -1294,29 +1344,27 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});
for (iCol = 1; iCol < nCol; ++iCol) {
if (iCol >= nLastCol) {
break;
}
SLastCol *pCol = taosArrayGet(pColArray, iCol);
if (pCol->colVal.cid != pTSchema->columns[iCol].colId) {
continue;
}
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (taosArrayPush(pColArray, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
......@@ -1373,6 +1421,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
//*ppColArray = NULL;
// taosArrayDestroy(pColArray);
//} else {
if (!hasRow) {
taosArrayClear(pColArray);
}
*ppColArray = pColArray;
//}
......@@ -1387,43 +1438,6 @@ _err:
return code;
}
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
*ppDst = taosMemoryMalloc(len);
if (NULL == *ppDst) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*ppDst, pSrc, len);
return TSDB_CODE_SUCCESS;
}
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
return cloneTSchema(pReader->pSchema, &pReader->pCurrSchema);
}
if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
return TSDB_CODE_SUCCESS;
}
taosMemoryFreeClear(pReader->pCurrSchema);
return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema);
}
static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol));
if (NULL == pColArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pTSchema->numOfCols; ++i) {
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[i].colId, pTSchema->columns[i].type)};
taosArrayPush(pColArray, &col);
}
*ppColArray = pColArray;
return TSDB_CODE_SUCCESS;
}
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
int16_t nLastCol = pTSchema->numOfCols;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册