diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 79d9fdf0062a3bcefd43f381de49e365be224704..1bd7d1f453866c00a3ed613e49244188c5a85fba 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -55,6 +55,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs); void tscFreeRetrieveSup(void **param); SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj, __async_cb_func_t fp, int32_t cmd); +void doConcurrentlySendSubQueries(SSqlObj* pSql); #ifdef __cplusplus } diff --git a/src/client/src/tscDelete.c b/src/client/src/tscDelete.c index fc31bf1b37355e38378d01c16b1aed39614ffac7..5df03ebf26a389f109f11c0251bdf4d987650764 100644 --- a/src/client/src/tscDelete.c +++ b/src/client/src/tscDelete.c @@ -14,12 +14,169 @@ */ #include "os.h" +#include "taosmsg.h" #include "tcmdtype.h" #include "tscLog.h" #include "tscUtil.h" #include "tsclient.h" #include "tscDelete.h" +#include "tscSubquery.h" + +void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo); + +// +// handle error +// +void tscHandleSubDeleteError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { + +} + +// +// sub delete sql callback +// +void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) { + // the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this + // function while kill query by a user. + if (param == NULL) { + assert(code != TSDB_CODE_SUCCESS); + return; + } + + SRetrieveSupport *trsupport = (SRetrieveSupport *) param; + + SSqlObj* pParentSql = trsupport->pParentSql; + SSqlObj* pSql = (SSqlObj *) tres; + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0); + SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; + + // + // stable query killed or other subquery failed, all query stopped + // + if (pParentSql->res.code != TSDB_CODE_SUCCESS) { + trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; + tscError("0x%"PRIx64" query cancelled or failed, sub:0x%"PRIx64", vgId:%d, orderOfSub:%d, code:%s, global code:%s", + pParentSql->self, pSql->self, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code)); + + tscHandleSubDeleteError(param, tres, code); + + if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { + // all sub done, call parentSQL callback to finish + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); + } + return; + } + + /* + * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later + * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack + * function to abort current and remain retrieve process. + * + * NOTE: thread safe is required. + */ + if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { + assert(code == taos_errno(pSql)); + int32_t sent = 0; + + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID && code != TSDB_CODE_VND_INVALID_VGROUP_ID)) { + tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry); + + //tscReissueSubquery(trsupport, pSql, code, &sent); + if (sent) { + return; + } + } else { + tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code)); + atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort + } + + tscHandleSubDeleteError(param, tres, pParentSql->res.code); + + if(!sent) { + if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { + // all sub done, call parentSQL callback to finish + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); + } + } + + return; + } + + tscDebug("0x%"PRIx64":CDEL sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql->self, + pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex); + + // do merge + pParentSql->res.numOfRows += pSql->res.numOfRows; + if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { + // all sub done, call parentSQL callback to finish + (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); + } + + return ; +} + +void writeMsgVgId(char * payload, int32_t vgId) { + SSubmitMsg* pSubmitMsg = (SSubmitMsg *)(payload + sizeof(SMsgDesc)); + // SSubmitMsg + pSubmitMsg->header.vgId = htonl(vgId); +} + +// +// STable malloc sub delete +// +SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) { + // Init + SSqlCmd* pCmd = &pSql->cmd; + SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); + if (pNew == NULL) { + tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self); + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + + pNew->pTscObj = pSql->pTscObj; + pNew->signature = pNew; + pNew->sqlstr = strdup(pSql->sqlstr); + pNew->rootObj = pSql->rootObj; + pNew->fp = tscSubDeleteCallback; + pNew->fetchFp = tscSubDeleteCallback; + pNew->param = trsupport; + pNew->maxRetry = TSDB_MAX_REPLICA; + + SSqlCmd* pNewCmd = &pNew->cmd; + memcpy(pNewCmd, pCmd, sizeof(SSqlCmd)); + // set zero + pNewCmd->pQueryInfo = NULL; + pNewCmd->active = NULL; + pNewCmd->payload = NULL; + pNewCmd->allocSize = 0; + + // payload copy + int32_t ret = tscAllocPayload(pNewCmd, pCmd->payloadLen); + if (ret != TSDB_CODE_SUCCESS) { + tscError("0x%"PRIx64":CDEL , sub delete alloc payload failed. errcode=%d", pSql->self, ret); + free(pNew); + return NULL; + } + memcpy(pNewCmd->payload, pCmd->payload, pCmd->payloadLen); + + // update vgroup id + writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId); + + tsem_init(&pNew->rspSem, 0, 0); + registerSqlObj(pNew); + tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew); + + // set vnode epset + tscSetDnodeEpSet(&pNew->epSet, pVgroupMsg); + + return pNew; +} + +// +// execute delete sql +// int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { int32_t ret = TSDB_CODE_SUCCESS; @@ -30,8 +187,6 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pSql->cmd.active = pQueryInfo; return tscBuildAndSendRequest(pSql, pQueryInfo); } - return ret; - /* // // super table @@ -40,100 +195,71 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - // pRes->code check only serves in launching super table sub-queries + // check cancel if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function. return pRes->code; } - - tExtMemBuffer **pMemoryBuf = NULL; - tOrderDescriptor *pDesc = NULL; + if(pTableMetaInfo->vgroupList == NULL) { + tscError(":CDEL SQL:%p tablename=%s vgroupList is NULL.", pSql, pTableMetaInfo->name.tname); + return TSDB_CODE_VND_INVALID_VGROUP_ID; + } + pRes->qId = 0x1; // hack the qhandle check - - uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SSubqueryState *pState = &pSql->subState; - int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups - : (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); + int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups; - int32_t ret = doInitSubState(pSql, numOfSub); + ret = doInitSubState(pSql, numOfSub); if (ret != 0) { tscAsyncResultOnError(pSql); return ret; } - ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, &nBufferSize, pSql->self); - if (ret != 0) { - pRes->code = ret; - tscAsyncResultOnError(pSql); - tfree(pDesc); - tfree(pMemoryBuf); - return ret; - } - - tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); + tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub); pRes->code = TSDB_CODE_SUCCESS; - int32_t i = 0; - for (; i < pState->numOfSub; ++i) { + int32_t i; + for (i = 0; i < pState->numOfSub; ++i) { + // vgroup + SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i]; + + // malloc each support SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport)); if (trs == NULL) { tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); break; } - - trs->pExtMemBuffer = pMemoryBuf; - trs->pOrderDescriptor = pDesc; - - trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); - trs->localBufferSize = nBufferSize + sizeof(tFilePage); - if (trs->localBuffer == NULL) { - tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); - tfree(trs); - break; - } - - trs->localBuffer->num = 0; trs->subqueryIndex = i; trs->pParentSql = pSql; - SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL); + // malloc sub SSqlObj + SSqlObj *pNew = tscCreateSTableSubDelete(pSql, pVgroupMsg, trs); if (pNew == NULL) { - tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); + tscError("0x%"PRIx64"CDEL failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno)); tfree(trs->localBuffer); tfree(trs); break; } - - // todo handle multi-vnode situation - if (pQueryInfo->tsBuf) { - SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd); - pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf); - assert(pNewQueryInfo->tsBuf != NULL); - } - - tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self, - trs->subqueryIndex); + pSql->pSubs[i] = pNew; } if (i < pState->numOfSub) { - tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self); + tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self); pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub); doCleanupSubqueries(pSql, i); return pRes->code; // free all allocated resource } if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub); doCleanupSubqueries(pSql, i); return pRes->code; } + // send sub sql doConcurrentlySendSubQueries(pSql); - + //return TSDB_CODE_TSC_QUERY_CANCELLED; + return TSDB_CODE_SUCCESS; - */ } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e107d07128052f5fc7ec03eaa0ab97dfdd28dfba..c7e97a944beddbeac9d0b04c2c890821b78e4956 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1068,6 +1068,14 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (code != TSDB_CODE_SUCCESS) { return code ; // async load table meta } + + // vgroupInfo if super + if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + code = tscGetSTableVgroupInfo(pSql, pQueryInfo); + } + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return code; + } // CHECK AND SET WHERE if (pInfo->pDelData->pWhere) { @@ -6460,6 +6468,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq const char* msg1 = "invalid expression"; const char* msg2 = "the timestamp column condition must be in an interval"; + int32_t ret = TSDB_CODE_SUCCESS; // tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space @@ -6509,7 +6518,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq goto PARSE_WHERE_EXIT; } - // check timestamp range + // check timestamp range if (pQueryInfo->window.skey > pQueryInfo->window.ekey) { return invalidOperationMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 93224a0937b35d7407817aab762db543fc3477b2..6a89de10afc75cc2d175aa93b2b904759e70c232 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) { return ret; } -static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) { +void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) { assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); // Issue the query to one of the vnode among a vgroup randomly. @@ -619,7 +619,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) { } tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pSql->self, sqlCmd[pCmd->command], name, type); - if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL + if (pCmd->command < TSDB_SQL_MGMT && pCmd->command != TSDB_SQL_DELETE_DATA) { // the pTableMetaInfo cannot be NULL if (pTableMetaInfo == NULL) { pSql->res.code = TSDB_CODE_TSC_APP_ERROR; return pSql->res.code; @@ -3318,20 +3318,41 @@ int buildSTableDelDataMsg(SSqlObj *pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, return 0; } -// Normal Child Table -int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SSqlInfo *pInfo) { - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; +int tscBuildDelDataMsg(SSqlObj *pSql, SSqlInfo *pInfo) { + SSqlCmd *pCmd = &pSql->cmd; + SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableMeta *pTableMeta = pTableMetaInfo->pTableMeta; + uint32_t command = CMD_DELETE_DATA; + // pSql->cmd.payloadLen is set during copying data into payload pCmd->msgType = TSDB_MSG_TYPE_SUBMIT; + if (pTableMeta->tableType == TSDB_SUPER_TABLE) { + // super table to do + command |= FLAG_SUPER_TABLE; + } else { + // no super table to do copy epSet + SNewVgroupInfo vgroupInfo = {0}; + taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); + tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); + tscDebug("0x%"PRIx64" table deldata submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); + } - SNewVgroupInfo vgroupInfo = {0}; - taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo); - tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo); + SCond *pCond = NULL; + // serialize tag column query condition + if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) { + STagCond* pTagCond = &pQueryInfo->tagCond; + pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid); + } - tscDebug("0x%"PRIx64" table deldata submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps); - // set payload - size_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData) + sizeof(int32_t); + int32_t tagCondLen = 0; + if (pCond) { + tagCondLen = pCond->len; + } + + size_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData) + tagCondLen; + int32_t ret = tscAllocPayload(pCmd, payloadLen); if (ret != TSDB_CODE_SUCCESS) { return ret; @@ -3362,28 +3383,19 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S pSubmitBlk->numOfRows = htons(1); pSubmitBlk->schemaLen = 0; // only server return TSDB_CODE_TDB_TABLE_RECONFIGURE need schema attached pSubmitBlk->sversion = htonl(pTableMeta->sversion); - pSubmitBlk->dataLen = htonl(sizeof(SControlData) + sizeof(int32_t)); + pSubmitBlk->dataLen = htonl(sizeof(SControlData) + tagCondLen); // SControlData - pControlData->command = htonl(CMD_DELETE_DATA); + pControlData->command = htonl(command); pControlData->win.skey = htobe64(pQueryInfo->window.skey); pControlData->win.ekey = htobe64(pQueryInfo->window.ekey); - pControlData->tnum = htonl(1); - pControlData->tids[0] = htonl(pTableMeta->id.tid); - - return TSDB_CODE_SUCCESS; -} - -int tscBuildDelDataMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if(UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - return buildTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo); - } else { - return buildTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo); + // set tagCond + if (pCond) { + pControlData->tagCondLen = htonl(pCond->len); + memcpy(pControlData->tagCond, pCond->cond, pCond->len); } + + return TSDB_CODE_SUCCESS; } void tscInitMsgsFp() { @@ -3466,5 +3478,4 @@ void tscInitMsgsFp() { tscKeepConn[TSDB_SQL_SELECT] = 1; tscKeepConn[TSDB_SQL_FETCH] = 1; tscKeepConn[TSDB_SQL_HB] = 1; -} - +} \ No newline at end of file diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index ed1dc52472b3aab69eb36920710f7264e578f1dd..a8ceb759b7cd5019e08746a1a9b2c0efceb6d733 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2611,7 +2611,7 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) { tfree(p); } -static void doConcurrentlySendSubQueries(SSqlObj* pSql) { +void doConcurrentlySendSubQueries(SSqlObj* pSql) { SSubqueryState *pState = &pSql->subState; // concurrently sent the query requests. diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 80efba21bfc5e3de7fd7613b29e161743486e9b5..b2e4dc3cb0e40bff098010aa0ffcf5214f41ca2e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -1003,15 +1003,19 @@ typedef struct { char value[]; } STLV; +// Ox00000001 ~ 0x00010000 command id 16 items #define CMD_DELETE_DATA 0x00000001 -#define CMD_TRUNCATE 0x00000002 + +// 0x00010000 ~ 0x10000000 command flag 16 items +#define FLAG_SUPER_TABLE 0x00010000 #define GET_CTLDATA_SIZE(p) (sizeof(SControlData) + p->tnum * sizeof(int32_t)) -typedef struct SControlData{ +typedef struct SControlData { uint32_t command; // see define CMD_??? STimeWindow win; - int32_t tnum; // tids nums - int32_t tids[]; // delete table tid + // tag cond + int32_t tagCondLen; + char tagCond[]; } SControlData; enum { diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 4da4eb3744697f4ceaf54011fac8f27c0755a515..fbde9e0853a94d30d2fd0d52214d51bb64da87ff 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -449,6 +449,8 @@ char* parseTagDatatoJson(void *p); typedef bool (*readover_callback)(void* param, int8_t type, int32_t tid); void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param); +int32_t tsdbTableTid(void* pTable); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/inc/tsdbDelete.h b/src/tsdb/inc/tsdbDelete.h index df41195ef34e543478f9e8720b1c29d6e0df75bb..028f470e5049ba98fdead60e2ab6b2e3be8d1146 100644 --- a/src/tsdb/inc/tsdbDelete.h +++ b/src/tsdb/inc/tsdbDelete.h @@ -20,13 +20,18 @@ extern "C" { #endif // SControlData addition information -#define GET_CTLINFO_SIZE(p) (sizeof(SControlDataInfo) + p.ctlData.tnum * sizeof(int32_t)) +#define GET_CTLINFO_SIZE(p) (sizeof(SControlDataInfo) + p->tnum * sizeof(int32_t)) typedef struct { // addition info tsem_t* pSem; bool memNull; // pRepo->mem is NULL, this is true SShellSubmitRspMsg *pRsp; - SControlData ctlData; + + // base ControlData + STimeWindow win; // come from SControlData.win + uint32_t command; // come from SControlData.command + int32_t tnum; // num of tids array + int32_t tids[]; } SControlDataInfo; // -------- interface --------- diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 6073aa2a117ef64543a77b417b67e73ee92afecd..7abad23114d09486eb511b809e4b4dd5453a5cfe 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1787,7 +1787,7 @@ int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) { int ret = TSDB_CODE_SUCCESS; // do command - if(pCtlDataInfo->ctlData.command == CMD_DELETE_DATA) { + if(pCtlDataInfo->command & CMD_DELETE_DATA) { // delete data ret = tsdbControlDelete(pRepo, pCtlDataInfo); } diff --git a/src/tsdb/src/tsdbDelete.c b/src/tsdb/src/tsdbDelete.c index 24adec7e12b3c05775b2f731a5693d6f9bd50000..4ddf644cab2b73f5d79da01b2fb4afef55c85d03 100644 --- a/src/tsdb/src/tsdbDelete.c +++ b/src/tsdb/src/tsdbDelete.c @@ -179,14 +179,14 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { SDeleteH truncateH = {0}; SDFileSet * pSet = NULL; - tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->ctlData.tids[0]); + tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->tids[0]); if (tsdbInitDeleteH(&truncateH, pRepo) < 0) { return -1; } truncateH.pCtlInfo = pCtlInfo; - STimeWindow win = pCtlInfo->ctlData.win; + STimeWindow win = pCtlInfo->win; int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision); int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision); @@ -216,8 +216,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { } #endif - - if (pCtlInfo->ctlData.command == CMD_DELETE_DATA) { + if (pCtlInfo->command & CMD_DELETE_DATA) { if (tsdbFSetDelete(&truncateH, pSet) < 0) { tsdbDestroyDeleteH(&truncateH); tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); @@ -448,12 +447,12 @@ static int tsdbFSetInit(SDeleteH *pdh, SDFileSet *pSet) { return 0; } -static void tsdbDeleteFSetEnd(SDeleteH *pdh) { tsdbCloseAndUnsetFSet(&(pdh->readh)); } - +static void tsdbDeleteFSetEnd(SDeleteH *pdh) { + tsdbCloseAndUnsetFSet(&(pdh->readh)); +} static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) { SDataCols * pDstDCols = pdh->pDCols; - SControlData* pCtlData = &pdh->pCtlInfo->ctlData; int32_t delRows = 0; tdResetDataCols(pDstDCols); @@ -464,7 +463,7 @@ static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) { for (int i = 0; i < pSrcDCols->numOfRows; ++i) { int64_t tsKey = *(int64_t *)tdGetColDataOfRow(pSrcDCols->cols, i); - if ((tsKey >= pCtlData->win.skey) && (tsKey <= pCtlData->win.ekey)) { + if ((tsKey >= pdh->pCtlInfo->win.skey) && (tsKey <= pdh->pCtlInfo->win.ekey)) { // delete row delRows ++; continue; @@ -488,8 +487,8 @@ static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) { // table in delete list bool tableInDel(SDeleteH* pdh, int32_t tid) { - for (int32_t i = 0; i < pdh->pCtlInfo->ctlData.tnum; i++) { - if (tid == pdh->pCtlInfo->ctlData.tids[i]) + for (int32_t i = 0; i < pdh->pCtlInfo->tnum; i++) { + if (tid == pdh->pCtlInfo->tids[i]) return true; } @@ -499,7 +498,7 @@ bool tableInDel(SDeleteH* pdh, int32_t tid) { // if pBlock is border block return true else return false static int tsdbBlockSolve(SDeleteH *pdh, SBlock *pBlock) { // delete window - STimeWindow* pdel = &pdh->pCtlInfo->ctlData.win; + STimeWindow* pdel = &pdh->pCtlInfo->win; // do nothing for no delete if(pBlock->keyFirst > pdel->ekey || pBlock->keyLast < pdel->skey) diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index b159867ddde41b4e4422c7ab1d63ac5215c293f5..13db0258cb6ad59e3c280b28d1be022c3cfa7e2f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -15,9 +15,9 @@ #include "tdataformat.h" #include "tfunctional.h" -#include "tsdbint.h" #include "tskiplist.h" #include "tsdbRowMergeBuf.h" +#include "tsdbint.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_MAX_INSERT_BATCH 512 @@ -706,6 +706,10 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { pBlock->numOfRows = htons(pBlock->numOfRows); if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { + if (IS_CONTROL_BLOCK(pBlock)) { + // super talbe control block tid == 0 + continue; + } tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, pBlock->tid); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; @@ -1119,6 +1123,25 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r return 0; } +// set tid to ptids and return all tables num +int32_t tsdbTableGroupInfo(STableGroupInfo* pTableGroup, int32_t * ptids) { + int32_t pos = 0; + size_t numOfGroup = taosArrayGetSize(pTableGroup->pGroupList); + for (int32_t i = 0; i < numOfGroup; ++i) { + SArray* group = taosArrayGetP(pTableGroup->pGroupList, i); + size_t cnt = taosArrayGetSize(group); + for(int32_t j = 0; j < cnt; ++j) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); + if (pKeyInfo->pTable != NULL) { + if(ptids) + ptids[pos] = tsdbTableTid(pKeyInfo->pTable); + pos ++; + } + } + } + return pos; +} + // Control Data int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, tsem_t** ppSem) { int32_t ret = TSDB_CODE_SUCCESS; @@ -1135,23 +1158,55 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit // anti-serialize pCtlData->command = htonl(pCtlData->command); - pCtlData->tnum = htonl(pCtlData->tnum); pCtlData->win.skey = htobe64(pCtlData->win.skey); pCtlData->win.ekey = htobe64(pCtlData->win.ekey); - for (int32_t i=0; i < pCtlData->tnum; i++) { - pCtlData->tids[i] = htonl(pCtlData->tids[i]); + pCtlData->tagCondLen = htonl(pCtlData->tagCondLen); + + // get tables after filter tag condition + STableGroupInfo tableGroupInfo = {0}; + tableGroupInfo.sVersion = -1; + tableGroupInfo.tVersion = -1; + + // get del tables tid + int32_t tnum; + if (pCtlData->command & FLAG_SUPER_TABLE) { + // super table + ret = tsdbQuerySTableByTagCond(pRepo, pBlock->uid, pCtlData->win.skey, pCtlData->tagCond, pCtlData->tagCondLen, &tableGroupInfo, NULL, 0); + if (ret != TSDB_CODE_SUCCESS) { + tsdbError(":SDEL vgId:%d failed to get child tables id from stable with tag condition. uid=%ld", REPO_ID(pRepo), pBlock->uid); + return ret; + } + + tnum = tsdbTableGroupInfo(&tableGroupInfo, NULL); + if (tnum == 0) { + tsdbWarn(":SDEL vgId:%d super table no child tables after filter by tag. uid=%ld", REPO_ID(pRepo), pBlock->uid); + return TSDB_CODE_SUCCESS; + } + } else { + // single table + tnum = 1; } // server data set - size_t nsize = sizeof(SControlDataInfo) + pCtlData->tnum * sizeof(int32_t); + size_t nsize = sizeof(SControlDataInfo) + tnum * sizeof(int32_t); SControlDataInfo* pNew = (SControlDataInfo* )tmalloc(nsize); memset(pNew, 0, nsize); - memcpy(&pNew->ctlData, pCtlData, GET_CTLDATA_SIZE(pCtlData)); + pNew->win = pCtlData->win; + pNew->command = pCtlData->command; pNew->pRsp = pRsp; if (ppSem) pNew->pSem = *ppSem; + + // tids + pNew->tnum = tnum; + // copy tid + if (pCtlData->command & FLAG_SUPER_TABLE) { + tsdbTableGroupInfo(&tableGroupInfo, pNew->tids); + } else { + pNew->tids[0] = pBlock->tid; + } - if(pCtlData->command == CMD_DELETE_DATA) { + if(pCtlData->command & CMD_DELETE_DATA) { // malloc new to pass commit thread ret = tsdbAsyncCommit(pRepo, pNew); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 60c7311d4c0f3d784231fceb8a7e2628a5bd4eda..b038a0e9b391c71ff4965a3b74648d1aa68fa96d 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -4662,3 +4662,9 @@ void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callba pQueryHandle->param = param; return ; } + +// get table tid +int32_t tsdbTableTid(void* pTable) { + STable *p = (STable *)pTable; + return p->tableId.tid; +} \ No newline at end of file