提交 3ed1ab6a 编写于 作者: L Liu Jicong

enh(stream): auto create ctb

上级 26712d14
......@@ -101,8 +101,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -56,11 +56,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1)
#define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
......@@ -187,8 +187,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
}
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource,
uint32_t numOfRow2);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
......@@ -230,9 +230,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
tb_uid_t uid, tb_uid_t suid);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
......
......@@ -69,20 +69,24 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
typedef void FTbSink(void* vnode, int64_t ver, const SArray* data);
typedef struct {
int8_t reserved;
int64_t stbUid;
SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder
void* vnode;
FTbSink* tbSinkFunc;
STSchema* pTSchema;
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef struct {
int64_t smaId;
// following are not applicable to encoder and decoder
FSmaHandle* smaHandle;
FSmaSink* smaSink;
} STaskSinkSma;
typedef struct {
......
......@@ -363,9 +363,9 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd
for (int32_t i = 0; i < pDest->info.numOfCols; ++i) {
int32_t mapIndex = i;
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
// if (pIndexMap) {
// mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
// }
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
......@@ -1596,7 +1596,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
return TSDB_CODE_SUCCESS;
}
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid) {
SSubmitReq* ret = NULL;
// cal size
......@@ -1608,7 +1608,29 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + rows * maxLen;
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = htobe64(suid);
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
}
// assign data
......@@ -1623,19 +1645,47 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
SSubmitBlk* blkHead = submitBlk;
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->schemaLen = 0;
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = 0;
blkHead->uid = htobe64(pDataBlock->info.uid);
blkHead->suid = htobe64(suid);
// uid is assigned by vnode
blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows;
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
/*blkHead->dataLen = htonl(rows * maxLen);*/
blkHead->dataLen = 0;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
STSRow* rowData = blockData;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
createTbReq.name = "a";
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
tEncoderClear(&encoder);
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
......@@ -1656,6 +1706,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) {
int32_t len = blkHead->dataLen;
blkHead->dataLen = htonl(len);
blkHead = POINTER_SHIFT(blkHead, len);
/*submitBlk = blkHead;*/
}
return ret;
......
......@@ -56,7 +56,6 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen);
ASSERT(pIter->len > 0);
}
......@@ -4013,4 +4012,4 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) {
tEndDecode(pCoder);
return 0;
}
\ No newline at end of file
}
......@@ -574,6 +574,7 @@ typedef struct {
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int64_t createTime;
int64_t updateTime;
int64_t uid;
......
......@@ -416,6 +416,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
/*int32_t outputNameSz = 0;*/
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
......@@ -465,6 +468,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
......@@ -529,4 +535,4 @@ void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
buf = taosDecodeStringTo(buf, pOffset->key);
buf = taosDecodeFixedI64(buf, &pOffset->offset);
return buf;
}
\ No newline at end of file
}
......@@ -204,6 +204,7 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
ASSERT(pTask->tbSink.pSchemaWrapper);
}
......@@ -244,9 +245,10 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
//
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
......@@ -319,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
}
#endif
......
......@@ -360,6 +360,8 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
goto _OVER;
}
stbObj.uid = pStream->targetStbUid;
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
return 0;
......@@ -379,6 +381,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN);
streamObj.dbUid = pDb->uid;
streamObj.version = 1;
streamObj.sql = pCreate->sql;
......
......@@ -884,24 +884,38 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType == TASK_EXEC__NONE) return 0;
void tqTableSink(void* vnode, int64_t ver, const SArray* data) {
//
SVnode* pVnode = (SVnode*)vnode;
// build write msg
//
}
pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) {
return -1;
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
pTask->exec.numOfRunners = parallel;
pTask->exec.runners = taosMemoryCalloc(parallel, sizeof(SStreamRunner));
if (pTask->exec.runners == NULL) {
return -1;
}
for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
};
pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor);
}
}
for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
};
pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor);
if (pTask->sinkType == TASK_SINK__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink;
}
return 0;
}
......@@ -925,7 +939,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
// sink
pTask->ahandle = pTq->pVnode;
if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle = smaHandleRes;
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->sinkType == TASK_SINK__TABLE) {
ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
......
......@@ -152,12 +152,12 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
/*blockDebugShowData(pRes);*/
blockDebugShowData(pRes);
ASSERT(pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema);
SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, false, pTask->tbSink.stbUid);
tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
......@@ -276,7 +276,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
}
if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
......@@ -321,7 +321,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
if (pTask->sinkType == TASK_SINK__TABLE) {
/*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
......
......@@ -442,4 +442,4 @@ TEST(td_encode_test, compound_struct_encode_test) {
#endif
#pragma GCC diagnostic pop
#endif
\ No newline at end of file
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册