diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index dfc56eff3912c6ea3a6ceb7d9b271d4810c747f5..9c788202d302ca3f984f2c0d65db7b5ab61c8ae0 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -53,6 +53,7 @@ typedef struct SInsertParseContext { SHashObj* pTableBlockHashObj; // global SHashObj* pSubTableHashObj; // global SArray* pVgDataBlocks; // global + SHashObj* pTableNameHashObj; // global int32_t totalNum; SVnodeModifOpStmt* pOutput; SStmtCallback* pStmtCb; @@ -1111,6 +1112,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); tNameExtractFullName(&name, tbFName); + CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName))); + // USING cluase if (TK_USING == sToken.type) { CHECK_CODE(parseUsingClause(pCxt, &name, tbFName)); @@ -1193,7 +1196,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { .pSql = (char*)pContext->pSql, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .pTableMeta = NULL, - .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false), + .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), + .pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK), .totalNum = 0, .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), .pStmtCb = pContext->pStmtCb}; @@ -1202,12 +1206,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, &context.pTableBlockHashObj); } else { - context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); - context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); + context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + context.pTableBlockHashObj = + taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj || - NULL == context.pOutput) { + NULL == context.pTableNameHashObj || NULL == context.pOutput) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1220,6 +1225,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (NULL == *pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } + (*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName)); + if (NULL == (*pQuery)->pTableList) { + return TSDB_CODE_OUT_OF_MEMORY; + } (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; (*pQuery)->haveResultSet = false; (*pQuery)->msgType = TDMT_VND_SUBMIT; @@ -1232,6 +1241,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (TSDB_CODE_SUCCESS == code) { code = parseInsertBody(&context); } + if (TSDB_CODE_SUCCESS == code) { + SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL); + while (NULL != pTable) { + taosArrayPush((*pQuery)->pTableList, pTable); + pTable = taosHashIterate(context.pTableNameHashObj, pTable); + } + } destroyInsertParseContext(&context); return code; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 1ce074c49fb08f13688d31f0dc4a2f4e8071b820..2710e54f9533ce9ed34552bfa0745d5854c7ad34 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1092,11 +1092,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) { SVCreateTbRsp *rsp = batchRsp.pRsps + i; - if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { - tDecoderClear(&coder); - SCH_ERR_JRET(rsp->code); - } else if (TSDB_CODE_SUCCESS != rsp->code) { + if (TSDB_CODE_SUCCESS != rsp->code) { code = rsp->code; + tDecoderClear(&coder); + SCH_ERR_JRET(code); } } } @@ -1117,11 +1116,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) { SVDropTbRsp *rsp = batchRsp.pRsps + i; - if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { - tDecoderClear(&coder); - SCH_ERR_JRET(rsp->code); - } else if (TSDB_CODE_SUCCESS != rsp->code) { + if (TSDB_CODE_SUCCESS != rsp->code) { code = rsp->code; + tDecoderClear(&coder); + SCH_ERR_JRET(code); } } } @@ -1137,7 +1135,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(rspCode); if (msg) { - SDecoder coder = {0}; + SDecoder coder = {0}; SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp)); tDecoderInit(&coder, msg, msgSize); code = tDecodeSSubmitRsp(&coder, rsp); @@ -1146,10 +1144,21 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch tFreeSSubmitRsp(rsp); SCH_ERR_JRET(code); } - + + if (rsp->nBlocks > 0) { + for (int32_t i = 0; i < rsp->nBlocks; ++i) { + SSubmitBlkRsp *blk = rsp->pBlocks + i; + if (TSDB_CODE_SUCCESS != blk->code) { + code = blk->code; + tFreeSSubmitRsp(rsp); + SCH_ERR_JRET(code); + } + } + } + atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); - + if (pJob->attr.needRes) { SCH_LOCK(SCH_WRITE, &pJob->resLock); if (pJob->resData) { @@ -1160,7 +1169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp); - } else { + } else { pJob->resData = rsp; } SCH_UNLOCK(SCH_WRITE, &pJob->resLock);