提交 fd924682 编写于 作者: A Alex Duan

[TS-238]<feature>(tsdb): support client show tables count

上级 876c0068
...@@ -319,6 +319,7 @@ typedef struct { ...@@ -319,6 +319,7 @@ typedef struct {
TAOS_FIELD* final; TAOS_FIELD* final;
struct SGlobalMerger *pMerger; struct SGlobalMerger *pMerger;
int32_t numOfTables;
} SSqlRes; } SSqlRes;
typedef struct { typedef struct {
...@@ -498,6 +499,7 @@ void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *para ...@@ -498,6 +499,7 @@ void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *para
void tscImportDataFromFile(SSqlObj *pSql); void tscImportDataFromFile(SSqlObj *pSql);
struct SGlobalMerger* tscInitResObjForLocalQuery(int32_t numOfRes, int32_t rowLen, uint64_t id); struct SGlobalMerger* tscInitResObjForLocalQuery(int32_t numOfRes, int32_t rowLen, uint64_t id);
bool tscIsUpdateQuery(SSqlObj* pSql); bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscIsDeleteQuery(SSqlObj* pSql);
char* tscGetSqlStr(SSqlObj* pSql); char* tscGetSqlStr(SSqlObj* pSql);
bool tscIsQueryWithLimit(SSqlObj* pSql); bool tscIsQueryWithLimit(SSqlObj* pSql);
......
...@@ -89,7 +89,8 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) { ...@@ -89,7 +89,8 @@ void tscSubDeleteCallback(void *param, TAOS_RES *tres, int code) {
pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex); pSql->self, pVgroup->epAddr[pSql->epSet.inUse].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
// success do total count // success do total count
pParentSql->res.numOfRows += pSql->res.numOfRows; pParentSql->res.numOfRows += pSql->res.numOfRows;
pParentSql->res.numOfTables += pSql->res.numOfTables;
if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) { if (subAndCheckDone(pSql, pParentSql, trsupport->subqueryIndex)) {
// all sub done, call parentSQL callback to finish // all sub done, call parentSQL callback to finish
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows); (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
......
...@@ -523,9 +523,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -523,9 +523,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
pMsg->numOfRows = htonl(pMsg->numOfRows); pMsg->numOfRows = htonl(pMsg->numOfRows);
pMsg->affectedRows = htonl(pMsg->affectedRows); pMsg->affectedRows = htonl(pMsg->affectedRows);
pMsg->failedRows = htonl(pMsg->failedRows); pMsg->failedRows = htonl(pMsg->failedRows);
pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks); pMsg->numOfTables = htonl(pMsg->numOfTables);
pRes->numOfRows += pMsg->affectedRows; pRes->numOfRows += pMsg->affectedRows;
if(pMsg->numOfTables > 0) {
pRes->numOfTables = pMsg->numOfTables;
}
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
} else { } else {
......
...@@ -433,6 +433,14 @@ int taos_affected_rows(TAOS_RES *tres) { ...@@ -433,6 +433,14 @@ int taos_affected_rows(TAOS_RES *tres) {
return pSql->res.numOfRows; return pSql->res.numOfRows;
} }
int taos_affected_tables(TAOS_RES *tres) {
SSqlObj* pSql = (SSqlObj*) tres;
if (pSql == NULL || pSql->signature != pSql) return 0;
return pSql->res.numOfTables;
}
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
......
...@@ -4311,6 +4311,15 @@ bool tscIsUpdateQuery(SSqlObj* pSql) { ...@@ -4311,6 +4311,15 @@ bool tscIsUpdateQuery(SSqlObj* pSql) {
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command); return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_RESET_CACHE == pCmd->command || TSDB_SQL_USE_DB == pCmd->command);
} }
bool tscIsDeleteQuery(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return false;
}
SSqlCmd* pCmd = &pSql->cmd;
return pCmd->command == TSDB_SQL_DELETE_DATA;
}
char* tscGetSqlStr(SSqlObj* pSql) { char* tscGetSqlStr(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
return NULL; return NULL;
......
...@@ -216,6 +216,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS* taos, char* lines[], int numLi ...@@ -216,6 +216,8 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS* taos, char* lines[], int numLi
DLL_EXPORT int32_t taos_parse_time(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); DLL_EXPORT int32_t taos_parse_time(char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
DLL_EXPORT int taos_affected_tables(TAOS_RES *res);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -256,7 +256,7 @@ typedef struct { ...@@ -256,7 +256,7 @@ typedef struct {
int32_t numOfRows; // number of records the client is trying to write int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records) int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks; int32_t numOfTables; // affected tables
SShellSubmitRspBlock failedBlocks[]; SShellSubmitRspBlock failedBlocks[];
} SShellSubmitRspMsg; } SShellSubmitRspMsg;
......
...@@ -291,7 +291,20 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -291,7 +291,20 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
return; return;
} }
if (!tscIsUpdateQuery(pSql)) { // select and show kinds of commands if (tscIsDeleteQuery(pSql)) {
// delete
int numOfRows = taos_affected_rows(pSql);
int numOfTables = taos_affected_tables(pSql);
int error_no = taos_errno(pSql);
et = taosGetTimestampUs();
if (error_no == TSDB_CODE_SUCCESS) {
printf("Deleted %d row(s) from %d table(s) (%.6fs)\n", numOfRows, numOfTables, (et - st) / 1E6);
} else {
printf("Deleted interrupted (%s), %d row(s) from %d tables (%.6fs)\n", taos_errstr(pSql), numOfRows, numOfTables, (et - st) / 1E6);
}
}
else if (!tscIsUpdateQuery(pSql)) { // select and show kinds of commands
int error_no = 0; int error_no = 0;
int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
......
...@@ -99,7 +99,7 @@ struct STsdbRepo { ...@@ -99,7 +99,7 @@ struct STsdbRepo {
SMergeBuf mergeBuf; //used when update=2 SMergeBuf mergeBuf; //used when update=2
int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? int8_t compactState; // compact state: inCompact/noCompact/waitingCompact?
int8_t truncateState; // truncate state: inTruncate/noTruncate/waitingTruncate int8_t deleteState; // truncate state: inTruncate/noTruncate/waitingTruncate
pthread_t* pthread; pthread_t* pthread;
}; };
......
...@@ -63,9 +63,8 @@ typedef struct { ...@@ -63,9 +63,8 @@ typedef struct {
#define TSDB_DELETE_EXBUF(pdh) TSDB_READ_EXBUF(&((pdh)->readh)) #define TSDB_DELETE_EXBUF(pdh) TSDB_READ_EXBUF(&((pdh)->readh))
static void tsdbStartDelete(STsdbRepo *pRepo); static void tsdbStartDeleteTrans(STsdbRepo *pRepo);
static void tsdbEndDelete(STsdbRepo *pRepo, int eno); static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno);
static int tsdbDeleteMeta(STsdbRepo *pRepo);
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray); static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray);
static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet); static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet);
static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo); static int tsdbInitDeleteH(SDeleteH *pdh, STsdbRepo *pRepo);
...@@ -73,7 +72,6 @@ static void tsdbDestroyDeleteH(SDeleteH *pdh); ...@@ -73,7 +72,6 @@ static void tsdbDestroyDeleteH(SDeleteH *pdh);
static int tsdbInitDeleteTblArray(SDeleteH *pdh); static int tsdbInitDeleteTblArray(SDeleteH *pdh);
static void tsdbDestroyDeleteTblArray(SDeleteH *pdh); static void tsdbDestroyDeleteTblArray(SDeleteH *pdh);
static int tsdbCacheFSetIndex(SDeleteH *pdh); static int tsdbCacheFSetIndex(SDeleteH *pdh);
static int tsdbDeleteCache(STsdbRepo *pRepo, void *param);
static int tsdbFSetInit(SDeleteH *pdh, SDFileSet *pSet); static int tsdbFSetInit(SDeleteH *pdh, SDFileSet *pSet);
static void tsdbDeleteFSetEnd(SDeleteH *pdh); static void tsdbDeleteFSetEnd(SDeleteH *pdh);
static int tsdbFSetDeleteImpl(SDeleteH *pdh); static int tsdbFSetDeleteImpl(SDeleteH *pdh);
...@@ -88,6 +86,7 @@ int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) { ...@@ -88,6 +86,7 @@ int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlInfo) {
int32_t ret = tsdbDeleteImplCommon(pRepo, pCtlInfo); int32_t ret = tsdbDeleteImplCommon(pRepo, pCtlInfo);
if(pCtlInfo->pRsp) { if(pCtlInfo->pRsp) {
pCtlInfo->pRsp->affectedRows = htonl(pCtlInfo->pRsp->affectedRows); pCtlInfo->pRsp->affectedRows = htonl(pCtlInfo->pRsp->affectedRows);
pCtlInfo->pRsp->numOfTables = htonl(pCtlInfo->pRsp->numOfTables);
pCtlInfo->pRsp->code = ret; pCtlInfo->pRsp->code = ret;
} }
...@@ -112,26 +111,23 @@ static void tsdbClearUpdates(SArray * pArray) { ...@@ -112,26 +111,23 @@ static void tsdbClearUpdates(SArray * pArray) {
taosArrayDestroy(&pArray); taosArrayDestroy(&pArray);
} }
static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { static int tsdbDeleteMeta(STsdbRepo *pRepo) {
int32_t code = 0; STsdbFS *pfs = REPO_FS(pRepo);
// Step 1: check and clear cache tsdbUpdateMFile(pfs, pfs->cstatus->pmf);
if ((code = tsdbDeleteCache(pRepo, pCtlInfo)) != 0) { return TSDB_CODE_SUCCESS;
pRepo->code = terrno; }
tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d failed to truncate since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
// Step 2: truncate and rebuild DFileSets static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
// Check if there are files in TSDB FS to truncate // check valid
if ((REPO_FS(pRepo)->cstatus->pmf == NULL) || (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0)) { if ((REPO_FS(pRepo)->cstatus->pmf == NULL) || (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0)) {
pRepo->truncateState = TSDB_NO_DELETE; pRepo->deleteState = TSDB_NO_DELETE;
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d truncate over, no meta or data file", REPO_ID(pRepo)); tsdbInfo("vgId:%d delete over, no meta or data file", REPO_ID(pRepo));
return -1; return -1;
} }
tsdbStartDelete(pRepo); // start transaction
tsdbStartDeleteTrans(pRepo);
if (tsdbDeleteMeta(pRepo) < 0) { if (tsdbDeleteMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to truncate META data since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to truncate META data since %s", REPO_ID(pRepo), tstrerror(terrno));
...@@ -144,7 +140,13 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -144,7 +140,13 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
goto _err; goto _err;
} }
tsdbEndDelete(pRepo, TSDB_CODE_SUCCESS); // end transaction
tsdbEndDeleteTrans(pRepo, TSDB_CODE_SUCCESS);
// set affected tables number
if(pCtlInfo->pRsp) {
pCtlInfo->pRsp->numOfTables = pCtlInfo->tnum;
}
// update last row // update last row
tsdbUpdateLastRow(pRepo, aUpdates); tsdbUpdateLastRow(pRepo, aUpdates);
...@@ -153,54 +155,30 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -153,54 +155,30 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
_err: _err:
pRepo->code = terrno; pRepo->code = terrno;
tsdbEndDelete(pRepo, terrno); tsdbEndDeleteTrans(pRepo, terrno);
tsdbClearUpdates(aUpdates); tsdbClearUpdates(aUpdates);
return -1; return -1;
} }
static int tsdbDeleteCache(STsdbRepo *pRepo, void *param) { static void tsdbStartDeleteTrans(STsdbRepo *pRepo) {
// step 1: reset query cache(reset all or the specific cache) assert(pRepo->deleteState != TSDB_IN_DELETE);
// TODO ... check with Doctor Liao
// if(... <0){
// terrno = ...;
// return -1;
// }
// step 2: check and clear cache of last_row/last
// TODO: ... scan/check/clear stable/child table/common table
// if(... <0){
// terrno = ...;
// return -1;
// }
return 0;
}
static void tsdbStartDelete(STsdbRepo *pRepo) {
assert(pRepo->truncateState != TSDB_IN_DELETE);
tsdbInfo("vgId:%d start to truncate!", REPO_ID(pRepo)); tsdbInfo("vgId:%d start to truncate!", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0); tsdbStartFSTxn(pRepo, 0, 0);
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
pRepo->truncateState = TSDB_IN_DELETE; pRepo->deleteState = TSDB_IN_DELETE;
} }
static void tsdbEndDelete(STsdbRepo *pRepo, int eno) { static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno) {
if (eno != TSDB_CODE_SUCCESS) { if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo)); tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else { } else {
tsdbEndFSTxn(pRepo); tsdbEndFSTxn(pRepo);
} }
pRepo->truncateState = TSDB_NO_DELETE; pRepo->deleteState = TSDB_NO_DELETE;
tsdbInfo("vgId:%d truncate over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); tsdbInfo("vgId:%d truncate over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} }
static int tsdbDeleteMeta(STsdbRepo *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
tsdbUpdateMFile(pfs, pfs->cstatus->pmf);
return 0;
}
static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray) { static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray* pArray) {
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SDeleteH deleteH = {0}; SDeleteH deleteH = {0};
......
...@@ -214,7 +214,7 @@ int tsdbGetState(STsdbRepo *repo) { return repo->state; } ...@@ -214,7 +214,7 @@ int tsdbGetState(STsdbRepo *repo) { return repo->state; }
int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); } int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); }
int8_t tsdbGetTruncateState(STsdbRepo *repo) { return (int8_t)(repo->truncateState); } int8_t tsdbGetTruncateState(STsdbRepo *repo) { return (int8_t)(repo->deleteState); }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL); ASSERT(repo != NULL);
...@@ -575,7 +575,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -575,7 +575,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->state = TSDB_STATE_OK; pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
pRepo->compactState = 0; pRepo->compactState = 0;
pRepo->truncateState = 0; pRepo->deleteState = 0;
pRepo->config = *pCfg; pRepo->config = *pCfg;
if (pAppH) { if (pAppH) {
pRepo->appH = *pAppH; pRepo->appH = *pAppH;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册