未验证 提交 7aef8881 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #19628 from taosdata/feat/3.0_stream_wxy

fix: support writing streams to existing tables
......@@ -1789,7 +1789,7 @@ typedef struct {
// 3.0.2.3
int8_t createStb;
uint64_t targetStbUid;
SArray* fillNullCols;
SArray* fillNullCols; // array of SColLocation
} SCMCreateStreamReq;
typedef struct {
......
......@@ -5428,6 +5428,13 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
}
if (tEncodeI8(&encoder, pReq->createStb) < 0) return -1;
if (tEncodeU64(&encoder, pReq->targetStbUid) < 0) return -1;
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->fillNullCols)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pReq->fillNullCols); ++i) {
SColLocation *pCol = taosArrayGet(pReq->fillNullCols, i);
if (tEncodeI16(&encoder, pCol->slotId) < 0) return -1;
if (tEncodeI16(&encoder, pCol->colId) < 0) return -1;
if (tEncodeI8(&encoder, pCol->type) < 0) return -1;
}
tEndEncode(&encoder);
......@@ -5490,6 +5497,26 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
if (tDecodeI8(&decoder, &pReq->createStb) < 0) return -1;
if (tDecodeU64(&decoder, &pReq->targetStbUid) < 0) return -1;
int32_t numOfFillNullCols = 0;
if (tDecodeI32(&decoder, &numOfFillNullCols) < 0) return -1;
if (numOfFillNullCols > 0) {
pReq->fillNullCols = taosArrayInit(numOfFillNullCols, sizeof(SColLocation));
if (pReq->fillNullCols == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfFillNullCols; ++i) {
SColLocation col = {0};
if (tDecodeI16(&decoder, &col.slotId) < 0) return -1;
if (tDecodeI16(&decoder, &col.colId) < 0) return -1;
if (tDecodeI8(&decoder, &col.type) < 0) return -1;
if (taosArrayPush(pReq->fillNullCols, &col) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
}
tEndDecode(&decoder);
......@@ -5559,6 +5586,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->pTags);
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast);
taosArrayDestroy(pReq->fillNullCols);
}
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {
......
......@@ -2210,7 +2210,8 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
}
static bool sysTableFromVnode(const char* pTable) {
return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) || (0 == strcmp(pTable, TSDB_INS_TABLE_COLS)));
return ((0 == strcmp(pTable, TSDB_INS_TABLE_TABLES)) || (0 == strcmp(pTable, TSDB_INS_TABLE_TAGS)) ||
(0 == strcmp(pTable, TSDB_INS_TABLE_COLS)));
}
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
......@@ -2278,8 +2279,9 @@ static int32_t setVnodeSysTableVgroupList(STranslateContext* pCxt, SName* pName,
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
}
if (TSDB_CODE_SUCCESS == code && (0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) ||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) {
if (TSDB_CODE_SUCCESS == code &&
(0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_TABLES) && !hasUserDbCond) ||
0 == strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_COLS)) {
code = addMnodeToVgroupList(&pCxt->pParseCxt->mgmtEpSet, &pVgs);
}
......@@ -5804,18 +5806,19 @@ static int32_t setFillNullCols(SArray* pProjColPos, const STableMeta* pMeta, SCM
if (NULL == pReq->fillNullCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t indexOfSchema = 0;
const SSchema* pSchemas = getTableColumnSchema(pMeta);
for (int32_t i = 0; i < numOfBoundCols; ++i) {
SProjColPos* pPos = taosArrayGet(pProjColPos, i);
while (indexOfSchema < pMeta->tableInfo.numOfColumns) {
const SSchema* pSchema = pSchemas + indexOfSchema++;
int32_t indexOfBoundCols = 0;
for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) {
const SSchema* pSchema = pSchemas + i;
if (indexOfBoundCols < numOfBoundCols) {
SProjColPos* pPos = taosArrayGet(pProjColPos, indexOfBoundCols);
if (pSchema->colId == pPos->colId) {
break;
++indexOfBoundCols;
continue;
}
SColLocation colLoc = {.colId = pSchema->colId, .slotId = indexOfSchema - 1, .type = pSchema->type};
taosArrayPush(pReq->fillNullCols, &colLoc);
}
SColLocation colLoc = {.colId = pSchema->colId, .slotId = i, .type = pSchema->type};
taosArrayPush(pReq->fillNullCols, &colLoc);
}
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册