提交 f3c8bcb9 编写于 作者: D dapan1121

feat: insert from query

上级 d4696ef3
......@@ -89,9 +89,6 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
case TDMT_SCH_MERGE_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_SCH_FETCH_RSP:
code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_SCH_CANCEL_TASK:
code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
......
......@@ -118,7 +118,13 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
}
SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId) {
SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
const SArray* pBlocks = pInserter->pDataBlocks;
const STSchema* pTSchema = pInserter->pSchema;
int64_t uid = pInserter->pNode->tableId;
int64_t suid = pInserter->pNode->stableId;
int32_t vgId = pInserter->pNode->vgId;
SSubmitReq* ret = NULL;
int32_t sz = taosArrayGetSize(pBlocks);
......@@ -192,7 +198,7 @@ SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, i
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosArrayPush(pInserter->pDataBlocks, pInput->pData);
SSubmitReq* pMsg = dataBlockToSubmit(pInserter->pDataBlocks, pInserter->pSchema, pInserter->pNode->tableId, pInserter->pNode->suid, pInserter->pNode->vgId);
SSubmitReq* pMsg = dataBlockToSubmit(pInserter);
int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
if (code) {
......@@ -248,7 +254,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
SDataDeleterNode* pInserterNode = (SQueryInserterNode *)pDataSink;
SQueryInserterNode* pInserterNode = (SQueryInserterNode *)pDataSink;
inserter->sink.fPut = putDataBlock;
inserter->sink.fEndPut = endPut;
inserter->sink.fGetLen = getDataLength;
......@@ -267,7 +273,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
return code;
}
if (pInserterNode->suid != suid) {
if (pInserterNode->stableId != suid) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return terrno;
}
......
......@@ -39,6 +39,9 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
if (TK_USING == t.type || TK_VALUES == t.type) {
return true;
}
if (0 == t.type) {
break;
}
} while (pStr - pSql < length);
return false;
}
......
......@@ -247,7 +247,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
}
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
int32_t len = 0;
int64_t len = 0;
bool queryEnd = false;
int32_t code = 0;
SOutputData output = {0};
......@@ -255,7 +255,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len <= 0 || len != sizeof(SDeleterRes)) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......
......@@ -318,7 +318,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
......
......@@ -30,6 +30,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
case TDMT_SCH_EXPLAIN_RSP:
return TSDB_CODE_SUCCESS;
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP:
if (lastMsgType != reqMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册