提交 5a5b4bf5 编写于 作者: A Alex Duan

[TS-238]<feature>(tsdb): the flow of delete was passed

上级 eeba8d02
...@@ -164,12 +164,14 @@ void dnodeFreeVWriteQueue(void *pWqueue) { ...@@ -164,12 +164,14 @@ void dnodeFreeVWriteQueue(void *pWqueue) {
void* waitingResultThread(void* param) { void* waitingResultThread(void* param) {
SVWriteMsg* pWrite = (SVWriteMsg* )param; SVWriteMsg* pWrite = (SVWriteMsg* )param;
int32_t ret = sem_wait(pWrite->rspRet.psem_rsp); int32_t ret = sem_wait(pWrite->rspRet.psem);
if(ret == 0) { if(ret == 0) {
// success // success
} }
sem_destroy(pWrite->rspRet.psem_rsp); sem_destroy(pWrite->rspRet.psem);
tfree(pWrite->rspRet.psem);
// wait ok // wait ok
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle, .handle = pWrite->rpcMsg.handle,
...@@ -194,7 +196,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { ...@@ -194,7 +196,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
if (count <= 1) return; if (count <= 1) return;
if(pWrite->rspRet.psem_rsp == 0) { if(pWrite->rspRet.psem == 0) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle, .handle = pWrite->rpcMsg.handle,
.pCont = pWrite->rspRet.rsp, .pCont = pWrite->rspRet.rsp,
...@@ -208,7 +210,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { ...@@ -208,7 +210,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) {
// need async to wait result in another thread // need async to wait result in another thread
pthread_t* thread = taosCreateThread(waitingResultThread, pWrite); pthread_t* thread = taosCreateThread(waitingResultThread, pWrite);
// add to wait thread manager // add to wait thread manager
vnodeAddWait(pVnode, thread, pWrite->rspRet.psem_rsp, pWrite); vnodeAddWait(pVnode, thread, pWrite->rspRet.psem, pWrite);
} }
} }
......
...@@ -1005,7 +1005,7 @@ typedef struct { ...@@ -1005,7 +1005,7 @@ typedef struct {
#define CMD_DELETE_DATA 0x00000001 #define CMD_DELETE_DATA 0x00000001
typedef struct SControlData{ typedef struct SControlData{
uint32_t command; // see define CMD_??? uint32_t command; // see define CMD_???
STimeWindow win; STimeWindow win;
} SControlData; } SControlData;
......
...@@ -160,7 +160,7 @@ typedef struct { ...@@ -160,7 +160,7 @@ typedef struct {
* *
* @return the number of points inserted, -1 for failure and the error number is set * @return the number of points inserted, -1 for failure and the error number is set
*/ */
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp, sem_t* pSem); int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp, tsem_t** ppSem);
// -- FOR QUERY TIME SERIES DATA // -- FOR QUERY TIME SERIES DATA
......
...@@ -32,7 +32,7 @@ typedef struct { ...@@ -32,7 +32,7 @@ typedef struct {
int32_t len; int32_t len;
void * rsp; void * rsp;
void * qhandle; // used by query and retrieve msg void * qhandle; // used by query and retrieve msg
sem_t* psem_rsp; // if it is not zero, need wait result with async tsem_t* psem; // if it is not zero, need wait result with async
} SRspRet; } SRspRet;
typedef struct { typedef struct {
......
...@@ -34,7 +34,7 @@ typedef struct { ...@@ -34,7 +34,7 @@ typedef struct {
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
void *tsdbCommitData(STsdbRepo *pRepo); void *tsdbCommitData(STsdbRepo *pRepo, bool end);
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
...@@ -42,6 +42,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFil ...@@ -42,6 +42,9 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFil
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf); SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf);
int tsdbApplyRtn(STsdbRepo *pRepo); int tsdbApplyRtn(STsdbRepo *pRepo);
// commit control command
int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo);
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
if (fid >= pRtn->maxFid) { if (fid >= pRtn->maxFid) {
return 0; return 0;
......
...@@ -18,9 +18,9 @@ ...@@ -18,9 +18,9 @@
typedef enum { typedef enum {
COMMIT_REQ, COMMIT_REQ,
COMMIT_BOTH_REQ,
COMPACT_REQ, COMPACT_REQ,
TRUNCATE_TBL_REQ, CONTROL_REQ,
DELETE_TBL_REQ,
COMMIT_CONFIG_REQ, COMMIT_CONFIG_REQ,
} TSDB_REQ_T; } TSDB_REQ_T;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef _TD_TSDB_MEMTABLE_H_ #ifndef _TD_TSDB_MEMTABLE_H_
#define _TD_TSDB_MEMTABLE_H_ #define _TD_TSDB_MEMTABLE_H_
#include "tsdbTruncate.h"
typedef struct { typedef struct {
int rowsInserted; int rowsInserted;
int rowsUpdated; int rowsUpdated;
...@@ -65,11 +66,12 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); ...@@ -65,11 +66,12 @@ int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable); int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot); void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo); // if pCtrlData is NULL, force must be true
int tsdbAsyncCommit(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo);
int tsdbSyncCommitConfig(STsdbRepo* pRepo); int tsdbSyncCommitConfig(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo); void* tsdbCommitData(STsdbRepo* pRepo, bool end);
static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) { static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
......
...@@ -18,6 +18,15 @@ ...@@ -18,6 +18,15 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct {
SControlData ctlData;
// addition info
uint64_t uid; // table unique id
int32_t tid; // table id
tsem_t* pSem;
bool memNull; // pRepo->mem is NULL, this is true
SShellSubmitRspMsg *pRsp;
} SControlDataInfo;
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param); void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param);
void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param); void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param);
......
...@@ -58,7 +58,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); ...@@ -58,7 +58,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
static int tsdbCommitTSData(STsdbRepo *pRepo); static int tsdbCommitTSData(STsdbRepo *pRepo);
static void tsdbStartCommit(STsdbRepo *pRepo); static void tsdbStartCommit(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo, int eno); static void tsdbEndCommit(STsdbRepo *pRepo, int eno, bool end);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCreateCommitIters(SCommitH *pCommith); static int tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith);
...@@ -85,14 +85,14 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p ...@@ -85,14 +85,14 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows, int8_t update); TSKEY maxKey, int maxRows, int8_t update);
void *tsdbCommitData(STsdbRepo *pRepo) { void *tsdbCommitData(STsdbRepo *pRepo, bool end) {
if (pRepo->imem == NULL) { if (pRepo->imem == NULL) {
return NULL; return NULL;
} }
tsdbStartCommit(pRepo); tsdbStartCommit(pRepo);
if (tsShortcutFlag & TSDB_SHORTCUT_RB_TSDB_COMMIT) { if (tsShortcutFlag & TSDB_SHORTCUT_RB_TSDB_COMMIT) {
tsdbEndCommit(pRepo, terrno); tsdbEndCommit(pRepo, terrno, end);
return NULL; return NULL;
} }
...@@ -108,14 +108,14 @@ void *tsdbCommitData(STsdbRepo *pRepo) { ...@@ -108,14 +108,14 @@ void *tsdbCommitData(STsdbRepo *pRepo) {
goto _err; goto _err;
} }
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS, end);
return NULL; return NULL;
_err: _err:
ASSERT(terrno != TSDB_CODE_SUCCESS); ASSERT(terrno != TSDB_CODE_SUCCESS);
pRepo->code = terrno; pRepo->code = terrno;
tsdbEndCommit(pRepo, terrno); tsdbEndCommit(pRepo, terrno, end);
return NULL; return NULL;
} }
...@@ -688,7 +688,7 @@ static void tsdbStartCommit(STsdbRepo *pRepo) { ...@@ -688,7 +688,7 @@ static void tsdbStartCommit(STsdbRepo *pRepo) {
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
} }
static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { static void tsdbEndCommit(STsdbRepo *pRepo, int eno, bool end) {
if (eno != TSDB_CODE_SUCCESS) { if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo)); tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else { } else {
...@@ -697,14 +697,21 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { ...@@ -697,14 +697,21 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); tsdbInfo("vgId:%d commit over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno); // notify
if (end && pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, eno);
}
SMemTable *pIMem = pRepo->imem; SMemTable *pIMem = pRepo->imem;
(void)tsdbLockRepo(pRepo); (void)tsdbLockRepo(pRepo);
pRepo->imem = NULL; pRepo->imem = NULL;
(void)tsdbUnlockRepo(pRepo); (void)tsdbUnlockRepo(pRepo);
tsdbUnRefMemTable(pRepo, pIMem); tsdbUnRefMemTable(pRepo, pIMem);
tsem_post(&(pRepo->readyToCommit));
// release readyToCommit allow next commit
if (end) {
tsem_post(&(pRepo->readyToCommit));
}
} }
#if 0 #if 0
...@@ -1772,3 +1779,39 @@ int tsdbApplyRtn(STsdbRepo *pRepo) { ...@@ -1772,3 +1779,39 @@ int tsdbApplyRtn(STsdbRepo *pRepo) {
return 0; return 0;
} }
int tsdbControlDelete(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) {
int ret = TSDB_CODE_SUCCESS;
if(pCtlDataInfo->pRsp) {
pCtlDataInfo->pRsp->affectedRows = htonl(23);
pCtlDataInfo->pRsp->code = ret;
}
return ret;
}
// do control task
int tsdbCommitControl(STsdbRepo* pRepo, SControlDataInfo* pCtlDataInfo) {
int ret = TSDB_CODE_SUCCESS;
// do command
if(pCtlDataInfo->ctlData.command == CMD_DELETE_DATA) {
// delete data
ret = tsdbControlDelete(pRepo, pCtlDataInfo);
}
// notify response thread to response result to client
if (pCtlDataInfo->pSem) {
tsem_post(pCtlDataInfo->pSem);
}
// deal wal
if (pRepo->appH.notifyStatus)
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER, ret);
// release commitSep for next commit
tsem_post(&pRepo->readyToCommit);
return ret;
}
\ No newline at end of file
...@@ -189,13 +189,15 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -189,13 +189,15 @@ static void *tsdbLoopCommit(void *arg) {
param = ((SReq *)pNode->data)->param; param = ((SReq *)pNode->data)->param;
if (req == COMMIT_REQ) { if (req == COMMIT_REQ) {
tsdbCommitData(pRepo); tsdbCommitData(pRepo, true);
} else if (req == COMPACT_REQ) { } else if (req == COMPACT_REQ) {
tsdbCompactImpl(pRepo); tsdbCompactImpl(pRepo);
} else if (req == TRUNCATE_TBL_REQ) { } else if (req == COMMIT_BOTH_REQ) {
tsdbTruncateImpl(pRepo, param); SControlDataInfo* pCtlDataInfo = (SControlDataInfo* )param;
} else if (req == DELETE_TBL_REQ) { if(!pCtlDataInfo->memNull) {
tsdbDeleteImpl(pRepo, param); tsdbCommitData(pRepo, false);
}
tsdbCommitControl(pRepo, param);
} else if (req == COMMIT_CONFIG_REQ) { } else if (req == COMMIT_CONFIG_REQ) {
ASSERT(pRepo->config_changed); ASSERT(pRepo->config_changed);
tsdbApplyRepoConfig(pRepo); tsdbApplyRepoConfig(pRepo);
......
...@@ -188,7 +188,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { ...@@ -188,7 +188,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
STsdbCfg *pCfg = &(pRepo->config); STsdbCfg *pCfg = &(pRepo->config);
if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbAsyncCommit(pRepo) < 0) return -1; if (tsdbAsyncCommit(pRepo, NULL) < 0) return -1;
} }
return 0; return 0;
} }
...@@ -202,7 +202,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { ...@@ -202,7 +202,7 @@ int tsdbCheckCommit(STsdbRepo *pRepo) {
if ((pRepo->mem->extraBuffList != NULL) || if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit // trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1; if (tsdbAsyncCommit(pRepo, NULL) < 0) return -1;
} }
return 0; return 0;
} }
......
...@@ -49,12 +49,12 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIte ...@@ -49,12 +49,12 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIte
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
static int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, sem_t* pSem); static int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, tsem_t** pSem);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now); TSKEY now);
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp, sem_t* pSem) { int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp, tsem_t** ppSem) {
STsdbRepo * pRepo = repo; STsdbRepo * pRepo = repo;
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL; SSubmitBlk * pBlock = NULL;
...@@ -74,7 +74,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR ...@@ -74,7 +74,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
if (pBlock == NULL) break; if (pBlock == NULL) break;
if (IS_CONTROL_BLOCK(pBlock)) { if (IS_CONTROL_BLOCK(pBlock)) {
// COMMAND DATA BLOCK // COMMAND DATA BLOCK
ret = tsdbInsertControlData(pRepo, pBlock, pRsp, pSem); ret = tsdbInsertControlData(pRepo, pBlock, pRsp, ppSem);
// all control msg is one SSubmitMsg, so need return // all control msg is one SSubmitMsg, so need return
return ret; return ret;
} else { } else {
...@@ -317,11 +317,12 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) { ...@@ -317,11 +317,12 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) {
return 0; return 0;
} }
int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbAsyncCommit(STsdbRepo *pRepo, SControlDataInfo* pCtlDataInfo) {
tsem_wait(&(pRepo->readyToCommit)); tsem_wait(&(pRepo->readyToCommit));
ASSERT(pRepo->imem == NULL); ASSERT(pRepo->imem == NULL);
if (pRepo->mem == NULL) {
if (pRepo->mem == NULL && pCtlDataInfo == NULL) {
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
return 0; return 0;
} }
...@@ -332,11 +333,33 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -332,11 +333,33 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START, TSDB_CODE_SUCCESS);
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL; bool post = false;
if (tsdbScheduleCommit(pRepo, NULL, COMMIT_REQ) < 0) { if (pRepo->mem) {
tsem_post(&(pRepo->readyToCommit)); // has data in mem
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
if(pCtlDataInfo == NULL) {
if (tsdbScheduleCommit(pRepo, NULL, COMMIT_REQ) < 0)
post = true;
} else {
pCtlDataInfo->memNull = false;
if(tsdbScheduleCommit(pRepo, pCtlDataInfo, COMMIT_BOTH_REQ) < 0)
post = true;
}
} else {
// no data in mem
if (pCtlDataInfo) {
pCtlDataInfo->memNull = true;
if(tsdbScheduleCommit(pRepo, pCtlDataInfo, COMMIT_BOTH_REQ) < 0)
post = true;
}
} }
// need post
if(post)
tsem_post(&(pRepo->readyToCommit));
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0; return 0;
...@@ -345,7 +368,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -345,7 +368,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
int tsdbSyncCommit(STsdbRepo *repo) { int tsdbSyncCommit(STsdbRepo *repo) {
STsdbRepo *pRepo = repo; STsdbRepo *pRepo = repo;
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo, NULL);
tsem_wait(&(pRepo->readyToCommit)); tsem_wait(&(pRepo->readyToCommit));
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
...@@ -1096,38 +1119,46 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r ...@@ -1096,38 +1119,46 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
return 0; return 0;
} }
// Delete Data
int32_t tsdbInsertDeleteData(STsdbRepo* pRepo, SControlData* pCtlData, SShellSubmitRspMsg *pRsp, sem_t* pSem) {
pRsp->affectedRows = htonl(99);
// INIT SEM
int32_t ret = sem_init(pSem, 0, 0);
if(ret != 0) {
return TAOS_SYSTEM_ERROR(ret);
}
// CREATE DELETE MEMTABLE
// FORCE COMMIT ALL MEM AND IMEM
return 0;
}
// Control Data // Control Data
int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, sem_t* pSem) { int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmitRspMsg *pRsp, tsem_t** ppSem) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
assert(pBlock->dataLen == sizeof(SControlData)); assert(pBlock->dataLen == sizeof(SControlData));
SControlData* pCtlData = (SControlData* )pBlock->data; SControlData* pCtlData = (SControlData* )pBlock->data;
// INIT SEM FOR ASYNC WAIT COMMIT RESULT
if (ppSem) {
*ppSem = (tsem_t* )tmalloc(sizeof(tsem_t));
ret = tsem_init(*ppSem, 0, 0);
if(ret != 0) {
return TAOS_SYSTEM_ERROR(ret);
}
}
// anti-serialize // anti-serialize
pCtlData->command = htonl(pCtlData->command); pCtlData->command = htonl(pCtlData->command);
pCtlData->win.skey = htobe64(pCtlData->win.skey); pCtlData->win.skey = htobe64(pCtlData->win.skey);
pCtlData->win.ekey = htobe64(pCtlData->win.ekey); pCtlData->win.ekey = htobe64(pCtlData->win.ekey);
// server data set
SControlDataInfo* pNew = (SControlDataInfo* )tmalloc(sizeof(SControlDataInfo));
memset(pNew, 0, sizeof(SControlDataInfo));
pNew->ctlData = *pCtlData;
pNew->uid = pBlock->uid;
pNew->tid = pBlock->tid;
pNew->pRsp = pRsp;
if (ppSem)
pNew->pSem = *ppSem;
if(pCtlData->command == CMD_DELETE_DATA) { if(pCtlData->command == CMD_DELETE_DATA) {
ret = tsdbInsertDeleteData(pRepo, pCtlData, pRsp, pSem); // malloc new to pass commit thread
ret = tsdbAsyncCommit(pRepo, pNew);
} }
// if async post failed , must set wait event ppSem NULL
if(ret != TSDB_CODE_SUCCESS && ppSem) {
tsem_destroy(*ppSem);
*ppSem = NULL;
}
return ret; return ret;
} }
\ No newline at end of file
...@@ -45,13 +45,14 @@ typedef struct { ...@@ -45,13 +45,14 @@ typedef struct {
#define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh)) #define TSDB_TRUNCATE_COMP_BUF(pTruncateH) TSDB_READ_COMP_BUF(&((pTruncateH)->readh))
#define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh)) #define TSDB_TRUNCATE_EXBUF(pTruncateH) TSDB_READ_EXBUF(&((pTruncateH)->readh))
/*
static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
static void tsdbStartTruncate(STsdbRepo *pRepo); static void tsdbStartTruncate(STsdbRepo *pRepo);
static void tsdbEndTruncate(STsdbRepo *pRepo, int eno); static void tsdbEndTruncate(STsdbRepo *pRepo, int eno);
static int tsdbTruncateMeta(STsdbRepo *pRepo); static int tsdbTruncateMeta(STsdbRepo *pRepo);
static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet); //static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet); //static int tsdbDeleteFSet(STruncateH *pTruncateH, SDFileSet *pSet);
static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo); static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo);
static void tsdbDestroyTruncateH(STruncateH *pTruncateH); static void tsdbDestroyTruncateH(STruncateH *pTruncateH);
static int tsdbInitTruncateTblArray(STruncateH *pTruncateH); static int tsdbInitTruncateTblArray(STruncateH *pTruncateH);
...@@ -65,7 +66,7 @@ static int tsdbDeleteFSetImpl(STruncateH *pTruncateH); ...@@ -65,7 +66,7 @@ static int tsdbDeleteFSetImpl(STruncateH *pTruncateH);
static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock); static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock);
static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf,
void **ppCBuf, void **ppExBuf); void **ppCBuf, void **ppExBuf);
static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type); //static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type);
enum { enum {
TSDB_NO_TRUNCATE, TSDB_NO_TRUNCATE,
...@@ -73,18 +74,19 @@ enum { ...@@ -73,18 +74,19 @@ enum {
TSDB_WAITING_TRUNCATE, TSDB_WAITING_TRUNCATE,
}; };
int tsdbTruncateTbl(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, TRUNCATE_TBL_REQ); } int tsdbTruncateTbl(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, CONTROL_REQ); }
int tsdbDeleteData(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, DELETE_TBL_REQ); } int tsdbDeleteData(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param, CONTROL_REQ); }
void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) { void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) {
tsdbTruncateImplCommon(pRepo, param, TRUNCATE_TBL_REQ); //tsdbTruncateImplCommon(pRepo, param, TRUNCATE_TBL_REQ);
return NULL; return NULL;
} }
void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param) { void *tsdbDeleteImpl(STsdbRepo *pRepo, void *param) {
tsdbTruncateImplCommon(pRepo, param, DELETE_TBL_REQ); //tsdbTruncateImplCommon(pRepo, param, DELETE_TBL_REQ);
return NULL; return NULL;
} }
static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { static void *tsdbTruncateImplCommon(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
ASSERT(param != NULL); ASSERT(param != NULL);
int32_t code = 0; int32_t code = 0;
...@@ -233,6 +235,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { ...@@ -233,6 +235,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
continue; continue;
} }
#endif #endif
if (truncateH.type == TRUNCATE_TBL_REQ) { if (truncateH.type == TRUNCATE_TBL_REQ) {
if (tsdbTruncateFSet(&truncateH, pSet) < 0) { if (tsdbTruncateFSet(&truncateH, pSet) < 0) {
tsdbDestroyTruncateH(&truncateH); tsdbDestroyTruncateH(&truncateH);
...@@ -248,6 +251,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) { ...@@ -248,6 +251,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param, TSDB_REQ_T type) {
} else { } else {
ASSERT(false); ASSERT(false);
} }
} }
tsdbDestroyTruncateH(&truncateH); tsdbDestroyTruncateH(&truncateH);
...@@ -739,4 +743,5 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa ...@@ -739,4 +743,5 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa
// } // }
// return 0; // return 0;
// } // }
\ No newline at end of file */
\ No newline at end of file
...@@ -79,7 +79,7 @@ static int insertData(SInsertInfo *pInfo) { ...@@ -79,7 +79,7 @@ static int insertData(SInsertInfo *pInfo) {
pMsg->length = htonl(pMsg->length); pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) { if (tsdbInsertData(pInfo->pRepo, pMsg, NULL, NULL) < 0) {
tfree(pMsg); tfree(pMsg);
return -1; return -1;
} }
......
...@@ -144,9 +144,9 @@ int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg) { ...@@ -144,9 +144,9 @@ int32_t vnodeTruncateTbl(STruncateTblMsg *pMsg) {
param->nSpan = 1; param->nSpan = 1;
param->span[0].skey = 1634701320001; param->span[0].skey = 1634701320001;
param->span[0].ekey = 1634701320001; param->span[0].ekey = 1634701320001;
if (tsdbTruncateTbl(((SVnodeObj *)pVnode)->tsdb, param) < 0) { //if (tsdbTruncateTbl(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param); // tfree(param);
} //}
vnodeRelease(pVnode); vnodeRelease(pVnode);
} else { } else {
vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname); vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname);
...@@ -167,9 +167,9 @@ int32_t vnodeDeleteData(SDeleteDataMsg *pMsg) { ...@@ -167,9 +167,9 @@ int32_t vnodeDeleteData(SDeleteDataMsg *pMsg) {
param->nSpan = 1; param->nSpan = 1;
param->span[0].skey = 1634701320001; param->span[0].skey = 1634701320001;
param->span[0].ekey = 1634701320001; param->span[0].ekey = 1634701320001;
if (tsdbDeleteData(((SVnodeObj *)pVnode)->tsdb, param) < 0) { //if (tsdbDeleteData(((SVnodeObj *)pVnode)->tsdb, param) < 0) {
tfree(param); // tfree(param);
} //}
vnodeRelease(pVnode); vnodeRelease(pVnode);
} else { } else {
vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname); vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname);
...@@ -481,7 +481,7 @@ void freeWaitThread(SVnodeObj* pVnode) { ...@@ -481,7 +481,7 @@ void freeWaitThread(SVnodeObj* pVnode) {
if(loop == LOOP_CNT) if(loop == LOOP_CNT)
tsem_post(pWaitThread->psem); tsem_post(pWaitThread->psem);
taosMsleep(50); taosMsleep(50);
loop -= 1; loop -= 1;
if(loop == 0 ) if(loop == 0 )
break; break;
} }
...@@ -671,7 +671,7 @@ void vnodeAddWait(void* vparam, pthread_t* pthread, sem_t* psem, void* param) { ...@@ -671,7 +671,7 @@ void vnodeAddWait(void* vparam, pthread_t* pthread, sem_t* psem, void* param) {
pWaitThread->psem = psem; pWaitThread->psem = psem;
pWaitThread->param = param; pWaitThread->param = param;
int32_t crc = crc32c_sf(0, (crc_stream)pWaitThread, sizeof(void* )); int32_t crc = crc32c_sf(0, (crc_stream)param, sizeof(void* ));
taosWriteQitem(pVnode->tqueue, crc, pWaitThread); taosWriteQitem(pVnode->tqueue, crc, pWaitThread);
} }
......
...@@ -163,13 +163,15 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR ...@@ -163,13 +163,15 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
// save insert result into item // save insert result into item
SShellSubmitRspMsg *pRsp = NULL; SShellSubmitRspMsg *pRsp = NULL;
tsem_t** ppsem = NULL;
if (pRet) { if (pRet) {
pRet->len = sizeof(SShellSubmitRspMsg); pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len); pRet->rsp = rpcMallocCont(pRet->len);
pRsp = pRet->rsp; pRsp = pRet->rsp;
ppsem = &pRet->psem;
} }
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp, pRet->psem_rsp) < 0) { if (tsdbInsertData(pVnode->tsdb, pCont, pRsp, ppsem) < 0) {
code = terrno; code = terrno;
} else { } else {
if (pRsp != NULL) atomic_fetch_add_64(&tsSubmitReqSucNum, 1); if (pRsp != NULL) atomic_fetch_add_64(&tsSubmitReqSucNum, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册