提交 33a7a83d 编写于 作者: D dapan1121

feature delete data

上级 9cdde2a8
......@@ -32,6 +32,18 @@ extern "C" {
struct SDataSink;
struct SSDataBlock;
typedef struct SDeleterRes {
uint64_t uid;
SArray* uidList;
int64_t skey;
int64_t ekey;
int64_t affectedRows;
} SDeleterRes;
typedef struct SDeleterParam {
SArray* pUidList;
} SDeleterParam;
typedef struct SDataSinkStat {
uint64_t cachedSize;
} SDataSinkStat;
......@@ -64,7 +76,7 @@ typedef struct SOutputData {
* @param pHandle output
* @return error code
*/
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam);
int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat);
......
......@@ -31,7 +31,12 @@ enum {
NODE_TYPE_MNODE,
};
typedef struct SDeleteRes {
uint64_t uid;
SArray* uidList;
int64_t skey;
int64_t ekey;
} SDeleteRes;
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
......@@ -75,6 +80,8 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes);
void qWorkerDestroy(void **qWorkerMgmt);
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
......
......@@ -431,7 +431,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code;
}
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.queryJob != 0) {
......
......@@ -3880,7 +3880,6 @@ int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq) {
int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32(pCoder, pReq->code) < 0) return -1;
if (tEncodeI64(pCoder, pReq->affectedRows) < 0) return -1;
tEndEncode(pCoder);
......@@ -3890,7 +3889,6 @@ int32_t tEncodeSVDeleteRsp(SEncoder* pCoder, const SVDeleteRsp* pReq) {
int32_t tDecodeSVDeleteRsp(SDecoder* pCoder, SVDeleteRsp* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->code) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->affectedRows) < 0) return -1;
tEndDecode(pCoder);
......
......@@ -24,7 +24,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg *pRsp);
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
......@@ -144,7 +144,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
break;
case TDMT_VND_DELETE:
if (vnodeProcessFetchMsg(pVnode, pMsg, pRsp) < 0) goto _err;
if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err;
break;
/* TQ */
case TDMT_VND_MQ_VG_CHANGE:
......@@ -260,13 +260,16 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
}
}
int vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
vTrace("message in write queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SDeleteRes res = {0};
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_VND_DELETE:
return qWorkerProcessDeleteMsg(pVnode, pVnode->pQuery, pMsg, pRsp);
return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res);
default:
vError("unknown msg type:%d in write queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
......
......@@ -49,6 +49,7 @@ typedef struct SDataSinkHandle {
} SDataSinkHandle;
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
#ifdef __cplusplus
}
......
......@@ -855,6 +855,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"
extern SDataSinkStat gDataSinkStat;
typedef struct SDataDeleterBuf {
int32_t useSize;
int32_t allocSize;
char* pData;
} SDataDeleterBuf;
typedef struct SDataCacheEntry {
int32_t dataLen;
int32_t numOfRows;
int32_t numOfCols;
int8_t compressed;
char data[];
} SDataCacheEntry;
typedef struct SDataDeleterHandle {
SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataBlockDescNode* pSchema;
SDataDeleterNode* pDeleter;
SDeleterParam* pParam;
STaosQueue* pDataBlocks;
SDataDeleterBuf nextOutput;
int32_t status;
bool queryEnd;
uint64_t useconds;
uint64_t cachedSize;
TdThreadMutex mutex;
} SDataDeleterHandle;
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
if (tsCompressColData < 0 || 0 == pData->info.rows) {
return false;
}
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * pData->info.rows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false;
}
static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = 0;
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->numOfCols = pInput->pData->info.numOfCols;
pEntry->dataLen = sizeof(SDeleterRes);
ASSERT(1 == pEntry->numOfRows);
ASSERT(1 == pEntry->numOfCols);
pBuf->useSize = sizeof(SDataCacheEntry);
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
pRes->uid = pHandle->pDeleter->tableId;
pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
pRes->affectedRows = *(int64_t*)pColRes->pData;
pBuf->useSize += pEntry->dataLen;
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
}
static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDeleter->pDataBlocks));
return false;
}
pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
}
return NULL != pBuf->pData;
}
static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex);
int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks);
int32_t status =
(0 == blockNums ? DS_BUF_EMPTY
: (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDeleter->status = status;
taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t getStatus(SDataDeleterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex);
int32_t status = pDeleter->status;
taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM);
if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
toDataCacheEntry(pDeleter, pInput, pBuf);
taosWriteQitem(pDeleter->pDataBlocks, pBuf);
*pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
return TSDB_CODE_SUCCESS;
}
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
taosThreadMutexLock(&pDeleter->mutex);
pDeleter->queryEnd = true;
pDeleter->useconds = useconds;
taosThreadMutexUnlock(&pDeleter->mutex);
}
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
if (taosQueueEmpty(pDeleter->pDataBlocks)) {
*pQueryEnd = pDeleter->queryEnd;
*pLen = 0;
return;
}
SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
*pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
if (NULL == pDeleter->nextOutput.pData) {
assert(pDeleter->queryEnd);
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
pOutput->bufStatus = DS_BUF_EMPTY;
pOutput->queryEnd = pDeleter->queryEnd;
return TSDB_CODE_SUCCESS;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDeleter);
taosThreadMutexLock(&pDeleter->mutex);
pOutput->queryEnd = pDeleter->queryEnd;
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
taosThreadMutexUnlock(&pDeleter->mutex);
return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
taosMemoryFreeClear(pDeleter->nextOutput.pData);
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
SDataDeleterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);
return TSDB_CODE_SUCCESS;
}
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
*size = atomic_load_64(&pDispatcher->cachedSize);
return TSDB_CODE_SUCCESS;
}
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
if (NULL == deleter) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink;
deleter->sink.fPut = putDataBlock;
deleter->sink.fEndPut = endPut;
deleter->sink.fGetLen = getDataLength;
deleter->sink.fGetData = getDataBlock;
deleter->sink.fDestroy = destroyDataSinker;
deleter->sink.fGetCacheSize = getCacheSize;
deleter->pManager = pManager;
deleter->pDeleter = pDeleterNode;
deleter->pSchema = pDataSink->pInputDataBlockDesc;
deleter->pParam = pParam;
deleter->status = DS_BUF_EMPTY;
deleter->queryEnd = false;
deleter->pDataBlocks = taosOpenQueue();
taosThreadMutexInit(&deleter->mutex, NULL);
if (NULL == deleter->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
*pHandle = deleter;
return TSDB_CODE_SUCCESS;
}
......@@ -83,7 +83,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->numOfCols = pInput->pData->info.numOfCols;
pEntry->dataLen = 0;
pBuf->useSize = sizeof(SRetrieveTableRsp);
pBuf->useSize = sizeof(SDataCacheEntry);
blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
pBuf->useSize += pEntry->dataLen;
......@@ -100,7 +100,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData);
pBuf->allocSize = sizeof(SDataCacheEntry) + blockGetEncodeSize(pInput->pData);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
......@@ -211,7 +211,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
return TSDB_CODE_SUCCESS;
}
int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
*size = atomic_load_64(&pDispatcher->cachedSize);
......
......@@ -34,9 +34,12 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat) {
}
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) {
if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == nodeType(pDataSink)) {
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle, void* pParam) {
switch (nodeType(pDataSink)) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
}
return TSDB_CODE_FAILED;
}
......
......@@ -45,8 +45,15 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (handle) {
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
}
_error:
......
......@@ -4952,6 +4952,37 @@ int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length) {
return TDB_CODE_SUCCESS;
}
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo) {
SExecTaskInfo* pTask = (SExecTaskInfo*)pTaskInfo;
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDeleterParam *pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
if (NULL == pDeleterParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList);
pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
if (NULL == pDeleterParam->pUidList) {
taosMemoryFree(pDeleterParam);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < tbNum; ++i) {
STableKeyInfo *pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i);
taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
}
*pParam = pDeleterParam;
break;
}
default:
break;
}
return TSDB_CODE_SUCCESS;
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
......
......@@ -5012,6 +5012,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->haveResultSet = true;
pQuery->msgType = TDMT_VND_QUERY;
break;
case QUERY_NODE_DELETE_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DELETE;
break;
case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
......
......@@ -1335,6 +1335,7 @@ static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode
if (TSDB_CODE_SUCCESS == code) {
code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
}
pSubplan->msgType = TDMT_VND_DELETE;
return code;
}
......
......@@ -88,7 +88,7 @@ int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstream
}
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) {
if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
*pLen = insert->size;
*pStr = insert->pData;
......
......@@ -30,6 +30,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes);
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
......
......@@ -518,7 +518,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
}
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp) {
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *pRsp, SDeleteRes *pRes) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRsp) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -540,7 +540,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR
QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
taosMemoryFreeClear(req.sql);
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp));
QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRsp, pRes));
QW_SCH_TASK_DLOG("processDelete end, node:%p", node);
......
......@@ -291,7 +291,9 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
}
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
if (ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
}
ctx->ctrlConnInfo.handle = NULL;
ctx->ctrlConnInfo.refId = -1;
......
......@@ -242,7 +242,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
return TSDB_CODE_SUCCESS;
}
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg) {
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SDeleteRes *pRes) {
int32_t len = 0;
SVDeleteRsp rsp = {0};
bool queryEnd = false;
......@@ -251,7 +251,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len <= 0 || len != sizeof(SVDeleteRsp)) {
if (len <= 0 || len != sizeof(SDeleterRes)) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -268,13 +268,17 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen
QW_ERR_RET(code);
}
rsp.affectedRows = *(int64_t*)output.pData;
SDeleterRes* pDelRes = (SDeleterRes*)output.pData;
rsp.affectedRows = pDelRes->affectedRows;
pRes->uid = pDelRes->uid;
pRes->uidList = pDelRes->uidList;
pRes->skey = pDelRes->skey;
pRes->ekey = pDelRes->ekey;
int32_t len;
int32_t ret = 0;
SEncoder coder = {0};
tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, ret);
void *msg = taosMemoryCalloc(1, len);
tEncodeSize(tEncodeSVDeleteRsp, &rsp, len, code);
void *msg = rpcMallocCont(len);
tEncoderInit(&coder, msg, len);
tEncodeSVDeleteRsp(&coder, &rsp);
tEncoderClear(&coder);
......@@ -918,7 +922,7 @@ _return:
qwRelease(refId);
}
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp) {
int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes *pRes) {
int32_t code = 0;
SSubplan *plan = NULL;
qTaskInfo_t pTaskInfo = NULL;
......@@ -932,7 +936,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp) {
QW_ERR_JRET(code);
}
ctx->plan = plan;
ctx.plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
if (code) {
......@@ -945,12 +949,12 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
ctx->taskHandle = pTaskInfo;
ctx->sinkHandle = sinkHandle;
ctx.taskHandle = pTaskInfo;
ctx.sinkHandle = sinkHandle;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx, NULL));
QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont));
QW_ERR_JRET(qwGetDeleteResFromSink(QW_FPARAMS(), &ctx, &pRsp->contLen, &pRsp->pCont, pRes));
_return:
......
......@@ -1476,7 +1476,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_
SSchJob *pJob = NULL;
SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
qDebug("QID:0x%" PRIx64 " jobId:0x%"PRIx64 " started", pDag->queryId, pJob->refId);
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pDag->queryId, pJob->refId);
*job = pJob->refId;
SCH_ERR_JRET(schLaunchJob(pJob));
......
......@@ -62,10 +62,11 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
case TDMT_VND_DROP_TABLE_RSP:
case TDMT_VND_ALTER_TABLE_RSP:
case TDMT_VND_SUBMIT_RSP:
case TDMT_VND_DELETE_RSP:
break;
default:
SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
SCH_ERR_RET(TSDB_CODE_INVALID_MSG);
}
if (lastMsgType != reqMsgType) {
......@@ -236,8 +237,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
tDecoderInit(&coder, msg, msgSize);
tDecodeSVDeleteRsp(&coder, &rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("delete succeed, affectedRows:%d", rsp->affectedRows);
atomic_add_fetch_32(&pJob->resNumOfRows, rsp.affectedRows);
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
}
taosMemoryFreeClear(msg);
......@@ -430,6 +431,10 @@ int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code)
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleDeleteCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_DELETE_RSP, code);
}
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
......@@ -520,6 +525,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_QUERY:
*fp = schHandleQueryCallback;
break;
case TDMT_VND_DELETE:
*fp = schHandleDeleteCallback;
break;
case TDMT_VND_EXPLAIN:
*fp = schHandleExplainCallback;
break;
......@@ -1003,23 +1011,22 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_VND_DELETE: {
SVDeleteReq req = {0};
req.header.vgId = addr->nodeId;
req.sId = schMgmt.sId;
req.queryId = pJob->queryId;
req.taskId = pTask->taskId;
req.phyLen = pTask->msgLen;
req.sqlLen = strlen(pJob->sql);
req.sql = pJob->sql;
req.sql = (char*)pJob->sql;
req.msg = pTask->msg;
int32_t len = tSerializeSVDeleteReq(NULL, 0, &req);
msg = taosMemoryCalloc(1, len);
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", len);
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tSerializeSVDeleteReq(msg, len, &req);
SVDeleteReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
tSerializeSVDeleteReq(msg, msgSize, &req);
break;
}
case TDMT_VND_QUERY: {
......
......@@ -257,6 +257,8 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
taosHashCleanup(pCtx->args);
(*pCtx->freeFunc)(pCtx->brokenVal.val);
if (pCtx->freeFunc) {
(*pCtx->freeFunc)(pCtx->brokenVal.val);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册