提交 d96cdff8 编写于 作者: wmmhello's avatar wmmhello

fix:add logic for auto create table while inserting data for taosx

上级 fe8a4631
......@@ -266,8 +266,8 @@ int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen,
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock2(STqReader *pReader);
bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData** pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData** pSubmitTbDataRet);
// int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
// int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas);
......
......@@ -241,7 +241,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) {
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
......@@ -255,23 +256,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
continue;
}
}
if (pHandle->fetchMeta) {
#if 0
SSubmitBlk* pBlk = pReader->pBlock;
int64_t uid = pExec->pExecReader->lastBlkUid;
int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &schemaLen);
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
}
taosArrayPush(pRsp->createTableLen, &len);
taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++;
}
#endif
tEncoderClear(&encoder);
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
......@@ -294,7 +305,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
/*}*/
taosArrayClear(pBlocks);
taosArrayClear(pSchemas);
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) {
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
}
if (pRsp->withTbName) {
......@@ -307,22 +319,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
continue;
}
}
if (pHandle->fetchMeta) {
#if 0
SSubmitBlk* pBlk = pReader->pBlock;
int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) {
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
void* createReq = taosMemoryCalloc(1, schemaLen);
memcpy(createReq, pBlk->data, schemaLen);
taosArrayPush(pRsp->createTableLen, &schemaLen);
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, createReq, len);
code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq);
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
}
taosArrayPush(pRsp->createTableLen, &len);
taosArrayPush(pRsp->createTableReq, &createReq);
pRsp->createTableNum++;
}
#endif
tEncoderClear(&encoder);
}
/*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
/*pTq->pVnode->config.tsdbCfg.precision);*/
......
......@@ -332,7 +332,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
while (tqNextDataBlock2(pReader)) {
// TODO mem free
memset(&ret->data, 0, sizeof(SSDataBlock));
int32_t code = tqRetrieveDataBlock2(&ret->data, pReader);
int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL);
if (code != 0 || ret->data.info.rows == 0) {
continue;
}
......@@ -550,7 +550,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
}
#endif
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
ASSERT(pReader->nextBlk < blockSz);
......@@ -559,6 +559,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid;
......@@ -1006,9 +1007,9 @@ FAIL:
}
#endif
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas) {
int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock2(&block, pReader) == 0) {
if (tqRetrieveDataBlock2(&block, pReader, pSubmitTbDataRet) == 0) {
taosArrayPush(blocks, &block);
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper);
taosArrayPush(schemas, &pSW);
......
......@@ -1532,7 +1532,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
while (tqNextDataBlock2(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader);
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
......@@ -1941,7 +1941,7 @@ FETCH_NEXT_BLOCK:
while (tqNextDataBlock2(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader);
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册