From 33af9c27928123fc1629be22b568a448c708f317 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 21 Oct 2020 10:05:34 +0000 Subject: [PATCH] TD-1589 --- src/client/src/tscSQLParser.c | 50 +++++++++++++++++++++++++++++------ src/client/src/tscServer.c | 12 ++++++--- src/dnode/src/dnodeVWrite.c | 4 +-- src/inc/taosmsg.h | 9 +++++++ src/vnode/src/vnodeWrite.c | 6 +++++ 5 files changed, 67 insertions(+), 14 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 352b94d184..efe8ac86f8 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3710,7 +3710,7 @@ int32_t handleExprInDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pEx return TSDB_CODE_SUCCESS; } -int32_t getDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pExpr) { +int32_t getDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pExpr, int64_t **tsBuf, int32_t *sz) { if (pExpr == NULL) { return TSDB_CODE_SUCCESS; } @@ -3731,24 +3731,29 @@ int32_t getDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pExpr) { if (pRight == NULL || pRight->nSQLOptr != TK_SET || pRight->pParam == NULL) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } + int32_t nParam = pRight->pParam->nExpr; - int64_t *tsBuf = malloc(sizeof(int64_t) * nParam); + *tsBuf = malloc(sizeof(int64_t) * nParam); + *sz = nParam; + if (*tsBuf == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } for (int i = 0; i < nParam; i++) { int64_t ts; if (getTimeFromExpr(pRight->pParam->a[i].pNode, timePrecision, &ts) != TSDB_CODE_SUCCESS) { - free(tsBuf); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } else { - tsBuf[i] = ts; + (*tsBuf)[i] = ts; } } - qsort(tsBuf, nParam, sizeof(tsBuf[0]), compareInt64Val); + qsort(*tsBuf, nParam, sizeof((*tsBuf)[0]), getComparFunc(TSDB_DATA_TYPE_TIMESTAMP, 0)); } else { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } } else { return TSDB_CODE_TSC_INVALID_SQL; } + return TSDB_CODE_SUCCESS; //const char* msg1 = "del condition must use 'or'"; //tSQLExpr* pLeft = pExpr->pLeft; //tSQLExpr* pRight = pExpr->pRight; @@ -4643,6 +4648,7 @@ int32_t setDelInfo(SSqlObj *pSql, struct SSqlInfo* pInfo) { const char* msg1 = "invalid table name"; const char* msg2 = "invalid delete sql"; const char* msg3 = "delete can not be supported by super table"; + const char* msg4 = "delete only supported by record"; int32_t code = TSDB_CODE_SUCCESS; @@ -4674,15 +4680,43 @@ int32_t setDelInfo(SSqlObj *pSql, struct SSqlInfo* pInfo) { return code; } + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } + + int64_t *tsBuf = NULL; + int32_t sz = 0; if (pDelSql->pWhere != NULL) { - if (getDelCond(pCmd, pQueryInfo, pDelSql->pWhere) != TSDB_CODE_SUCCESS) { + if (getDelCond(pCmd, pQueryInfo, pDelSql->pWhere, &tsBuf, &sz) != TSDB_CODE_SUCCESS) { + free(tsBuf); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } - } - return TSDB_CODE_SUCCESS; + } else { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); + } + + if ((code = tscAllocPayload(pCmd, sizeof(SDeleteMsg) + sz * sizeof(tsBuf[0]) + 64)) != TSDB_CODE_SUCCESS) { + return code; + } + + SDeleteMsg *pDelMsg = (SDeleteMsg *)pCmd->payload; + pDelMsg->head.contLen = htonl(sizeof(SDeleteMsg) + sz * sizeof(tsBuf[0])); + pDelMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); + pDelMsg->uid = htobe64(pTableMeta->id.uid); + pDelMsg->tid = htonl(pTableMeta->id.tid); + pDelMsg->tversion = htons(pTableMeta->tversion); + pDelMsg->delValLen = htonl(sz); + int64_t* data = (int64_t *)(pDelMsg->data); + for (int32_t i = 0; i < sz; i++) { + *data = htobe64(*data); + data += 1; + } + pCmd->payloadLen = sizeof(SDeleteMsg) + sz * sizeof(tsBuf[0]); + free(tsBuf); + + return code; } int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { const int32_t DEFAULT_TABLE_INDEX = 0; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 41a4a1a107..f51aa081ce 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -409,6 +409,7 @@ int doProcessSql(SSqlObj *pSql) { if (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_FETCH || + pCmd->command == TSDB_SQL_DELETE || pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_DELETE || @@ -521,6 +522,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted // pSql->cmd.payloadLen is set during copying data into payload + pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); @@ -533,12 +535,14 @@ int tscBuildDelMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; - char* pMsg = pSql->cmd.payload; - + //char* pMsg = pSql->cmd.payload; // NOTE: shell message size should not include SMsgDesc - int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); + //int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); int32_t vgId = pTableMeta->vgroupInfo.vgId; - pSql->cmd.msgType = TSDB_MSG_TYPE_DELETE; + + pSql->cmd.msgType = TSDB_MSG_TYPE_DELETE; + tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet); + tscDebug("%p build delete msg, vgId:%d numberOfEP:%d", pSql, vgId, pSql->epSet.numOfEps); return TSDB_CODE_SUCCESS; } /* diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index e56bae0d7e..89438ce0ae 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -98,11 +98,11 @@ void dnodeCleanupVnodeWrite() { void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { char *pCont = (char *)pMsg->pCont; - if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { + if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_DELETE) { SMsgDesc *pDesc = (SMsgDesc *)pCont; pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); pCont += sizeof(SMsgDesc); - } + } SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 32fd473f78..528bffce2e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -302,6 +302,15 @@ typedef struct { char data[]; } SUpdateTableTagValMsg; +typedef struct { + SMsgHead head; + int64_t uid; + int32_t tid; + int16_t tversion; + int32_t delValLen; + char data[]; +} SDeleteMsg; + typedef struct { char clientVersion[TSDB_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN]; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index c4924f312f..27a512a4fd 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -31,6 +31,7 @@ static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessDeleteMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); @@ -39,6 +40,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; + vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DELETE] = vnodeProcessDeleteMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg; @@ -127,6 +129,10 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR return code; } +static int32_t vnodeProcessDeleteMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { + int32_t code = TSDB_CODE_SUCCESS; + return code; +} static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { int code = TSDB_CODE_SUCCESS; -- GitLab