提交 3a56d8a8 编写于 作者: A Alex Duan

[TS-238]<feature>(tsdb): super table delete main flow passed

上级 25540e01
......@@ -55,6 +55,7 @@ void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(void **param);
SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj, __async_cb_func_t fp, int32_t cmd);
void doConcurrentlySendSubQueries(SSqlObj* pSql);
#ifdef __cplusplus
}
......
......@@ -14,12 +14,169 @@
*/
#include "os.h"
#include "taosmsg.h"
#include "tcmdtype.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscDelete.h"
#include "tscSubquery.h"
void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo);
//
// handle error
//
void tscHandleSubDeleteError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
}
//
// sub delete sql callback
//
void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) {
// the param may be null, since it may be done by other query threads. and the asyncOnError may enter in this
// function while kill query by a user.
if (param == NULL) {
assert(code != TSDB_CODE_SUCCESS);
return;
}
SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
SSqlObj* pParentSql = trsupport->pParentSql;
SSqlObj* pSql = (SSqlObj *) tres;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentSql->cmd, 0);
SVgroupMsg* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
//
// stable query killed or other subquery failed, all query stopped
//
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscError("0x%"PRIx64" query cancelled or failed, sub:0x%"PRIx64", vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql->self, pSql->self, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
tscHandleSubDeleteError(param, tres, code);
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
return;
}
/*
* if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
* function to abort current and remain retrieve process.
*
* NOTE: thread safe is required.
*/
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(code == taos_errno(pSql));
int32_t sent = 0;
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID && code != TSDB_CODE_VND_INVALID_VGROUP_ID)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
//tscReissueSubquery(trsupport, pSql, code, &sent);
if (sent) {
return;
}
} else {
tscError("0x%"PRIx64" sub:0x%"PRIx64" reach the max retry times or no need to retry, set global code:%s", pParentSql->self, pSql->self, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
}
tscHandleSubDeleteError(param, tres, pParentSql->res.code);
if(!sent) {
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
}
return;
}
tscDebug("0x%"PRIx64":CDEL sub:0x%"PRIx64" query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql->self,
pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
// do merge
pParentSql->res.numOfRows += pSql->res.numOfRows;
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
}
return ;
}
void writeMsgVgId(char * payload, int32_t vgId) {
SSubmitMsg* pSubmitMsg = (SSubmitMsg *)(payload + sizeof(SMsgDesc));
// SSubmitMsg
pSubmitMsg->header.vgId = htonl(vgId);
}
//
// STable malloc sub delete
//
SSqlObj *tscCreateSTableSubDelete(SSqlObj *pSql, SVgroupMsg* pVgroupMsg, SRetrieveSupport *trsupport) {
// Init
SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
tscError("0x%"PRIx64":CDEL new subdelete failed.", pSql->self);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj;
pNew->fp = tscSubDeleteCallback;
pNew->fetchFp = tscSubDeleteCallback;
pNew->param = trsupport;
pNew->maxRetry = TSDB_MAX_REPLICA;
SSqlCmd* pNewCmd = &pNew->cmd;
memcpy(pNewCmd, pCmd, sizeof(SSqlCmd));
// set zero
pNewCmd->pQueryInfo = NULL;
pNewCmd->active = NULL;
pNewCmd->payload = NULL;
pNewCmd->allocSize = 0;
// payload copy
int32_t ret = tscAllocPayload(pNewCmd, pCmd->payloadLen);
if (ret != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64":CDEL , sub delete alloc payload failed. errcode=%d", pSql->self, ret);
free(pNew);
return NULL;
}
memcpy(pNewCmd->payload, pCmd->payload, pCmd->payloadLen);
// update vgroup id
writeMsgVgId(pNewCmd->payload ,pVgroupMsg->vgId);
tsem_init(&pNew->rspSem, 0, 0);
registerSqlObj(pNew);
tscDebug("0x%"PRIx64":CDEL new sub insertion: %p", pSql->self, pNew);
// set vnode epset
tscSetDnodeEpSet(&pNew->epSet, pVgroupMsg);
return pNew;
}
//
// execute delete sql
//
int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -30,8 +187,6 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->cmd.active = pQueryInfo;
return tscBuildAndSendRequest(pSql, pQueryInfo);
}
return ret;
/*
//
// super table
......@@ -40,100 +195,71 @@ int32_t executeDelete(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// pRes->code check only serves in launching super table sub-queries
// check cancel
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
return pRes->code;
}
tExtMemBuffer **pMemoryBuf = NULL;
tOrderDescriptor *pDesc = NULL;
if(pTableMetaInfo->vgroupList == NULL) {
tscError(":CDEL SQL:%p tablename=%s vgroupList is NULL.", pSql, pTableMetaInfo->name.tname);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
pRes->qId = 0x1; // hack the qhandle check
uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSubqueryState *pState = &pSql->subState;
int32_t numOfSub = (pTableMetaInfo->pVgroupTables == NULL) ? pTableMetaInfo->vgroupList->numOfVgroups
: (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
int32_t numOfSub = pTableMetaInfo->vgroupList->numOfVgroups;
int32_t ret = doInitSubState(pSql, numOfSub);
ret = doInitSubState(pSql, numOfSub);
if (ret != 0) {
tscAsyncResultOnError(pSql);
return ret;
}
ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, &nBufferSize, pSql->self);
if (ret != 0) {
pRes->code = ret;
tscAsyncResultOnError(pSql);
tfree(pDesc);
tfree(pMemoryBuf);
return ret;
}
tscDebug("0x%"PRIx64" retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
tscDebug("0x%"PRIx64":CDEL retrieved query data from %d vnode(s)", pSql->self, pState->numOfSub);
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pState->numOfSub; ++i) {
int32_t i;
for (i = 0; i < pState->numOfSub; ++i) {
// vgroup
SVgroupMsg* pVgroupMsg = &pTableMetaInfo->vgroupList->vgroups[i];
// malloc each support
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
break;
}
trs->pExtMemBuffer = pMemoryBuf;
trs->pOrderDescriptor = pDesc;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
trs->localBufferSize = nBufferSize + sizeof(tFilePage);
if (trs->localBuffer == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs);
break;
}
trs->localBuffer->num = 0;
trs->subqueryIndex = i;
trs->pParentSql = pSql;
SSqlObj *pNew = tscCreateSTableSubquery(pSql, trs, NULL);
// malloc sub SSqlObj
SSqlObj *pNew = tscCreateSTableSubDelete(pSql, pVgroupMsg, trs);
if (pNew == NULL) {
tscError("0x%"PRIx64" failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tscError("0x%"PRIx64"CDEL failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql->self, i, strerror(errno));
tfree(trs->localBuffer);
tfree(trs);
break;
}
// todo handle multi-vnode situation
if (pQueryInfo->tsBuf) {
SQueryInfo *pNewQueryInfo = tscGetQueryInfo(&pNew->cmd);
pNewQueryInfo->tsBuf = tsBufClone(pQueryInfo->tsBuf);
assert(pNewQueryInfo->tsBuf != NULL);
}
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" create subquery success. orderOfSub:%d", pSql->self, pNew->self,
trs->subqueryIndex);
pSql->pSubs[i] = pNew;
}
if (i < pState->numOfSub) {
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
tscError("0x%"PRIx64":CDEL failed to prepare subdelete structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
// send sub sql
doConcurrentlySendSubQueries(pSql);
//return TSDB_CODE_TSC_QUERY_CANCELLED;
return TSDB_CODE_SUCCESS;
*/
}
......@@ -1068,6 +1068,14 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (code != TSDB_CODE_SUCCESS) {
return code ; // async load table meta
}
// vgroupInfo if super
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return code;
}
// CHECK AND SET WHERE
if (pInfo->pDelData->pWhere) {
......@@ -6460,6 +6468,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
const char* msg1 = "invalid expression";
const char* msg2 = "the timestamp column condition must be in an interval";
int32_t ret = TSDB_CODE_SUCCESS;
// tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space
......@@ -6509,7 +6518,7 @@ int32_t validateWhereNode(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSq
goto PARSE_WHERE_EXIT;
}
// check timestamp range
// check timestamp range
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
return invalidOperationMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
}
......
......@@ -73,7 +73,7 @@ static int32_t removeDupVgid(int32_t *src, int32_t sz) {
return ret;
}
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) {
void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupMsg* pVgroupInfo) {
assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
// Issue the query to one of the vnode among a vgroup randomly.
......@@ -619,7 +619,7 @@ int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
}
tscDebug("0x%"PRIx64" SQL cmd:%s will be processed, name:%s, type:%d", pSql->self, sqlCmd[pCmd->command], name, type);
if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL
if (pCmd->command < TSDB_SQL_MGMT && pCmd->command != TSDB_SQL_DELETE_DATA) { // the pTableMetaInfo cannot be NULL
if (pTableMetaInfo == NULL) {
pSql->res.code = TSDB_CODE_TSC_APP_ERROR;
return pSql->res.code;
......@@ -3318,20 +3318,41 @@ int buildSTableDelDataMsg(SSqlObj *pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
return 0;
}
// Normal Child Table
int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SSqlInfo *pInfo) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int tscBuildDelDataMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
uint32_t command = CMD_DELETE_DATA;
// pSql->cmd.payloadLen is set during copying data into payload
pCmd->msgType = TSDB_MSG_TYPE_SUBMIT;
if (pTableMeta->tableType == TSDB_SUPER_TABLE) {
// super table to do
command |= FLAG_SUPER_TABLE;
} else {
// no super table to do copy epSet
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
tscDebug("0x%"PRIx64" table deldata submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
}
SNewVgroupInfo vgroupInfo = {0};
taosHashGetClone(UTIL_GET_VGROUPMAP(pSql), &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo);
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
SCond *pCond = NULL;
// serialize tag column query condition
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
pCond = tsGetSTableQueryCond(pTagCond, pTableMeta->id.uid);
}
tscDebug("0x%"PRIx64" table deldata submit msg built, numberOfEP:%d", pSql->self, pSql->epSet.numOfEps);
// set payload
size_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData) + sizeof(int32_t);
int32_t tagCondLen = 0;
if (pCond) {
tagCondLen = pCond->len;
}
size_t payloadLen = sizeof(SMsgDesc) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + sizeof(SControlData) + tagCondLen;
int32_t ret = tscAllocPayload(pCmd, payloadLen);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
......@@ -3362,28 +3383,19 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S
pSubmitBlk->numOfRows = htons(1);
pSubmitBlk->schemaLen = 0; // only server return TSDB_CODE_TDB_TABLE_RECONFIGURE need schema attached
pSubmitBlk->sversion = htonl(pTableMeta->sversion);
pSubmitBlk->dataLen = htonl(sizeof(SControlData) + sizeof(int32_t));
pSubmitBlk->dataLen = htonl(sizeof(SControlData) + tagCondLen);
// SControlData
pControlData->command = htonl(CMD_DELETE_DATA);
pControlData->command = htonl(command);
pControlData->win.skey = htobe64(pQueryInfo->window.skey);
pControlData->win.ekey = htobe64(pQueryInfo->window.ekey);
pControlData->tnum = htonl(1);
pControlData->tids[0] = htonl(pTableMeta->id.tid);
return TSDB_CODE_SUCCESS;
}
int tscBuildDelDataMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if(UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return buildTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo);
} else {
return buildTableDelDataMsg(pSql, pCmd, pQueryInfo, pTableMetaInfo, pInfo);
// set tagCond
if (pCond) {
pControlData->tagCondLen = htonl(pCond->len);
memcpy(pControlData->tagCond, pCond->cond, pCond->len);
}
return TSDB_CODE_SUCCESS;
}
void tscInitMsgsFp() {
......@@ -3466,5 +3478,4 @@ void tscInitMsgsFp() {
tscKeepConn[TSDB_SQL_SELECT] = 1;
tscKeepConn[TSDB_SQL_FETCH] = 1;
tscKeepConn[TSDB_SQL_HB] = 1;
}
}
\ No newline at end of file
......@@ -2611,7 +2611,7 @@ static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
tfree(p);
}
static void doConcurrentlySendSubQueries(SSqlObj* pSql) {
void doConcurrentlySendSubQueries(SSqlObj* pSql) {
SSubqueryState *pState = &pSql->subState;
// concurrently sent the query requests.
......
......@@ -1003,15 +1003,19 @@ typedef struct {
char value[];
} STLV;
// Ox00000001 ~ 0x00010000 command id 16 items
#define CMD_DELETE_DATA 0x00000001
#define CMD_TRUNCATE 0x00000002
// 0x00010000 ~ 0x10000000 command flag 16 items
#define FLAG_SUPER_TABLE 0x00010000
#define GET_CTLDATA_SIZE(p) (sizeof(SControlData) + p->tnum * sizeof(int32_t))
typedef struct SControlData{
typedef struct SControlData {
uint32_t command; // see define CMD_???
STimeWindow win;
int32_t tnum; // tids nums
int32_t tids[]; // delete table tid
// tag cond
int32_t tagCondLen;
char tagCond[];
} SControlData;
enum {
......
......@@ -449,6 +449,8 @@ char* parseTagDatatoJson(void *p);
typedef bool (*readover_callback)(void* param, int8_t type, int32_t tid);
void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callback, void* param);
int32_t tsdbTableTid(void* pTable);
#ifdef __cplusplus
}
#endif
......
......@@ -20,13 +20,18 @@ extern "C" {
#endif
// SControlData addition information
#define GET_CTLINFO_SIZE(p) (sizeof(SControlDataInfo) + p.ctlData.tnum * sizeof(int32_t))
#define GET_CTLINFO_SIZE(p) (sizeof(SControlDataInfo) + p->tnum * sizeof(int32_t))
typedef struct {
// addition info
tsem_t* pSem;
bool memNull; // pRepo->mem is NULL, this is true
SShellSubmitRspMsg *pRsp;
SControlData ctlData;
// base ControlData
STimeWindow win; // come from SControlData.win
uint32_t command; // come from SControlData.command
int32_t tnum; // num of tids array
int32_t tids[];
} SControlDataInfo;
// -------- interface ---------
......
......@@ -1787,7 +1787,7 @@ int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) {
int ret = TSDB_CODE_SUCCESS;
// do command
if(pCtlDataInfo->ctlData.command == CMD_DELETE_DATA) {
if(pCtlDataInfo->command & CMD_DELETE_DATA) {
// delete data
ret = tsdbControlDelete(pRepo, pCtlDataInfo);
}
......
......@@ -179,14 +179,14 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
SDeleteH truncateH = {0};
SDFileSet * pSet = NULL;
tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->ctlData.tids[0]);
tsdbDebug("vgId:%d start to truncate TS data for %d", REPO_ID(pRepo), pCtlInfo->tids[0]);
if (tsdbInitDeleteH(&truncateH, pRepo) < 0) {
return -1;
}
truncateH.pCtlInfo = pCtlInfo;
STimeWindow win = pCtlInfo->ctlData.win;
STimeWindow win = pCtlInfo->win;
int sFid = TSDB_KEY_FID(win.skey, pCfg->daysPerFile, pCfg->precision);
int eFid = TSDB_KEY_FID(win.ekey, pCfg->daysPerFile, pCfg->precision);
......@@ -216,8 +216,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
}
#endif
if (pCtlInfo->ctlData.command == CMD_DELETE_DATA) {
if (pCtlInfo->command & CMD_DELETE_DATA) {
if (tsdbFSetDelete(&truncateH, pSet) < 0) {
tsdbDestroyDeleteH(&truncateH);
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
......@@ -448,12 +447,12 @@ static int tsdbFSetInit(SDeleteH *pdh, SDFileSet *pSet) {
return 0;
}
static void tsdbDeleteFSetEnd(SDeleteH *pdh) { tsdbCloseAndUnsetFSet(&(pdh->readh)); }
static void tsdbDeleteFSetEnd(SDeleteH *pdh) {
tsdbCloseAndUnsetFSet(&(pdh->readh));
}
static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) {
SDataCols * pDstDCols = pdh->pDCols;
SControlData* pCtlData = &pdh->pCtlInfo->ctlData;
int32_t delRows = 0;
tdResetDataCols(pDstDCols);
......@@ -464,7 +463,7 @@ static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) {
for (int i = 0; i < pSrcDCols->numOfRows; ++i) {
int64_t tsKey = *(int64_t *)tdGetColDataOfRow(pSrcDCols->cols, i);
if ((tsKey >= pCtlData->win.skey) && (tsKey <= pCtlData->win.ekey)) {
if ((tsKey >= pdh->pCtlInfo->win.skey) && (tsKey <= pdh->pCtlInfo->win.ekey)) {
// delete row
delRows ++;
continue;
......@@ -488,8 +487,8 @@ static int32_t tsdbFilterDataCols(SDeleteH *pdh, SDataCols *pSrcDCols) {
// table in delete list
bool tableInDel(SDeleteH* pdh, int32_t tid) {
for (int32_t i = 0; i < pdh->pCtlInfo->ctlData.tnum; i++) {
if (tid == pdh->pCtlInfo->ctlData.tids[i])
for (int32_t i = 0; i < pdh->pCtlInfo->tnum; i++) {
if (tid == pdh->pCtlInfo->tids[i])
return true;
}
......@@ -499,7 +498,7 @@ bool tableInDel(SDeleteH* pdh, int32_t tid) {
// if pBlock is border block return true else return false
static int tsdbBlockSolve(SDeleteH *pdh, SBlock *pBlock) {
// delete window
STimeWindow* pdel = &pdh->pCtlInfo->ctlData.win;
STimeWindow* pdel = &pdh->pCtlInfo->win;
// do nothing for no delete
if(pBlock->keyFirst > pdel->ekey || pBlock->keyLast < pdel->skey)
......
......@@ -15,9 +15,9 @@
#include "tdataformat.h"
#include "tfunctional.h"
#include "tsdbint.h"
#include "tskiplist.h"
#include "tsdbRowMergeBuf.h"
#include "tsdbint.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
......@@ -706,6 +706,10 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
pBlock->numOfRows = htons(pBlock->numOfRows);
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
if (IS_CONTROL_BLOCK(pBlock)) {
// super talbe control block tid == 0
continue;
}
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
pBlock->tid);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
......@@ -1119,6 +1123,25 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
return 0;
}
// set tid to ptids and return all tables num
int32_t tsdbTableGroupInfo(STableGroupInfo* pTableGroup, int32_t * ptids) {
int32_t pos = 0;
size_t numOfGroup = taosArrayGetSize(pTableGroup->pGroupList);
for (int32_t i = 0; i < numOfGroup; ++i) {
SArray* group = taosArrayGetP(pTableGroup->pGroupList, i);
size_t cnt = taosArrayGetSize(group);
for(int32_t j = 0; j < cnt; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
if (pKeyInfo->pTable != NULL) {
if(ptids)
ptids[pos] = tsdbTableTid(pKeyInfo->pTable);
pos ++;
}
}
}
return pos;
}
// Control Data
int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, tsem_t** ppSem) {
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -1135,23 +1158,55 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
// anti-serialize
pCtlData->command = htonl(pCtlData->command);
pCtlData->tnum = htonl(pCtlData->tnum);
pCtlData->win.skey = htobe64(pCtlData->win.skey);
pCtlData->win.ekey = htobe64(pCtlData->win.ekey);
for (int32_t i=0; i < pCtlData->tnum; i++) {
pCtlData->tids[i] = htonl(pCtlData->tids[i]);
pCtlData->tagCondLen = htonl(pCtlData->tagCondLen);
// get tables after filter tag condition
STableGroupInfo tableGroupInfo = {0};
tableGroupInfo.sVersion = -1;
tableGroupInfo.tVersion = -1;
// get del tables tid
int32_t tnum;
if (pCtlData->command & FLAG_SUPER_TABLE) {
// super table
ret = tsdbQuerySTableByTagCond(pRepo, pBlock->uid, pCtlData->win.skey, pCtlData->tagCond, pCtlData->tagCondLen, &tableGroupInfo, NULL, 0);
if (ret != TSDB_CODE_SUCCESS) {
tsdbError(":SDEL vgId:%d failed to get child tables id from stable with tag condition. uid=%ld", REPO_ID(pRepo), pBlock->uid);
return ret;
}
tnum = tsdbTableGroupInfo(&tableGroupInfo, NULL);
if (tnum == 0) {
tsdbWarn(":SDEL vgId:%d super table no child tables after filter by tag. uid=%ld", REPO_ID(pRepo), pBlock->uid);
return TSDB_CODE_SUCCESS;
}
} else {
// single table
tnum = 1;
}
// server data set
size_t nsize = sizeof(SControlDataInfo) + pCtlData->tnum * sizeof(int32_t);
size_t nsize = sizeof(SControlDataInfo) + tnum * sizeof(int32_t);
SControlDataInfo* pNew = (SControlDataInfo* )tmalloc(nsize);
memset(pNew, 0, nsize);
memcpy(&pNew->ctlData, pCtlData, GET_CTLDATA_SIZE(pCtlData));
pNew->win = pCtlData->win;
pNew->command = pCtlData->command;
pNew->pRsp = pRsp;
if (ppSem)
pNew->pSem = *ppSem;
// tids
pNew->tnum = tnum;
// copy tid
if (pCtlData->command & FLAG_SUPER_TABLE) {
tsdbTableGroupInfo(&tableGroupInfo, pNew->tids);
} else {
pNew->tids[0] = pBlock->tid;
}
if(pCtlData->command == CMD_DELETE_DATA) {
if(pCtlData->command & CMD_DELETE_DATA) {
// malloc new to pass commit thread
ret = tsdbAsyncCommit(pRepo, pNew);
}
......
......@@ -4662,3 +4662,9 @@ void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callba
pQueryHandle->param = param;
return ;
}
// get table tid
int32_t tsdbTableTid(void* pTable) {
STable *p = (STable *)pTable;
return p->tableId.tid;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册