提交 33af9c27 编写于 作者: Y yihaoDeng

TD-1589

上级 f9e1ca94
......@@ -3710,7 +3710,7 @@ int32_t handleExprInDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pEx
return TSDB_CODE_SUCCESS;
}
int32_t getDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pExpr) {
int32_t getDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pExpr, int64_t **tsBuf, int32_t *sz) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
......@@ -3731,24 +3731,29 @@ int32_t getDelCond(SSqlCmd* pCmd, SQueryInfo *pQueryInfo, tSQLExpr* pExpr) {
if (pRight == NULL || pRight->nSQLOptr != TK_SET || pRight->pParam == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
int32_t nParam = pRight->pParam->nExpr;
int64_t *tsBuf = malloc(sizeof(int64_t) * nParam);
*tsBuf = malloc(sizeof(int64_t) * nParam);
*sz = nParam;
if (*tsBuf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int i = 0; i < nParam; i++) {
int64_t ts;
if (getTimeFromExpr(pRight->pParam->a[i].pNode, timePrecision, &ts) != TSDB_CODE_SUCCESS) {
free(tsBuf);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else {
tsBuf[i] = ts;
(*tsBuf)[i] = ts;
}
}
qsort(tsBuf, nParam, sizeof(tsBuf[0]), compareInt64Val);
qsort(*tsBuf, nParam, sizeof((*tsBuf)[0]), getComparFunc(TSDB_DATA_TYPE_TIMESTAMP, 0));
} else {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
} else {
return TSDB_CODE_TSC_INVALID_SQL;
}
return TSDB_CODE_SUCCESS;
//const char* msg1 = "del condition must use 'or'";
//tSQLExpr* pLeft = pExpr->pLeft;
//tSQLExpr* pRight = pExpr->pRight;
......@@ -4643,6 +4648,7 @@ int32_t setDelInfo(SSqlObj *pSql, struct SSqlInfo* pInfo) {
const char* msg1 = "invalid table name";
const char* msg2 = "invalid delete sql";
const char* msg3 = "delete can not be supported by super table";
const char* msg4 = "delete only supported by record";
int32_t code = TSDB_CODE_SUCCESS;
......@@ -4674,15 +4680,43 @@ int32_t setDelInfo(SSqlObj *pSql, struct SSqlInfo* pInfo) {
return code;
}
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
int64_t *tsBuf = NULL;
int32_t sz = 0;
if (pDelSql->pWhere != NULL) {
if (getDelCond(pCmd, pQueryInfo, pDelSql->pWhere) != TSDB_CODE_SUCCESS) {
if (getDelCond(pCmd, pQueryInfo, pDelSql->pWhere, &tsBuf, &sz) != TSDB_CODE_SUCCESS) {
free(tsBuf);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
}
return TSDB_CODE_SUCCESS;
} else {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
if ((code = tscAllocPayload(pCmd, sizeof(SDeleteMsg) + sz * sizeof(tsBuf[0]) + 64)) != TSDB_CODE_SUCCESS) {
return code;
}
SDeleteMsg *pDelMsg = (SDeleteMsg *)pCmd->payload;
pDelMsg->head.contLen = htonl(sizeof(SDeleteMsg) + sz * sizeof(tsBuf[0]));
pDelMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pDelMsg->uid = htobe64(pTableMeta->id.uid);
pDelMsg->tid = htonl(pTableMeta->id.tid);
pDelMsg->tversion = htons(pTableMeta->tversion);
pDelMsg->delValLen = htonl(sz);
int64_t* data = (int64_t *)(pDelMsg->data);
for (int32_t i = 0; i < sz; i++) {
*data = htobe64(*data);
data += 1;
}
pCmd->payloadLen = sizeof(SDeleteMsg) + sz * sizeof(tsBuf[0]);
free(tsBuf);
return code;
}
int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const int32_t DEFAULT_TABLE_INDEX = 0;
......
......@@ -409,6 +409,7 @@ int doProcessSql(SSqlObj *pSql) {
if (pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_DELETE ||
pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_INSERT ||
pCmd->command == TSDB_SQL_DELETE ||
......@@ -521,6 +522,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of tables to be inserted
// pSql->cmd.payloadLen is set during copying data into payload
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
......@@ -533,12 +535,14 @@ int tscBuildDelMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
char* pMsg = pSql->cmd.payload;
//char* pMsg = pSql->cmd.payload;
// NOTE: shell message size should not include SMsgDesc
int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
//int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
int32_t vgId = pTableMeta->vgroupInfo.vgId;
pSql->cmd.msgType = TSDB_MSG_TYPE_DELETE;
pSql->cmd.msgType = TSDB_MSG_TYPE_DELETE;
tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
tscDebug("%p build delete msg, vgId:%d numberOfEP:%d", pSql, vgId, pSql->epSet.numOfEps);
return TSDB_CODE_SUCCESS;
}
/*
......
......@@ -98,11 +98,11 @@ void dnodeCleanupVnodeWrite() {
void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
char *pCont = (char *)pMsg->pCont;
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_DELETE) {
SMsgDesc *pDesc = (SMsgDesc *)pCont;
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
pCont += sizeof(SMsgDesc);
}
}
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
......
......@@ -302,6 +302,15 @@ typedef struct {
char data[];
} SUpdateTableTagValMsg;
typedef struct {
SMsgHead head;
int64_t uid;
int32_t tid;
int16_t tversion;
int32_t delValLen;
char data[];
} SDeleteMsg;
typedef struct {
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
......
......@@ -31,6 +31,7 @@
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, SRspRet *);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDeleteMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
......@@ -39,6 +40,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_DELETE] = vnodeProcessDeleteMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg;
......@@ -127,6 +129,10 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
return code;
}
static int32_t vnodeProcessDeleteMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = TSDB_CODE_SUCCESS;
return code;
}
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册