提交 e9bb654a 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

...@@ -2663,6 +2663,10 @@ typedef struct { ...@@ -2663,6 +2663,10 @@ typedef struct {
SEpSet epSet; SEpSet epSet;
} SVgEpSet; } SVgEpSet;
typedef struct {
int32_t padding;
} SRSmaExecMsg;
typedef struct { typedef struct {
int64_t suid; int64_t suid;
int8_t level; int8_t level;
......
...@@ -202,6 +202,7 @@ enum { ...@@ -202,6 +202,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
......
...@@ -34,6 +34,7 @@ extern bool gRaftDetailLog; ...@@ -34,6 +34,7 @@ extern bool gRaftDetailLog;
#define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1000 #define SYNC_MAX_RECV_TIME_RANGE_MS 1000
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
......
...@@ -615,6 +615,7 @@ int32_t* taosGetErrno(); ...@@ -615,6 +615,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154)
#define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
...@@ -338,6 +338,7 @@ SArray *vmGetMsgHandles() { ...@@ -338,6 +338,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_EXEC_RSMA, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -63,6 +63,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); ...@@ -63,6 +63,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray* list);
void *vnodeGetIdx(SVnode *pVnode); void *vnodeGetIdx(SVnode *pVnode);
void *vnodeGetIvtIdx(SVnode *pVnode); void *vnodeGetIvtIdx(SVnode *pVnode);
...@@ -95,6 +96,7 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHash ...@@ -95,6 +96,7 @@ int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHash
int32_t metaReadNext(SMetaReader *pReader); int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal); const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
typedef struct SMetaFltParam { typedef struct SMetaFltParam {
tb_uid_t suid; tb_uid_t suid;
......
...@@ -60,6 +60,7 @@ typedef struct { ...@@ -60,6 +60,7 @@ typedef struct {
#define SMA_ENV_LOCK(env) (&(env)->lock) #define SMA_ENV_LOCK(env) (&(env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type) #define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat) #define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv))
struct STSmaStat { struct STSmaStat {
int8_t state; // ETsdbSmaStat int8_t state; // ETsdbSmaStat
...@@ -89,12 +90,14 @@ struct SRSmaStat { ...@@ -89,12 +90,14 @@ struct SRSmaStat {
SSma *pSma; SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit int64_t commitAppliedVer; // vnode applied version for async commit
int64_t refId; // shared by fetch tasks int64_t refId; // shared by fetch tasks
volatile int64_t qBufSize; // queue buffer size
SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo)
int8_t triggerStat; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing int8_t commitStat; // 0 not in committing, 1 in committing
int8_t execStat; // 0 not in exec , 1 in exec
SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w)
SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; SHashObj *infoHash; // key: suid, value: SRSmaInfo
SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2
}; };
struct SSmaStat { struct SSmaStat {
...@@ -105,10 +108,10 @@ struct SSmaStat { ...@@ -105,10 +108,10 @@ struct SSmaStat {
T_REF_DECLARE() T_REF_DECLARE()
}; };
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) #define SMA_STAT_TSMA(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) #define SMA_STAT_RSMA(s) (&(s)->rsmaStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) #define RSMA_INFO_HASH(r) ((r)->infoHash)
#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash) #define RSMA_FETCH_HASH(r) ((r)->fetchHash)
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId) #define RSMA_REF_ID(r) ((r)->refId)
...@@ -117,6 +120,7 @@ struct SSmaStat { ...@@ -117,6 +120,7 @@ struct SSmaStat {
struct SRSmaInfoItem { struct SRSmaInfoItem {
int8_t level; int8_t level;
int8_t triggerStat; int8_t triggerStat;
uint16_t interval; // second
int32_t maxDelay; int32_t maxDelay;
tmr_h tmrId; tmr_h tmrId;
}; };
...@@ -125,14 +129,19 @@ struct SRSmaInfo { ...@@ -125,14 +129,19 @@ struct SRSmaInfo {
STSchema *pTSchema; STSchema *pTSchema;
int64_t suid; int64_t suid;
int64_t refId; // refId of SRSmaStat int64_t refId; // refId of SRSmaStat
int8_t delFlag; uint64_t delFlag : 1;
uint64_t lastReceived : 63; // second
T_REF_DECLARE() T_REF_DECLARE()
SRSmaInfoItem items[TSDB_RETENTION_L2]; SRSmaInfoItem items[TSDB_RETENTION_L2];
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable STaosQueue *queue; // buffer queue of SubmitReq
STaosQall *qall; // buffer qall of SubmitReq
void *iTaskInfo[TSDB_RETENTION_L2]; // immutable qTaskInfo_t
STaosQueue *iQueue; // immutable buffer queue of SubmitReq
STaosQall *iQall; // immutable buffer qall of SubmitReq
}; };
#define RSMA_INFO_HEAD_LEN 32 #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items)
#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) #define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1)
#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) #define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1)
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) #define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
...@@ -161,6 +170,12 @@ enum { ...@@ -161,6 +170,12 @@ enum {
RSMA_RESTORE_SYNC = 2, RSMA_RESTORE_SYNC = 2,
}; };
typedef enum {
RSMA_EXEC_OVERFLOW = 1, // triggered by queue buf overflow
RSMA_EXEC_TIMEOUT = 2, // triggered by timer
RSMA_EXEC_COMMIT = 3, // triggered by commit
} ERsmaExecType;
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
...@@ -228,12 +243,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { ...@@ -228,12 +243,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName);
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc); int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level);
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); int32_t tdProcessRSmaRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer);
......
...@@ -65,6 +65,7 @@ struct SVBufPool { ...@@ -65,6 +65,7 @@ struct SVBufPool {
SVBufPool* next; SVBufPool* next;
SVnode* pVnode; SVnode* pVnode;
volatile int32_t nRef; volatile int32_t nRef;
TdThreadSpinlock lock;
int64_t size; int64_t size;
uint8_t* ptr; uint8_t* ptr;
SVBufPoolNode* pTail; SVBufPoolNode* pTail;
......
...@@ -199,6 +199,7 @@ int32_t smaAsyncCommit(SSma* pSma); ...@@ -199,6 +199,7 @@ int32_t smaAsyncCommit(SSma* pSma);
int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t smaProcessFetch(SSma* pSma, void* pMsg); int32_t smaProcessFetch(SSma* pSma, void* pMsg);
int32_t smaProcessExec(SSma* pSma, void* pMsg);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
......
...@@ -127,6 +127,15 @@ _err: ...@@ -127,6 +127,15 @@ _err:
// return 0; // return 0;
// } // }
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
// query uid.idx
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) {
return false;
}
return true;
}
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
SMeta *pMeta = pReader->pMeta; SMeta *pMeta = pReader->pMeta;
int64_t version; int64_t version;
......
...@@ -83,8 +83,7 @@ int32_t smaBegin(SSma *pSma) { ...@@ -83,8 +83,7 @@ int32_t smaBegin(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
int8_t rsmaTriggerStat = int8_t rsmaTriggerStat =
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE); atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE);
...@@ -123,7 +122,7 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { ...@@ -123,7 +122,7 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
// step 1: set rsma stat paused // step 1: set rsma stat paused
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
...@@ -289,8 +288,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { ...@@ -289,8 +288,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv));
// cleanup outdated qtaskinfo files // cleanup outdated qtaskinfo files
tdCleanupQTaskInfoFiles(pSma, pRSmaStat); tdCleanupQTaskInfoFiles(pSma, pRSmaStat);
...@@ -299,10 +297,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { ...@@ -299,10 +297,9 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) {
} }
/** /**
* @brief Rsma async commit implementation * @brief Rsma async commit implementation(only do some necessary light weighted task)
* 1) set rsma stat TASK_TRIGGER_STAT_PAUSED * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED
* 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) * 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write)
* 3)
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
...@@ -314,7 +311,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -314,7 +311,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat);
// step 1: set rsma stat // step 1: set rsma stat
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
...@@ -336,28 +333,55 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ...@@ -336,28 +333,55 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
} }
} }
// step 3: swap rsmaInfoHash and iRsmaInfoHash /**
* @brief step 3: consume the SubmitReq in buffer
* 1) This is high cost task and should not put in asyncPreCommit originally.
* 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently.
*/
nLoops = 0;
smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1);
if (old == 0) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
}
}
smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) {
atomic_store_8(&pRSmaStat->execStat, 0);
return TSDB_CODE_FAILED;
}
// step 4: swap queue/qall and iQueue/iQall
// lock // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv)); taosWLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(RSMA_INFO_HASH(pRSmaStat)); ASSERT(RSMA_INFO_HASH(pRSmaStat));
ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat));
RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
RSMA_INFO_HASH(pRSmaStat) =
taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) { while (pIter) {
// unlock SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); TSWAP(pInfo->iQall, pInfo->qall);
smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); TSWAP(pInfo->iQueue, pInfo->queue);
return TSDB_CODE_FAILED; TSWAP(pInfo->iTaskInfo[0], pInfo->taskInfo[0]);
TSWAP(pInfo->iTaskInfo[1], pInfo->taskInfo[1]);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
} }
atomic_store_64(&pRSmaStat->qBufSize, 0);
atomic_store_8(&pRSmaStat->execStat, 0);
// unlock // unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
// step 4: others // step 5: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -375,17 +399,18 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { ...@@ -375,17 +399,18 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
// perform persist task for qTaskInfo // perform persist task for qTaskInfo operator
tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat)); if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/** /**
* @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty. * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsma infoHash not empty.
* *
* @param pSma * @param pSma
* @return int32_t * @return int32_t
...@@ -396,37 +421,25 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { ...@@ -396,37 +421,25 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); SArray *rsmaDeleted = NULL;
// step 1: merge rsmaInfoHash and iRsmaInfoHash // step 1: merge qTaskInfo and iQTaskInfo
// lock // lock
taosWLockLatch(SMA_ENV_LOCK(pEnv)); taosWLockLatch(SMA_ENV_LOCK(pEnv));
#if 0
if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL);
// just switch the hash pointer if rsmaInfoHash is empty
if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) {
SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat);
RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat);
RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash;
}
} else {
#endif
#if 1
void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL);
while (pIter) { while (pIter) {
tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
if (RSMA_INFO_IS_DEL(pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) {
int32_t refVal = T_REF_VAL_GET(pRSmaInfo); int32_t refVal = T_REF_VAL_GET(pRSmaInfo);
if (refVal == 0) { if (refVal == 0) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true); if (!rsmaDeleted) {
smaDebug( if ((rsmaDeleted = taosArrayInit(1, sizeof(tb_uid_t)))) {
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " taosArrayPush(rsmaDeleted, pSuid);
"table:%" PRIi64, }
SMA_VID(pSma), *pSuid); }
} else { } else {
smaDebug( smaDebug(
"vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for "
...@@ -434,27 +447,40 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { ...@@ -434,27 +447,40 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
SMA_VID(pSma), refVal, *pSuid); SMA_VID(pSma), refVal, *pSuid);
} }
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
continue; continue;
} }
taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), if (pRSmaInfo->taskInfo[0]) {
*pSuid); if (pRSmaInfo->iTaskInfo[0]) {
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0];
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
pRSmaInfo->iTaskInfo[0] = NULL;
}
} else { } else {
// free the resources TSWAP(pRSmaInfo->taskInfo[0], pRSmaInfo->iTaskInfo[0]);
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter;
tdFreeRSmaInfo(pSma, pRSmaInfo, false);
smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma),
*pSuid);
} }
pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter));
smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid);
pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter);
} }
#endif
// }
taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) {
RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; tb_uid_t *pSuid = taosArrayGet(rsmaDeleted, i);
void *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
tdFreeRSmaInfo(pSma, pRSmaInfo, true);
smaDebug(
"vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for "
"table:%" PRIi64,
SMA_VID(pSma), *pSuid);
}
taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t));
}
taosArrayDestroy(rsmaDeleted);
// TODO: remove suid in files?
// unlock // unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
......
...@@ -228,7 +228,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -228,7 +228,12 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
RSMA_INFO_HASH(pRSmaStat) = taosHashInit( RSMA_INFO_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_INFO_HASH(pRSmaStat)) { if (!RSMA_INFO_HASH(pRSmaStat)) {
taosMemoryFreeClear(*pSmaStat); return TSDB_CODE_FAILED;
}
RSMA_FETCH_HASH(pRSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (!RSMA_FETCH_HASH(pRSmaStat)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
...@@ -264,8 +269,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { ...@@ -264,8 +269,6 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED);
// step 2: destroy the rsma info and associated fetch tasks // step 2: destroy the rsma info and associated fetch tasks
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
#if 1
if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) {
void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL);
while (infoHash) { while (infoHash) {
...@@ -274,10 +277,12 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { ...@@ -274,10 +277,12 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash); infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash);
} }
} }
#endif
taosHashCleanup(RSMA_INFO_HASH(pStat)); taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 3: wait all triggered fetch tasks finished // step 3: destroy the rsma fetch hash
taosHashCleanup(RSMA_FETCH_HASH(pStat));
// step 4: wait all triggered fetch tasks finished
int32_t nLoops = 0; int32_t nLoops = 0;
while (1) { while (1) {
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
...@@ -293,7 +298,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { ...@@ -293,7 +298,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
} }
} }
// step 4: free pStat // step 5: free pStat
taosMemoryFreeClear(pStat); taosMemoryFreeClear(pStat);
} }
} }
...@@ -318,9 +323,9 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -318,9 +323,9 @@ void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) { if (pSmaStat) {
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); tdDestroyTSmaStat(SMA_STAT_TSMA(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); SRSmaStat *pRSmaStat = &pSmaStat->rsmaStat;
int32_t vid = SMA_VID(pRSmaStat->pSma); int32_t vid = SMA_VID(pRSmaStat->pSma);
int64_t refId = RSMA_REF_ID(pRSmaStat); int64_t refId = RSMA_REF_ID(pRSmaStat);
if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) { if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
......
...@@ -15,8 +15,10 @@ ...@@ -15,8 +15,10 @@
#include "sma.h" #include "sma.h"
#define RSMA_QTASKINFO_BUFSIZE 32768 #define RSMA_QTASKINFO_BUFSIZE (32768)
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
#define RSMA_QTASKEXEC_BUFSIZE (1048576)
#define RSMA_SUBMIT_BATCH_SIZE (1024)
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.inited = 0, .inited = 0,
...@@ -27,17 +29,20 @@ SSmaMgmt smaMgmt = { ...@@ -27,17 +29,20 @@ SSmaMgmt smaMgmt = {
#define TD_RSMAINFO_DEL_FILE "rsmainfo.del" #define TD_RSMAINFO_DEL_FILE "rsmainfo.del"
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
typedef struct SRSmaExecQItem SRSmaExecQItem;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx); int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
int8_t level); ERsmaExecType type, int8_t level);
static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid);
static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
static void tdFreeRSmaSubmitItems(SArray *pItems);
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr);
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, int8_t blkType); int64_t suid);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level); static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile);
...@@ -76,6 +81,11 @@ struct SRSmaQTaskInfoIter { ...@@ -76,6 +81,11 @@ struct SRSmaQTaskInfoIter {
int32_t nBufPos; int32_t nBufPos;
}; };
struct SRSmaExecQItem {
void *pRSmaInfo;
void *qall;
};
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) { void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) {
tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName);
} }
...@@ -139,6 +149,18 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { ...@@ -139,6 +149,18 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
if (isDeepFree) { if (isDeepFree) {
taosMemoryFreeClear(pInfo->pTSchema); taosMemoryFreeClear(pInfo->pTSchema);
} }
if (isDeepFree) {
if (pInfo->queue) taosCloseQueue(pInfo->queue);
if (pInfo->qall) taosFreeQall(pInfo->qall);
if (pInfo->iQueue) taosCloseQueue(pInfo->iQueue);
if (pInfo->iQall) taosFreeQall(pInfo->iQall);
pInfo->queue = NULL;
pInfo->qall = NULL;
pInfo->iQueue = NULL;
pInfo->iQall = NULL;
}
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
...@@ -179,7 +201,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) ...@@ -179,7 +201,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) { if (pRSmaInfo->taskInfo[i]) {
if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) { if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true)) < 0)) {
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr()); terrstr());
...@@ -351,6 +373,19 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con ...@@ -351,6 +373,19 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
goto _err; goto _err;
} }
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
if (!(pRSmaInfo->queue = taosOpenQueue())) {
goto _err;
}
if (!(pRSmaInfo->qall = taosAllocateQall())) {
goto _err;
}
if (!(pRSmaInfo->iQueue = taosOpenQueue())) {
goto _err;
}
if (!(pRSmaInfo->iQall = taosAllocateQall())) {
goto _err;
}
pRSmaInfo->suid = suid; pRSmaInfo->suid = suid;
pRSmaInfo->refId = RSMA_REF_ID(pStat); pRSmaInfo->refId = RSMA_REF_ID(pStat);
T_REF_INIT_VAL(pRSmaInfo, 1); T_REF_INIT_VAL(pRSmaInfo, 1);
...@@ -419,8 +454,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { ...@@ -419,8 +454,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat);
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid); SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid);
...@@ -528,6 +562,14 @@ void *tdUidStoreFree(STbUidStore *pStore) { ...@@ -528,6 +562,14 @@ void *tdUidStoreFree(STbUidStore *pStore) {
return NULL; return NULL;
} }
/**
* @brief The SubmitReq for rsma L2/L3 is inserted by tsdbInsertData method directly while not by WriteQ, as the queue
* would be freed when close Vnode, thus lock should be used if with race condition.
* @param pTsdb
* @param version
* @param pReq
* @return int32_t
*/
static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if (!pReq) { if (!pReq) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
...@@ -535,7 +577,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { ...@@ -535,7 +577,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
} }
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
// TODO: spin lock for race conditiond
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -569,17 +611,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { ...@@ -569,17 +611,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
return 0; return 0;
} }
static void tdDestroySDataBlockArray(SArray *pArray) {
// TODO
#if 0
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SSDataBlock *pDataBlock = taosArrayGet(pArray, i);
blockDestroyInner(pDataBlock);
}
#endif
taosArrayDestroy(pArray);
}
/** /**
* @brief retention of rsma1/rsma2 * @brief retention of rsma1/rsma2
* *
...@@ -605,7 +636,7 @@ _end: ...@@ -605,7 +636,7 @@ _end:
} }
static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema,
int64_t suid, int8_t blkType) { int64_t suid) {
SArray *pResList = taosArrayInit(1, POINTER_BYTES); SArray *pResList = taosArrayInit(1, POINTER_BYTES);
if (pResList == NULL) { if (pResList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -637,8 +668,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm ...@@ -637,8 +668,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
} else { } else {
smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level);
} }
#if 0
#if 1
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, pItem->level); snprintf(flag, 10, "level %" PRIi8, pItem->level);
blockDebugShowDataBlocks(pResList, flag); blockDebugShowDataBlocks(pResList, flag);
...@@ -665,7 +695,6 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm ...@@ -665,7 +695,6 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64, smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%" PRIi64,
SMA_VID(pSma), suid, pItem->level, output->info.version); SMA_VID(pSma), suid, pItem->level, output->info.version);
} }
} }
...@@ -677,34 +706,113 @@ _err: ...@@ -677,34 +706,113 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, /**
int8_t level) { * @brief Copy msg to rsmaQueueBuffer for batch process
*
* @param pSma
* @param pMsg
* @param inputType
* @param pInfo
* @param suid
* @return int32_t
*/
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo,
tb_uid_t suid) {
const SSubmitReq *pReq = (const SSubmitReq *)pMsg;
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM);
if (!qItem) {
return TSDB_CODE_FAILED;
}
memcpy(qItem, pMsg, pReq->header.contLen);
taosWriteQitem(pInfo->queue, qItem);
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen);
// smoothing consume
int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE;
if (n > 1) {
if (n > 10) {
n = 10;
}
taosMsleep(n << 4);
if (n > 2) {
smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
} else {
smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma),
taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t tdRsmaPrintSubmitReq(SSma *pSma, SSubmitReq *pReq) {
SSubmitMsgIter msgIter = {0};
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
if (tInitSubmitMsgIter(pReq, &msgIter) < 0) return -1;
while (true) {
SSubmitBlk *pBlock = NULL;
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
smaDebug("vgId:%d, numOfRows:%d, suid:%" PRIi64 ", uid:%" PRIi64 ", version:%" PRIi64 ", ts:%" PRIi64,
SMA_VID(pSma), msgIter.numOfRows, msgIter.suid, msgIter.uid, pReq->version, row->ts);
}
}
return 0;
}
/**
* @brief sync mode
*
* @param pSma
* @param pMsg
* @param msgSize
* @param inputType
* @param pInfo
* @param type
* @param level
* @return int32_t
*/
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
ERsmaExecType type, int8_t level) {
int32_t idx = level - 1; int32_t idx = level - 1;
if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); void *qTaskInfo = (type == RSMA_EXEC_COMMIT) ? RSMA_INFO_IQTASK(pInfo, idx) : RSMA_INFO_QTASK(pInfo, idx);
if (!qTaskInfo) {
smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level,
pInfo->suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!pInfo->pTSchema) { if (!pInfo->pTSchema) {
smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level,
RSMA_INFO_QTASK(pInfo, idx), suid); RSMA_INFO_QTASK(pInfo, idx), pInfo->suid);
if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT #if 0
for (int32_t i = 0; i < msgSize; ++i) {
SSubmitReq *pReq = *(SSubmitReq **)((char *)pMsg + i * sizeof(void *));
smaDebug("vgId:%d, [%d][%d] version %" PRIi64, SMA_VID(pSma), msgSize, i, pReq->version);
tdRsmaPrintSubmitReq(pSma, pReq);
}
#endif
if (qSetMultiStreamInput(qTaskInfo, pMsg, msgSize, inputType) < 0) {
smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx);
tdRSmaFetchAndSubmitResult(pSma, RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid);
STREAM_INPUT__DATA_SUBMIT);
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
if (smaMgmt.tmrHandle) {
taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -739,73 +847,110 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { ...@@ -739,73 +847,110 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL; return NULL;
} }
if (!pRSmaInfo->taskInfo[0]) {
if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return NULL;
}
}
tdRefRSmaInfo(pSma, pRSmaInfo); tdRefRSmaInfo(pSma, pRSmaInfo);
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
ASSERT(pRSmaInfo->suid == suid);
return pRSmaInfo; return pRSmaInfo;
} }
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat
return NULL; return NULL;
}
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
if (pInfo) {
tdUnRefRSmaInfo(pSma, pInfo);
} }
}
// clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat /**
SRSmaInfo *pCowRSmaInfo = NULL; * @brief async mode
// lock *
taosWLockLatch(SMA_ENV_LOCK(pEnv)); * @param pSma
if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock * @param pMsg
void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); * @param inputType
if (iRSmaInfo) { * @param suid
SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; * @return int32_t
if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) { */
if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) { static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) {
// unlock SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); if (!pRSmaInfo) {
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return NULL; return TSDB_CODE_SUCCESS;
} }
smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid);
if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { if (inputType == STREAM_INPUT__DATA_SUBMIT) {
// unlock if (tdExecuteRSmaImplAsync(pSma, pMsg, inputType, pRSmaInfo, suid) < 0) {
taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); return TSDB_CODE_FAILED;
return NULL;
} }
if (smaMgmt.tmrHandle) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, 0);
if (pItem->level > 0) {
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
}
pItem = RSMA_INFO_ITEM(pRSmaInfo, 1);
if (pItem->level > 0) {
atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
} }
} }
} else { } else {
pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo; ASSERT(0);
ASSERT(!pCowRSmaInfo);
} }
if (pCowRSmaInfo) { tdReleaseRSmaInfo(pSma, pRSmaInfo);
tdRefRSmaInfo(pSma, pCowRSmaInfo); return TSDB_CODE_SUCCESS;
}
// unlock
taosWUnLockLatch(SMA_ENV_LOCK(pEnv));
return pCowRSmaInfo;
} }
static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { static int32_t tdRSmaExecCheck(SSma *pSma) {
if (pInfo) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
tdUnRefRSmaInfo(pSma, pInfo); int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize);
if (bufSize < RSMA_QTASKEXEC_BUFSIZE) {
smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize,
RSMA_QTASKEXEC_BUFSIZE);
return TSDB_CODE_SUCCESS;
} }
}
static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) {
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma),
if (!pRSmaInfo) { bufSize);
smaError("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (inputType == STREAM_INPUT__DATA_SUBMIT) { smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize);
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1);
tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2); SRSmaExecMsg fetchMsg;
int32_t contLen = sizeof(SMsgHead);
void *pBuf = rpcMallocCont(0 + contLen);
((SMsgHead *)pBuf)->vgId = SMA_VID(pSma);
((SMsgHead *)pBuf)->contLen = sizeof(SMsgHead);
SRpcMsg rpcMsg = {
.code = 0,
.msgType = TDMT_VND_EXEC_RSMA,
.pCont = pBuf,
.contLen = contLen,
};
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
smaError("vgId:%d, failed to put rsma exec msg into query-queue since %s", SMA_VID(pSma), terrstr());
goto _err;
} }
tdReleaseRSmaInfo(pSma, pRSmaInfo); smaDebug("vgId:%d, success to put rsma fetch msg into query-queue", SMA_VID(pSma));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err:
atomic_store_8(&pRSmaStat->execStat, 0);
return TSDB_CODE_FAILED;
} }
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
...@@ -826,45 +971,62 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -826,45 +971,62 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
tdFetchSubmitReqSuids(pMsg, &uidStore); tdFetchSubmitReqSuids(pMsg, &uidStore);
if (uidStore.suid != 0) { if (uidStore.suid != 0) {
tdExecuteRSma(pSma, pMsg, inputType, uidStore.suid); tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid);
void *pIter = taosHashIterate(uidStore.uidHash, NULL); void *pIter = taosHashIterate(uidStore.uidHash, NULL);
while (pIter) { while (pIter) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
tdExecuteRSma(pSma, pMsg, inputType, *pTbSuid); tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid);
pIter = taosHashIterate(uidStore.uidHash, pIter); pIter = taosHashIterate(uidStore.uidHash, pIter);
} }
tdUidStoreDestory(&uidStore); tdUidStoreDestory(&uidStore);
tdRSmaExecCheck(pSma);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* @brief retrieve rsma meta and init
*
* @param pSma
* @param nTables number of tables of rsma
* @return int32_t
*/
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
SArray *suidList = NULL;
STbUidStore uidStore = {0};
SMetaReader mr = {0};
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(suidList); goto _err;
}
if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) {
smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr()); smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED; goto _err;
} }
int64_t arrSize = taosArrayGetSize(suidList); int64_t arrSize = taosArrayGetSize(suidList);
if (arrSize == 0) {
if (nTables) { if (nTables) {
*nTables = arrSize; *nTables = 0;
} }
if (arrSize == 0) {
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode)); smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SMetaReader mr = {0}; int64_t nRsmaTables = 0;
metaReaderInit(&mr, SMA_META(pSma), 0); metaReaderInit(&mr, SMA_META(pSma), 0);
if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
goto _err;
}
for (int64_t i = 0; i < arrSize; ++i) { for (int64_t i = 0; i < arrSize; ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid); smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid);
...@@ -877,6 +1039,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { ...@@ -877,6 +1039,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
ASSERT(mr.me.type == TSDB_SUPER_TABLE); ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == suid); ASSERT(mr.me.uid == suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) { if (TABLE_IS_ROLLUP(mr.me.flags)) {
++nRsmaTables;
SRSmaParam *param = &mr.me.stbEntry.rsmaParam; SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < TSDB_RETENTION_L2; ++i) { for (int i = 0; i < TSDB_RETENTION_L2; ++i) {
smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64 smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64
...@@ -887,17 +1050,40 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { ...@@ -887,17 +1050,40 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err; goto _err;
} }
// reload all ctbUids for suid
uidStore.suid = suid;
if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) {
smaError("vgId:%d, rsma restore, get ctb idlist failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
if (tdUpdateTbUidList(pVnode->pSma, &uidStore) < 0) {
smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr());
goto _err;
}
taosArrayClear(uidStore.tbUids);
smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid); smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid);
} }
} }
metaReaderClear(&mr); metaReaderClear(&mr);
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
tdUidStoreDestory(&uidStore);
if (nTables) {
*nTables = nRsmaTables;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
metaReaderClear(&mr); metaReaderClear(&mr);
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
tdUidStoreDestory(&uidStore);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1230,7 +1416,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { ...@@ -1230,7 +1416,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
} }
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i);
if (!taskInfo) { if (!taskInfo) {
smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1);
continue; continue;
...@@ -1388,7 +1574,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -1388,7 +1574,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
} }
_end: _end:
// taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
} }
...@@ -1400,7 +1586,7 @@ _end: ...@@ -1400,7 +1586,7 @@ _end:
* @param level * @param level
* @return int32_t * @return int32_t
*/ */
int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level};
int32_t ret = 0; int32_t ret = 0;
int32_t contLen = 0; int32_t contLen = 0;
...@@ -1427,7 +1613,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { ...@@ -1427,7 +1613,7 @@ int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) {
.code = 0, .code = 0,
.msgType = TDMT_VND_FETCH_RSMA, .msgType = TDMT_VND_FETCH_RSMA,
.pCont = pBuf, .pCont = pBuf,
.contLen = contLen, .contLen = contLen + sizeof(SMsgHead),
}; };
if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) {
...@@ -1456,9 +1642,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { ...@@ -1456,9 +1642,7 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
SRSmaFetchMsg req = {0}; SRSmaFetchMsg req = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
void *pBuf = NULL; void *pBuf = NULL;
SRSmaInfo *pInfo = NULL; SRSmaStat *pRSmaStat = NULL;
SRSmaInfoItem *pItem = NULL;
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
goto _err; goto _err;
...@@ -1472,37 +1656,265 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) { ...@@ -1472,37 +1656,265 @@ int32_t smaProcessFetch(SSma *pSma, void *pMsg) {
goto _err; goto _err;
} }
pInfo = tdAcquireRSmaInfoBySuid(pSma, req.suid); pRSmaStat = SMA_RSMA_STAT(pSma);
if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) {
SArray *pSubmitArr = NULL;
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pRSmaStat->execStat, 0);
goto _err;
}
tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr);
atomic_store_8(&pRSmaStat->execStat, 0);
taosArrayDestroy(pSubmitArr);
} else {
int8_t level = req.level;
int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid));
if (val) {
level |= (*val);
}
ASSERT(level >= 1 && level <= 3);
taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level));
}
tDecoderClear(&decoder);
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
req.level);
return TSDB_CODE_SUCCESS;
_err:
tDecoderClear(&decoder);
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr());
return TSDB_CODE_FAILED;
}
static void tdFreeRSmaSubmitItems(SArray *pItems) {
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
taosFreeQitem(*(void **)taosArrayGet(pItems, i));
}
}
static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) {
SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
if (!pInfo) { if (!pInfo) {
if (terrno == TSDB_CODE_SUCCESS) { return TSDB_CODE_SUCCESS;
terrno = TSDB_CODE_RSMA_EMPTY_INFO;
} }
smaWarn("vgId:%d, failed to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8 " since %s", SMA_VID(pSma),
req.suid, req.level, terrstr()); // step 1: consume submit req
int64_t qMemSize = 0;
if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize);
taosArrayClear(pSubmitArr);
while (1) {
void *msg = NULL;
taosGetQitem(pInfo->qall, (void **)&msg);
if (msg) {
if (taosArrayPush(pSubmitArr, &msg) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err; goto _err;
} }
} else {
break;
}
}
pItem = RSMA_INFO_ITEM(pInfo, req.level - 1); int32_t size = taosArrayGetSize(pSubmitArr);
if (size > 0) {
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) <
0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
}
tdFreeRSmaSubmitItems(pSubmitArr);
}
}
// step 2: fetch rsma result
SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL};
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, req.level - 1); for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (level & i) {
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
if (!taskInfo) {
continue;
}
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err; goto _err;
} }
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid, STREAM_INPUT__DATA_BLOCK) < 0) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) {
tdCleanupStreamInputDataBlock(taskInfo);
goto _err; goto _err;
} }
tdCleanupStreamInputDataBlock(taskInfo); tdCleanupStreamInputDataBlock(taskInfo);
}
}
_end:
tdReleaseRSmaInfo(pSma, pInfo); tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder);
smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid,
req.level);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
tdReleaseRSmaInfo(pSma, pInfo); tdReleaseRSmaInfo(pSma, pInfo);
tDecoderClear(&decoder); return TSDB_CODE_FAILED;
smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); }
/**
* @brief
*
* @param pSma
* @param type
* @return int32_t
*/
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SHashObj *infoHash = NULL;
SArray *pSubmitQArr = NULL;
SArray *pSubmitArr = NULL;
bool isFetchAll = false;
if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) {
terrno = TSDB_CODE_RSMA_INVALID_STAT;
goto _err;
}
if (type == RSMA_EXEC_OVERFLOW) {
taosRLockLatch(SMA_ENV_LOCK(pEnv));
if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) {
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
return TSDB_CODE_SUCCESS;
}
taosRUnLockLatch(SMA_ENV_LOCK(pEnv));
}
if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// step 1: rsma exec - consume data in buffer queue for all suids
SRSmaExecQItem qItem = {0};
void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock
if (type == RSMA_EXEC_OVERFLOW) {
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->queue)) {
taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock
qItem.qall = &pInfo->qall;
qItem.pRSmaInfo = pIter;
taosArrayPush(pSubmitQArr, &qItem);
}
ASSERT(taosQueueItemSize(pInfo->queue) == 0);
pIter = taosHashIterate(infoHash, pIter);
}
} else if (type == RSMA_EXEC_COMMIT) {
while (pIter) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pIter;
if (taosQueueItemSize(pInfo->iQueue)) {
taosReadAllQitems(pInfo->iQueue, pInfo->iQall);
qItem.qall = &pInfo->iQall;
qItem.pRSmaInfo = pIter;
taosArrayPush(pSubmitQArr, &qItem);
}
ASSERT(taosQueueItemSize(pInfo->iQueue) == 0);
pIter = taosHashIterate(infoHash, pIter);
}
} else {
ASSERT(0);
}
atomic_store_64(&pRSmaStat->qBufSize, 0);
int32_t qSize = taosArrayGetSize(pSubmitQArr);
for (int32_t i = 0; i < qSize; ++i) {
SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i);
while (1) {
void *msg = NULL;
taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg);
if (msg) {
if (taosArrayPush(pSubmitArr, &msg) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
} else {
break;
}
}
int32_t size = taosArrayGetSize(pSubmitArr);
if (size > 0) {
SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo;
for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
}
}
tdFreeRSmaSubmitItems(pSubmitArr);
taosArrayClear(pSubmitArr);
}
}
// step 2: rsma fetch - consume data in buffer queue for suids triggered by timer
if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) {
goto _end;
}
pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL);
if (pIter) {
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) {
tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr);
}
}
_end:
taosArrayDestroy(pSubmitArr);
taosArrayDestroy(pSubmitQArr);
return TSDB_CODE_SUCCESS;
_err:
taosArrayDestroy(pSubmitArr);
taosArrayDestroy(pSubmitQArr);
return TSDB_CODE_FAILED;
}
/**
* @brief exec rsma level 1data, fetch result of level 2/3 and submit
*
* @param pSma
* @param pMsg
* @return int32_t
*/
int32_t smaProcessExec(SSma *pSma, void *pMsg) {
SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg;
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) {
terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP;
goto _err;
}
smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) {
goto _err;
}
atomic_store_8(&pRSmaStat->execStat, 0);
smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
return TSDB_CODE_SUCCESS;
_err:
atomic_store_8(&pRSmaStat->execStat, 0);
smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(),
terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -139,7 +139,6 @@ static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf) ...@@ -139,7 +139,6 @@ static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf)
smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size); smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size);
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf);
pHdr->type = SNAP_DATA_QTASK; pHdr->type = SNAP_DATA_QTASK;
pHdr->size = size; pHdr->size = size;
...@@ -279,7 +278,8 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit ...@@ -279,7 +278,8 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit
TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (!qTaskF) { if (!qTaskF) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, tstrerror(code)); smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName,
tstrerror(code));
goto _err; goto _err;
} }
qWriter->pWriteH = qTaskF; qWriter->pWriteH = qTaskF;
...@@ -309,7 +309,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { ...@@ -309,7 +309,7 @@ int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
if (rollback) { if (rollback) {
// TODO: rsma1/rsma2 // TODO: rsma1/rsma2
// qtaskinfo // qtaskinfo
if(pWriter->pQTaskFWriter) { if (pWriter->pQTaskFWriter) {
taosRemoveFile(pWriter->pQTaskFWriter->fname); taosRemoveFile(pWriter->pQTaskFWriter->fname);
} }
} else { } else {
......
...@@ -175,7 +175,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -175,7 +175,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
tdRefSmaStat(pSma, pStat); tdRefSmaStat(pSma, pStat);
pTsmaStat = SMA_TSMA_STAT(pStat); pTsmaStat = SMA_STAT_TSMA(pStat);
if (!pTsmaStat->pTSma) { if (!pTsmaStat->pTSma) {
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
......
...@@ -350,49 +350,45 @@ _err: ...@@ -350,49 +350,45 @@ _err:
} }
/** /**
* @brief pTSchema is shared * @brief Clone qTaskInfo of SRSmaInfo
* *
* @param pSma * @param pSma
* @param pDest * @param pInfo
* @param pSrc
* @return int32_t * @return int32_t
*/ */
int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) { int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
SVnode *pVnode = pSma->pVnode;
SRSmaParam *param = NULL; SRSmaParam *param = NULL;
if (!pSrc) { if (!pInfo) {
*pDest = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0); metaReaderInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid); smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) { if (metaGetTableEntryByUid(&mr, pInfo->suid) < 0) {
smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid,
terrstr()); terrstr());
goto _err; goto _err;
} }
ASSERT(mr.me.type == TSDB_SUPER_TABLE); ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == pSrc->suid); ASSERT(mr.me.uid == pInfo->suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) { if (TABLE_IS_ROLLUP(mr.me.flags)) {
param = &mr.me.stbEntry.rsmaParam; param = &mr.me.stbEntry.rsmaParam;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) { if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) {
goto _err; goto _err;
} }
} }
smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid); smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid);
} else {
terrno = TSDB_CODE_RSMA_INVALID_SCHEMA;
goto _err;
} }
metaReaderClear(&mr); metaReaderClear(&mr);
*pDest = pSrc; // pointer copy
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
*pDest = NULL;
metaReaderClear(&mr); metaReaderClear(&mr);
smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr()); smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
\ No newline at end of file
...@@ -3135,32 +3135,6 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { ...@@ -3135,32 +3135,6 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
/**
* @brief Get all suids since suid
*
* @param pMeta
* @param suid return all suids in one vnode if suid is 0
* @param list
* @return int32_t
*/
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
if (!pCur) {
return TSDB_CODE_FAILED;
}
while (1) {
tb_uid_t id = metaStbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseStbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
// ====================================== EXPOSED APIs ====================================== // ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader,
......
...@@ -78,7 +78,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) { ...@@ -78,7 +78,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode; SVBufPoolNode *pNode;
void *p; void *p;
taosThreadSpinLock(&pPool->lock);
if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { if (pPool->node.size >= pPool->ptr - pPool->node.data + size) {
// allocate from the anchor node // allocate from the anchor node
p = pPool->ptr; p = pPool->ptr;
...@@ -89,6 +89,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { ...@@ -89,6 +89,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pNode = taosMemoryMalloc(sizeof(*pNode) + size); pNode = taosMemoryMalloc(sizeof(*pNode) + size);
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosThreadSpinUnlock(&pPool->lock);
return NULL; return NULL;
} }
...@@ -101,7 +102,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { ...@@ -101,7 +102,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
pPool->size = pPool->size + sizeof(*pNode) + size; pPool->size = pPool->size + sizeof(*pNode) + size;
} }
taosThreadSpinUnlock(&pPool->lock);
return p; return p;
} }
...@@ -129,6 +130,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) ...@@ -129,6 +130,12 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
return -1; return -1;
} }
if (taosThreadSpinInit(&pPool->lock, 0) != 0) {
taosMemoryFree(pPool);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pPool->next = NULL; pPool->next = NULL;
pPool->pVnode = pVnode; pPool->pVnode = pVnode;
pPool->nRef = 0; pPool->nRef = 0;
...@@ -145,6 +152,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) ...@@ -145,6 +152,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
static int vnodeBufPoolDestroy(SVBufPool *pPool) { static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool); vnodeBufPoolReset(pPool);
taosThreadSpinDestroy(&pPool->lock);
taosMemoryFree(pPool); taosMemoryFree(pPool);
return 0; return 0;
} }
......
...@@ -220,6 +220,10 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -220,6 +220,10 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied); pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit(pVnode->pSma);
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
...@@ -237,10 +241,6 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -237,10 +241,6 @@ int vnodeCommit(SVnode *pVnode) {
} }
walBeginSnapshot(pVnode->pWal, pVnode->state.applied); walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
// preCommit
// smaSyncPreCommit(pVnode->pSma);
smaAsyncPreCommit(pVnode->pSma);
// commit each sub-system // commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) { if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0); ASSERT(0);
......
...@@ -424,6 +424,25 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) { ...@@ -424,6 +424,25 @@ int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeGetStbIdList(SVnode* pVnode, int64_t suid, SArray* list) {
SMStbCursor* pCur = metaOpenStbCursor(pVnode->pMeta, suid);
if (!pCur) {
return TSDB_CODE_FAILED;
}
while (1) {
tb_uid_t id = metaStbCursorNext(pCur);
if (id == 0) {
break;
}
taosArrayPush(list, &id);
}
metaCloseStbCursor(pCur);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid); SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid);
if (!pCur) { if (!pCur) {
......
...@@ -303,6 +303,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -303,6 +303,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_FETCH_RSMA: case TDMT_VND_FETCH_RSMA:
return smaProcessFetch(pVnode->pSma, pMsg); return smaProcessFetch(pVnode->pSma, pMsg);
case TDMT_VND_EXEC_RSMA:
return smaProcessExec(pVnode->pSma, pMsg);
default: default:
vError("unknown msg type:%d in query queue", pMsg->msgType); vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
...@@ -530,7 +532,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR ...@@ -530,7 +532,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
} }
tqUpdateTbUidList(pVnode->pTq, tbUids, true); tqUpdateTbUidList(pVnode->pTq, tbUids, true);
tdUpdateTbUidList(pVnode->pSma, pStore); if (tdUpdateTbUidList(pVnode->pSma, pStore) < 0) {
goto _exit;
}
tdUidStoreFree(pStore); tdUidStoreFree(pStore);
// prepare rsp // prepare rsp
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#define TDENGINE_TSIMPLEHASH_H #define TDENGINE_TSIMPLEHASH_H
#include "tarray.h" #include "tarray.h"
#include "tlockfree.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -27,6 +26,10 @@ typedef uint32_t (*_hash_fn_t)(const char *, uint32_t); ...@@ -27,6 +26,10 @@ typedef uint32_t (*_hash_fn_t)(const char *, uint32_t);
typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len); typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len);
typedef void (*_hash_free_fn_t)(void *); typedef void (*_hash_free_fn_t)(void *);
/**
* @brief single thread hash
*
*/
typedef struct SSHashObj SSHashObj; typedef struct SSHashObj SSHashObj;
/** /**
...@@ -36,7 +39,7 @@ typedef struct SSHashObj SSHashObj; ...@@ -36,7 +39,7 @@ typedef struct SSHashObj SSHashObj;
* @param fn hash function to generate the hash value * @param fn hash function to generate the hash value
* @return * @return
*/ */
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen); SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn);
/** /**
* return the size of hash table * return the size of hash table
...@@ -48,22 +51,26 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj); ...@@ -48,22 +51,26 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj);
int32_t tSimpleHashPrint(const SSHashObj *pHashObj); int32_t tSimpleHashPrint(const SSHashObj *pHashObj);
/** /**
* put element into hash table, if the element with the same key exists, update it * @brief put element into hash table, if the element with the same key exists, update it
*
* @param pHashObj * @param pHashObj
* @param key * @param key
* @param keyLen
* @param data * @param data
* @return * @param dataLen
* @return int32_t
*/ */
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data); int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen);
/** /**
* return the payload data with the specified key * return the payload data with the specified key
* *
* @param pHashObj * @param pHashObj
* @param key * @param key
* @param keyLen
* @return * @return
*/ */
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key); void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen);
/** /**
* remove item with the specified key * remove item with the specified key
...@@ -71,7 +78,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key); ...@@ -71,7 +78,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key);
* @param key * @param key
* @param keyLen * @param keyLen
*/ */
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key); int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen);
/** /**
* Clear the hash table. * Clear the hash table.
...@@ -98,7 +105,7 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj); ...@@ -98,7 +105,7 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
* @param keyLen * @param keyLen
* @return * @return
*/ */
void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen); void *tSimpleHashGetKey(void *data, size_t* keyLen);
/** /**
* Create the hash table iterator * Create the hash table iterator
...@@ -109,6 +116,17 @@ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen); ...@@ -109,6 +116,17 @@ void *tSimpleHashGetKey(const SSHashObj* pHashObj, void *data, size_t* keyLen);
*/ */
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter); void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter);
/**
* Create the hash table iterator
*
* @param pHashObj
* @param data
* @param key
* @param iter
* @return void*
*/
void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -533,8 +533,10 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -533,8 +533,10 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
vnodeGetCtbIdList(pVnode, pScanNode->suid, res); vnodeGetCtbIdList(pVnode, pScanNode->suid, res);
} }
} else { // Create one table group. } else { // Create one table group.
if(metaIsTableExist(metaHandle, tableUid)){
taosArrayPush(res, &tableUid); taosArrayPush(res, &tableUid);
} }
}
if (pTagCond) { if (pTagCond) {
SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond); SColumnInfoData* pColInfoData = getColInfoResult(metaHandle, pListInfo->suid, res, pTagCond);
...@@ -599,7 +601,10 @@ size_t getTableTagsBufLen(const SNodeList* pGroups) { ...@@ -599,7 +601,10 @@ size_t getTableTagsBufLen(const SNodeList* pGroups) {
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId) { int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0); metaReaderInit(&mr, pMeta, 0);
metaGetTableEntryByUid(&mr, uid); if(metaGetTableEntryByUid(&mr, uid) != 0){ // table not exist
metaReaderClear(&mr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
SNodeList* groupNew = nodesCloneList(pGroupNode); SNodeList* groupNew = nodesCloneList(pGroupNode);
......
...@@ -55,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -55,7 +55,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
ASSERT(numOfBlocks > 1); // ASSERT(numOfBlocks > 1);
for (int32_t i = 0; i < numOfBlocks; i++) { for (int32_t i = 0; i < numOfBlocks; i++) {
SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
taosArrayPush(pInfo->pBlockLists, &pReq); taosArrayPush(pInfo->pBlockLists, &pReq);
......
...@@ -2154,7 +2154,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo ...@@ -2154,7 +2154,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo
static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) { static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) {
int32_t rows = pResBlock->info.rows; int32_t rows = pResBlock->info.rows;
blockDataEnsureCapacity(pResBlock, rows + 1);
// todo set the correct primary timestamp column // todo set the correct primary timestamp column
// output the result // output the result
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
*/ */
#include "tsimplehash.h" #include "tsimplehash.h"
#include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h"
#define SHASH_DEFAULT_LOAD_FACTOR 0.75 #define SHASH_DEFAULT_LOAD_FACTOR 0.75
#define HASH_MAX_CAPACITY (1024 * 1024 * 16) #define HASH_MAX_CAPACITY (1024 * 1024 * 16)
...@@ -31,10 +31,14 @@ ...@@ -31,10 +31,14 @@
taosMemoryFreeClear(_n); \ taosMemoryFreeClear(_n); \
} while (0); } while (0);
#pragma pack(push, 4)
typedef struct SHNode { typedef struct SHNode {
struct SHNode *next; struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
char data[]; char data[];
} SHNode; } SHNode;
#pragma pack(pop)
struct SSHashObj { struct SSHashObj {
SHNode **hashList; SHNode **hashList;
...@@ -42,8 +46,6 @@ struct SSHashObj { ...@@ -42,8 +46,6 @@ struct SSHashObj {
int64_t size; // number of elements in hash table int64_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function _hash_fn_t hashFp; // hash function
_equal_fn_t equalFp; // equal function _equal_fn_t equalFp; // equal function
int32_t keyLen;
int32_t dataLen;
}; };
static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
...@@ -54,7 +56,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { ...@@ -54,7 +56,7 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
return i; return i;
} }
SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t dataLen) { SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) {
ASSERT(fn != NULL); ASSERT(fn != NULL);
if (capacity == 0) { if (capacity == 0) {
...@@ -74,8 +76,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t ...@@ -74,8 +76,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn, size_t keyLen, size_t
pHashObj->hashFp = fn; pHashObj->hashFp = fn;
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->keyLen = keyLen;
pHashObj->dataLen = dataLen;
pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
if (!pHashObj->hashList) { if (!pHashObj->hashList) {
...@@ -93,40 +93,41 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) { ...@@ -93,40 +93,41 @@ int32_t tSimpleHashGetSize(const SSHashObj *pHashObj) {
return (int32_t)atomic_load_64((int64_t *)&pHashObj->size); return (int32_t)atomic_load_64((int64_t *)&pHashObj->size);
} }
static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { static SHNode *doCreateHashNode(const void *key, size_t keyLen, const void *data, size_t dataLen, uint32_t hashVal) {
SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dsize); SHNode *pNewNode = taosMemoryMalloc(sizeof(SHNode) + keyLen + dataLen);
if (!pNewNode) { if (!pNewNode) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pNewNode->keyLen = keyLen;
pNewNode->dataLen = dataLen;
pNewNode->next = NULL; pNewNode->next = NULL;
memcpy(GET_SHASH_NODE_DATA(pNewNode), pData, dsize); memcpy(GET_SHASH_NODE_DATA(pNewNode), data, dataLen);
memcpy(GET_SHASH_NODE_KEY(pNewNode, dsize), key, keyLen); memcpy(GET_SHASH_NODE_KEY(pNewNode, dataLen), key, keyLen);
return pNewNode; return pNewNode;
} }
static void taosHashTableResize(SSHashObj *pHashObj) { static void tSimpleHashTableResize(SSHashObj *pHashObj) {
if (!SHASH_NEED_RESIZE(pHashObj)) { if (!SHASH_NEED_RESIZE(pHashObj)) {
return; return;
} }
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u); int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) { if (newCapacity > HASH_MAX_CAPACITY) {
// uDebug("current capacity:%zu, maximum capacity:%d, no resize applied due to limitation is reached", uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
// pHashObj->capacity, HASH_MAX_CAPACITY); pHashObj->capacity, HASH_MAX_CAPACITY);
return; return;
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity); void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
if (!pNewEntryList) { if (!pNewEntryList) {
// qWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity); uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return; return;
} }
size_t inc = newCapacity - pHashObj->capacity; size_t inc = newCapacity - pHashObj->capacity;
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc); memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *));
pHashObj->hashList = pNewEntryList; pHashObj->hashList = pNewEntryList;
pHashObj->capacity = newCapacity; pHashObj->capacity = newCapacity;
...@@ -141,8 +142,8 @@ static void taosHashTableResize(SSHashObj *pHashObj) { ...@@ -141,8 +142,8 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
SHNode *pPrev = NULL; SHNode *pPrev = NULL;
while (pNode != NULL) { while (pNode != NULL) {
void *key = GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen); void *key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pNode->keyLen);
int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity); int32_t newIdx = HASH_INDEX(hashVal, pHashObj->capacity);
pNext = pNode->next; pNext = pNode->next;
...@@ -170,23 +171,23 @@ static void taosHashTableResize(SSHashObj *pHashObj) { ...@@ -170,23 +171,23 @@ static void taosHashTableResize(SSHashObj *pHashObj) {
// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); // ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
} }
int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t dataLen) {
if (!pHashObj || !key) { if (!pHashObj || !key) {
return -1; return -1;
} }
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
// need the resize process, write lock applied // need the resize process, write lock applied
if (SHASH_NEED_RESIZE(pHashObj)) { if (SHASH_NEED_RESIZE(pHashObj)) {
taosHashTableResize(pHashObj); tSimpleHashTableResize(pHashObj);
} }
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot]; SHNode *pNode = pHashObj->hashList[slot];
if (!pNode) { if (!pNode) {
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
if (!pNewNode) { if (!pNewNode) {
return -1; return -1;
} }
...@@ -197,14 +198,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { ...@@ -197,14 +198,14 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
} }
while (pNode) { while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
break; break;
} }
pNode = pNode->next; pNode = pNode->next;
} }
if (!pNode) { if (!pNode) {
SHNode *pNewNode = doCreateHashNode(key, pHashObj->keyLen, data, pHashObj->dataLen, hashVal); SHNode *pNewNode = doCreateHashNode(key, keyLen, data, dataLen, hashVal);
if (!pNewNode) { if (!pNewNode) {
return -1; return -1;
} }
...@@ -212,16 +213,16 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) { ...@@ -212,16 +213,16 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, const void *data) {
pHashObj->hashList[slot] = pNewNode; pHashObj->hashList[slot] = pNewNode;
atomic_add_fetch_64(&pHashObj->size, 1); atomic_add_fetch_64(&pHashObj->size, 1);
} else { // update data } else { // update data
memcpy(GET_SHASH_NODE_DATA(pNode), data, pHashObj->dataLen); memcpy(GET_SHASH_NODE_DATA(pNode), data, dataLen);
} }
return 0; return 0;
} }
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, int32_t index) { static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) {
SHNode *pNode = pHashObj->hashList[index]; SHNode *pNode = pHashObj->hashList[index];
while (pNode) { while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
break; break;
} }
...@@ -233,12 +234,12 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void ...@@ -233,12 +234,12 @@ static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void
static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; } static FORCE_INLINE bool taosHashTableEmpty(const SSHashObj *pHashObj) { return tSimpleHashGetSize(pHashObj) == 0; }
void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { void *tSimpleHashGet(SSHashObj *pHashObj, const void *key, size_t keyLen) {
if (!pHashObj || taosHashTableEmpty(pHashObj) || !key) { if (!pHashObj || taosHashTableEmpty(pHashObj) || !key) {
return NULL; return NULL;
} }
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot]; SHNode *pNode = pHashObj->hashList[slot];
...@@ -247,7 +248,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { ...@@ -247,7 +248,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
} }
char *data = NULL; char *data = NULL;
pNode = doSearchInEntryList(pHashObj, key, slot); pNode = doSearchInEntryList(pHashObj, key, keyLen, slot);
if (pNode != NULL) { if (pNode != NULL) {
data = GET_SHASH_NODE_DATA(pNode); data = GET_SHASH_NODE_DATA(pNode);
} }
...@@ -255,19 +256,19 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) { ...@@ -255,19 +256,19 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
return data; return data;
} }
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) { int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key, size_t keyLen) {
if (!pHashObj || !key) { if (!pHashObj || !key) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)pHashObj->keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, (uint32_t)keyLen);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHNode *pNode = pHashObj->hashList[slot]; SHNode *pNode = pHashObj->hashList[slot];
SHNode *pPrev = NULL; SHNode *pPrev = NULL;
while (pNode) { while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pHashObj->dataLen), key, pHashObj->keyLen) == 0) { if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
if (!pPrev) { if (!pPrev) {
pHashObj->hashList[slot] = pNode->next; pHashObj->hashList[slot] = pNode->next;
} else { } else {
...@@ -312,6 +313,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) { ...@@ -312,6 +313,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj) {
tSimpleHashClear(pHashObj); tSimpleHashClear(pHashObj);
taosMemoryFreeClear(pHashObj->hashList); taosMemoryFreeClear(pHashObj->hashList);
taosMemoryFree(pHashObj);
} }
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
...@@ -322,26 +324,54 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) { ...@@ -322,26 +324,54 @@ size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj) {
return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj); return (pHashObj->capacity * sizeof(void *)) + sizeof(SHNode) * tSimpleHashGetSize(pHashObj) + sizeof(SSHashObj);
} }
void *tSimpleHashGetKey(const SSHashObj *pHashObj, void *data, size_t *keyLen) { void *tSimpleHashGetKey(void *data, size_t *keyLen) {
#if 0 SHNode *node = (SHNode *)((char *)data - offsetof(SHNode, data));
int32_t offset = offsetof(SHNode, data);
SHNode *node = ((SHNode *)(char *)data - offset);
if (keyLen) { if (keyLen) {
*keyLen = pHashObj->keyLen; *keyLen = node->keyLen;
} }
return POINTER_SHIFT(data, pHashObj->dataLen); return POINTER_SHIFT(data, node->dataLen);
}
return GET_SHASH_NODE_KEY(node, pHashObj->dataLen); void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
#endif if (!pHashObj) {
if (keyLen) { return NULL;
*keyLen = pHashObj->keyLen; }
SHNode *pNode = NULL;
if (!data) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
} }
return POINTER_SHIFT(data, pHashObj->dataLen); pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
if (pNode->next) {
return GET_SHASH_NODE_DATA(pNode->next);
}
++(*iter);
for (int32_t i = *iter; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i];
if (!pNode) {
continue;
}
*iter = i;
return GET_SHASH_NODE_DATA(pNode);
}
return NULL;
} }
void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { void *tSimpleHashIterateKV(const SSHashObj *pHashObj, void *data, void **key, int32_t *iter) {
if (!pHashObj) { if (!pHashObj) {
return NULL; return NULL;
} }
...@@ -355,6 +385,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { ...@@ -355,6 +385,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
continue; continue;
} }
*iter = i; *iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode); return GET_SHASH_NODE_DATA(pNode);
} }
return NULL; return NULL;
...@@ -363,6 +396,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { ...@@ -363,6 +396,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
pNode = (SHNode *)((char *)data - offsetof(SHNode, data)); pNode = (SHNode *)((char *)data - offsetof(SHNode, data));
if (pNode->next) { if (pNode->next) {
if (key) {
*key = GET_SHASH_NODE_KEY(pNode->next, pNode->next->dataLen);
}
return GET_SHASH_NODE_DATA(pNode->next); return GET_SHASH_NODE_DATA(pNode->next);
} }
...@@ -373,6 +409,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) { ...@@ -373,6 +409,9 @@ void *tSimpleHashIterate(const SSHashObj *pHashObj, void *data, int32_t *iter) {
continue; continue;
} }
*iter = i; *iter = i;
if (key) {
*key = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
}
return GET_SHASH_NODE_DATA(pNode); return GET_SHASH_NODE_DATA(pNode);
} }
......
...@@ -32,31 +32,33 @@ ...@@ -32,31 +32,33 @@
TEST(testCase, tSimpleHashTest) { TEST(testCase, tSimpleHashTest) {
SSHashObj *pHashObj = SSHashObj *pHashObj =
tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), sizeof(int64_t), sizeof(int64_t)); tSimpleHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
assert(pHashObj != nullptr); assert(pHashObj != nullptr);
ASSERT_EQ(0, tSimpleHashGetSize(pHashObj)); ASSERT_EQ(0, tSimpleHashGetSize(pHashObj));
size_t keyLen = sizeof(int64_t);
size_t dataLen = sizeof(int64_t);
int64_t originKeySum = 0; int64_t originKeySum = 0;
for (int64_t i = 1; i <= 100; ++i) { for (int64_t i = 1; i <= 100; ++i) {
originKeySum += i; originKeySum += i;
tSimpleHashPut(pHashObj, (const void *)&i, (const void *)&i); tSimpleHashPut(pHashObj, (const void *)&i, keyLen, (const void *)&i, dataLen);
ASSERT_EQ(i, tSimpleHashGetSize(pHashObj)); ASSERT_EQ(i, tSimpleHashGetSize(pHashObj));
} }
for (int64_t i = 1; i <= 100; ++i) { for (int64_t i = 1; i <= 100; ++i) {
void *data = tSimpleHashGet(pHashObj, (const void *)&i); void *data = tSimpleHashGet(pHashObj, (const void *)&i, keyLen);
ASSERT_EQ(i, *(int64_t *)data); ASSERT_EQ(i, *(int64_t *)data);
} }
void *data = NULL; void *data = NULL;
int32_t iter = 0; int32_t iter = 0;
int64_t keySum = 0; int64_t keySum = 0;
int64_t dataSum = 0; int64_t dataSum = 0;
while ((data = tSimpleHashIterate(pHashObj, data, &iter))) { while ((data = tSimpleHashIterate(pHashObj, data, &iter))) {
void *key = tSimpleHashGetKey(pHashObj, data, NULL); void *key = tSimpleHashGetKey(data, NULL);
keySum += *(int64_t *)key; keySum += *(int64_t *)key;
dataSum += *(int64_t *)data; dataSum += *(int64_t *)data;
} }
...@@ -65,7 +67,7 @@ TEST(testCase, tSimpleHashTest) { ...@@ -65,7 +67,7 @@ TEST(testCase, tSimpleHashTest) {
ASSERT_EQ(keySum, originKeySum); ASSERT_EQ(keySum, originKeySum);
for (int64_t i = 1; i <= 100; ++i) { for (int64_t i = 1; i <= 100; ++i) {
tSimpleHashRemove(pHashObj, (const void *)&i); tSimpleHashRemove(pHashObj, (const void *)&i, keyLen);
ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj)); ASSERT_EQ(100 - i, tSimpleHashGetSize(pHashObj));
} }
......
...@@ -237,7 +237,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode); ...@@ -237,7 +237,7 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
bool syncNodeHasSnapshot(SSyncNode* pSyncNode); bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm); int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
......
...@@ -148,12 +148,35 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { ...@@ -148,12 +148,35 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); int64_t peerStartTime = syncIndexMgrGetStartTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]); int64_t peerRecvTime = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->peersId)[i]);
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
int64_t recvTimeDiff = TABS(peerRecvTime - timeNow);
int64_t startTimeDiff = TABS(peerStartTime - pSyncNode->startTime);
int64_t logDiff = TABS(peerMatchIndex - syncNodeGetLastIndex(pSyncNode));
/*
int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow); int64_t recvTimeDiff = syncNodeAbs64(peerRecvTime, timeNow);
int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime); int64_t startTimeDiff = syncNodeAbs64(peerStartTime, pSyncNode->startTime);
int64_t logDiff = syncNodeAbs64(peerMatchIndex, syncNodeGetLastIndex(pSyncNode));
*/
int32_t addQuorum = 0; int32_t addQuorum = 0;
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
if (startTimeDiff < SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 1;
} else {
if (logDiff < SYNC_ADD_QUORUM_COUNT) {
addQuorum = 1;
} else {
addQuorum = 0;
}
}
} else {
addQuorum = 0;
}
/*
if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) { if (recvTimeDiff < SYNC_MAX_RECV_TIME_RANGE_MS) {
addQuorum = 1; addQuorum = 1;
} else { } else {
...@@ -163,6 +186,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { ...@@ -163,6 +186,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) {
if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) { if (startTimeDiff > SYNC_MAX_START_TIME_RANGE_MS) {
addQuorum = 0; addQuorum = 0;
} }
*/
quorum += addQuorum; quorum += addQuorum;
} }
......
...@@ -2284,7 +2284,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { ...@@ -2284,7 +2284,7 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
// return max(logLastIndex, snapshotLastIndex) // return max(logLastIndex, snapshotLastIndex)
// if no snapshot and log, return -1 // if no snapshot and log, return -1
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
...@@ -2773,10 +2773,26 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p ...@@ -2773,10 +2773,26 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
return 0; return 0;
} }
if (pEntry->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "little term:%lu, can not do leader transfer", pEntry->term);
syncNodeEventLog(ths, logBuf);
return 0;
}
if (pEntry->index < syncNodeGetLastIndex(ths)) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "little index:%ld, can not do leader transfer", pEntry->index);
syncNodeEventLog(ths, logBuf);
return 0;
}
/*
if (ths->vgId > 1) { if (ths->vgId > 1) {
syncNodeEventLog(ths, "I am vnode, can not do leader transfer"); syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
return 0; return 0;
} }
*/
do { do {
char logBuf[128]; char logBuf[128];
......
...@@ -617,6 +617,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted ...@@ -617,6 +617,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FILE_CORRUPTED, "Rsma file corrupted
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema")
//index //index
TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册