提交 dd817708 编写于 作者: C Cary Xu

fix: rsma ref fix and add debug logs

上级 301edfe7
...@@ -27,7 +27,6 @@ target_sources( ...@@ -27,7 +27,6 @@ target_sources(
"src/meta/metaSnapshot.c" "src/meta/metaSnapshot.c"
# sma # sma
"src/sma/sma.c"
"src/sma/smaEnv.c" "src/sma/smaEnv.c"
"src/sma/smaUtil.c" "src/sma/smaUtil.c"
"src/sma/smaOpen.c" "src/sma/smaOpen.c"
......
...@@ -47,7 +47,8 @@ struct SSmaEnv { ...@@ -47,7 +47,8 @@ struct SSmaEnv {
}; };
typedef struct { typedef struct {
int32_t smaRef; int8_t inited;
int32_t rsetId;
} SSmaMgmt; } SSmaMgmt;
#define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_LOCK(env) ((env)->lock)
...@@ -95,6 +96,7 @@ enum { ...@@ -95,6 +96,7 @@ enum {
TASK_TRIGGER_STAT_CANCELLED = 4, TASK_TRIGGER_STAT_CANCELLED = 4,
TASK_TRIGGER_STAT_FINISHED = 5, TASK_TRIGGER_STAT_FINISHED = 5,
}; };
void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
...@@ -104,6 +106,10 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg); ...@@ -104,6 +106,10 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg);
int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat);
int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat);
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln);
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
int32_t tdLockSma(SSma *pSma); int32_t tdLockSma(SSma *pSma);
......
...@@ -163,6 +163,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool ...@@ -163,6 +163,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
const char* stbFullName, int32_t vgId); const char* stbFullName, int32_t vgId);
// sma // sma
int32_t smaInit();
void smaCleanUp();
int32_t smaOpen(SVnode* pVnode); int32_t smaOpen(SVnode* pVnode);
int32_t smaClose(SSma* pSma); int32_t smaClose(SSma* pSma);
int32_t smaBegin(SSma* pSma); int32_t smaBegin(SSma* pSma);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
// functions for external invocation
// TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
}
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
return code;
}
// functions for internal invocation
#if 0
/**
* @brief TODO: Assume that the final generated result it less than 3M
*
* @param pReq
* @param pDataBlocks
* @param vgId
* @param suid // TODO: check with Liao whether suid response is reasonable
*
* TODO: colId should be set
*/
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid, const char* stbName, bool isCreateCtb) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; ++i) {
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols));
bufSize += sizeof(SSubmitBlk);
}
*pReq = taosMemoryCalloc(1, bufSize);
if (!(*pReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
void* pDataBuf = *pReq;
SArray* pTagArray = NULL;
int32_t msgLen = sizeof(SSubmitReq);
int32_t numOfBlks = 0;
int32_t schemaLen = 0;
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
SDataBlockInfo* pDataBlkInfo = &pDataBlock->info;
int32_t colNum = pDataBlkInfo->numOfCols;
int32_t rows = pDataBlkInfo->rows;
int32_t rowSize = pDataBlkInfo->rowSize;
int64_t groupId = pDataBlkInfo->groupId;
if (rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
}
if(isCreateCtb) {
SMetaReader mr = {0};
const char* ctbName = buildCtbNameByGroupId(stbName, pDataBlock->info.groupId);
if (metaGetTableEntryByName(&mr, ctbName) != 0) {
smaDebug("vgId:%d, no tsma ctb %s exists", vgId, ctbName);
}
SVCreateTbReq ctbReq = {0};
ctbReq.name = ctbName;
ctbReq.type = TSDB_CHILD_TABLE;
ctbReq.ctb.suid = suid;
STagVal tagVal = {.cid = colNum + PRIMARYKEY_TIMESTAMP_COL_ID,
.type = TSDB_DATA_TYPE_BIGINT,
.i64 = groupId};
STag* pTag = NULL;
if(!pTagArray) {
pTagArray = taosArrayInit(1, sizeof(STagVal));
if (!pTagArray) goto _err;
}
taosArrayClear(pTagArray);
taosArrayPush(pTagArray, &tagVal);
tTagNew(pTagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&ctbReq);
goto _err;
}
ctbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &ctbReq, schemaLen, code);
tdDestroySVCreateTbReq(&ctbReq);
if (code < 0) {
goto _err;
}
}
SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
pSubmitBlk->suid = suid;
pSubmitBlk->uid = groupId;
pSubmitBlk->numOfRows = rows;
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
bool isStartKey = false;
int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
STColumn* pCol = &pTSchema->columns[k];
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
if (!isStartKey) {
isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
offset, k);
} else {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
true, offset, k);
}
break;
case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break;
}
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
TASSERT(0);
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (pCol->type == pColInfoData->info.type) {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
k);
} else {
char tv[8] = {0};
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else {
uint64_t v = 0;
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
}
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
k);
}
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0);
}
break;
}
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
}
dataLen += TD_ROW_LEN(rb.pBuf);
#ifdef TD_DEBUG_PRINT_ROW
tdSRowPrint(rb.pBuf, pTSchema, __func__);
#endif
}
++numOfBlks;
pSubmitBlk->dataLen = dataLen;
msgLen += pSubmitBlk->dataLen;
}
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen);
(*pReq)->length = (*pReq)->header.contLen;
(*pReq)->numOfBlocks = htonl(numOfBlks);
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
while (numOfBlks--) {
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(*pReq);
taosArrayDestroy(pTagArray);
return TSDB_CODE_FAILED;
}
#endif
...@@ -121,7 +121,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { ...@@ -121,7 +121,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) {
// step 3: perform persist task for qTaskInfo // step 3: perform persist task for qTaskInfo
tdRSmaPersistExecImpl(pRSmaStat); tdRSmaPersistExecImpl(pRSmaStat);
smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma)); smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -173,6 +173,7 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { ...@@ -173,6 +173,7 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) {
} }
if ((pDir = taosOpenDir(dir)) == NULL) { if ((pDir = taosOpenDir(dir)) == NULL) {
regfree(&regex);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
#define RSMA_TASK_INFO_HASH_SLOT 8 #define RSMA_TASK_INFO_HASH_SLOT 8
#define SMA_MGMT_REF_NUM 1024 #define SMA_MGMT_REF_NUM 10240
extern SSmaMgmt smaMgmt; extern SSmaMgmt smaMgmt;
...@@ -30,7 +30,62 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaE ...@@ -30,7 +30,62 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaE
static void *tdFreeTSmaStat(STSmaStat *pStat); static void *tdFreeTSmaStat(STSmaStat *pStat);
static void tdDestroyRSmaStat(void *pRSmaStat); static void tdDestroyRSmaStat(void *pRSmaStat);
/**
* @brief rsma init
*
* @return int32_t
*/
// implementation // implementation
int32_t smaInit() {
int8_t old;
int32_t nLoops = 0;
while (1) {
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2);
if (old != 2) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
if (old == 0) {
smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat);
if (smaMgmt.rsetId < 0) {
smaError("failed to init sma rset since %s", terrstr());
atomic_store_8(&smaMgmt.inited, 0);
return TSDB_CODE_FAILED;
}
smaInfo("sma rset is initialized, rsetId:%d", smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 1);
}
return TSDB_CODE_SUCCESS;
}
/**
* @brief rsma cleanup
*
*/
void smaCleanUp() {
int8_t old;
int32_t nLoops = 0;
while (1) {
old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2);
if (old != 2) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
if (old == 1) {
smaInfo("sma rset is cleaned up, resetId:%d", smaMgmt.rsetId);
taosCloseRef(smaMgmt.rsetId);
atomic_store_8(&smaMgmt.inited, 0);
}
}
static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) { static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) {
SSmaEnv *pEnv = NULL; SSmaEnv *pEnv = NULL;
...@@ -135,17 +190,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS ...@@ -135,17 +190,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT);
// init smaMgmt // init smaMgmt
smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); smaInit();
if (smaMgmt.smaRef < 0) {
smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
int64_t refId = taosAddRef(smaMgmt.smaRef, pRSmaStat); int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat);
if (refId < 0) { if (refId < 0) {
smaError("taosAddRef smaRef failed, since:%s", tstrerror(terrno)); smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma),
refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} else {
smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId,
smaMgmt.rsetId, SMA_MGMT_REF_NUM);
} }
pRSmaStat->refId = refId; pRSmaStat->refId = refId;
...@@ -275,8 +329,13 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { ...@@ -275,8 +329,13 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat);
if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) { if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) {
smaError("remove refId from rsmaRef:0x%" PRIx64 " failed since %s", RSMA_REF_ID(pRSmaStat), terrstr()); smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma),
RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr());
ASSERT(0);
} else {
smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma),
RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId);
} }
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -323,7 +382,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { ...@@ -323,7 +382,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
} }
break; break;
default: default:
TASSERT(0); smaError("vgId:%d undefined smaType:%", SMA_VID(pSma), smaType);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
......
...@@ -19,7 +19,8 @@ ...@@ -19,7 +19,8 @@
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
SSmaMgmt smaMgmt = { SSmaMgmt smaMgmt = {
.smaRef = -1, .inited = 0,
.rsetId = -1,
}; };
#define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver" #define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver"
...@@ -608,9 +609,11 @@ _err: ...@@ -608,9 +609,11 @@ _err:
static void tdRSmaFetchTrigger(void *param, void *tmrId) { static void tdRSmaFetchTrigger(void *param, void *tmrId) {
SRSmaInfoItem *pItem = param; SRSmaInfoItem *pItem = param;
SSma *pSma = NULL; SSma *pSma = NULL;
SRSmaStat *pStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, pItem->refId); SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
if (!pStat) { if (!pStat) {
smaDebug("rsma fetch task not start since already destroyed"); smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
pItem->refId);
return; return;
} }
...@@ -622,9 +625,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -622,9 +625,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_PAUSED:
case TASK_TRIGGER_STAT_CANCELLED: case TASK_TRIGGER_STAT_CANCELLED:
case TASK_TRIGGER_STAT_FINISHED: { case TASK_TRIGGER_STAT_FINISHED: {
taosReleaseRef(smaMgmt.smaRef, pItem->refId); tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled", smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is %" PRIi8
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid); ", rsetId rsetId:%" PRIi64 " refId:%d",
SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId);
return; return;
} }
default: default:
...@@ -665,7 +669,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { ...@@ -665,7 +669,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
} }
_end: _end:
taosReleaseRef(smaMgmt.smaRef, pItem->refId); tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__);
} }
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid,
...@@ -1258,7 +1262,8 @@ _end: ...@@ -1258,7 +1262,8 @@ _end:
} }
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), smaMgmt.rsetId, pRSmaStat->refId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__);
taosThreadExit(NULL); taosThreadExit(NULL);
return NULL; return NULL;
} }
...@@ -1283,7 +1288,9 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { ...@@ -1283,7 +1288,9 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)));
} }
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d)", SMA_VID(pRSmaStat->pSma), smaMgmt.rsetId,
pRSmaStat->refId);
tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__);
} }
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
...@@ -1297,8 +1304,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { ...@@ -1297,8 +1304,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) {
*/ */
static void tdRSmaPersistTrigger(void *param, void *tmrId) { static void tdRSmaPersistTrigger(void *param, void *tmrId) {
SRSmaStat *rsmaStat = param; SRSmaStat *rsmaStat = param;
SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, rsmaStat->refId); SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.rsetId, rsmaStat->refId);
ASSERT(0);
if (!pRSmaStat) { if (!pRSmaStat) {
smaDebug("rsma persistence task not start since already destroyed"); smaDebug("rsma persistence task not start since already destroyed");
return; return;
...@@ -1341,5 +1348,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { ...@@ -1341,5 +1348,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat);
} break; } break;
} }
taosReleaseRef(smaMgmt.smaRef, rsmaStat->refId); taosReleaseRef(smaMgmt.rsetId, rsmaStat->refId);
} }
...@@ -20,6 +20,36 @@ ...@@ -20,6 +20,36 @@
#define SMA_STORAGE_MINUTES_DAY 1440 #define SMA_STORAGE_MINUTES_DAY 1440
#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file #define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file
// TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
}
// TODO: destroy SSDataBlocks(msg)
return code;
}
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) {
smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno));
}
smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days);
return code;
}
/** /**
* @brief Judge the tsma file split days * @brief Judge the tsma file split days
* *
......
...@@ -294,4 +294,23 @@ int32_t tdRemoveTFile(STFile *pTFile) { ...@@ -294,4 +294,23 @@ int32_t tdRemoveTFile(STFile *pTFile) {
} }
// smaXXXUtil ================ // smaXXXUtil ================
void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
void *pResult = taosAcquireRef(rsetId, refId);
if (!pResult) {
smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
} else {
smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
}
return pResult;
}
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) {
if (taosReleaseRef(rsetId, refId) < 0) {
smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr());
return TSDB_CODE_FAILED;
}
smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId);
return TSDB_CODE_SUCCESS;
}
// ... // ...
\ No newline at end of file
...@@ -100,6 +100,7 @@ void vnodeCleanup() { ...@@ -100,6 +100,7 @@ void vnodeCleanup() {
walCleanUp(); walCleanUp();
tqCleanUp(); tqCleanUp();
smaCleanUp();
} }
int vnodeScheduleTask(int (*execute)(void*), void* arg) { int vnodeScheduleTask(int (*execute)(void*), void* arg) {
......
...@@ -162,28 +162,28 @@ class TDTestCase: ...@@ -162,28 +162,28 @@ class TDTestCase:
keyDict['h'] = 'abc' keyDict['h'] = 'abc'
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
keyDict['h'] = '\'abc\'' keyDict['h'] = '\'abc\''
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
keyDict['h'] = '3' keyDict['h'] = '3'
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
keyDict['h'] = '\'3\'' keyDict['h'] = '\'3\''
retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -h %s test suceess"%keyDict['h']) tdLog.info("taos -h %s test success"%keyDict['h'])
else: else:
tdLog.exit("taos -h %s fail"%keyDict['h']) tdLog.exit("taos -h %s fail"%keyDict['h'])
...@@ -193,42 +193,42 @@ class TDTestCase: ...@@ -193,42 +193,42 @@ class TDTestCase:
keyDict['P'] = 'abc' keyDict['P'] = 'abc'
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal): if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '\'abc\'' keyDict['P'] = '\'abc\''
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal): if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '3' keyDict['P'] = '3'
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '\'3\'' keyDict['P'] = '\'3\''
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '12ab' keyDict['P'] = '12ab'
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
keyDict['P'] = '\'12ab\'' keyDict['P'] = '\'12ab\''
retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '')
if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal):
tdLog.info("taos -P %s test suceess"%keyDict['P']) tdLog.info("taos -P %s test success"%keyDict['P'])
else: else:
tdLog.exit("taos -P %s fail"%keyDict['P']) tdLog.exit("taos -P %s fail"%keyDict['P'])
...@@ -293,7 +293,7 @@ class TDTestCase: ...@@ -293,7 +293,7 @@ class TDTestCase:
keyDict['p'] = 'errorPassword' keyDict['p'] = 'errorPassword'
retCode, retVal = taos_command(buildPath, "u", keyDict['u'], "taos>", keyDict['c'], sqlString, 'p', keyDict['p']) retCode, retVal = taos_command(buildPath, "u", keyDict['u'], "taos>", keyDict['c'], sqlString, 'p', keyDict['p'])
if retCode == "TAOS_FAIL" and "Authentication failure" in retVal: if retCode == "TAOS_FAIL" and "Authentication failure" in retVal:
tdLog.info("taos -p %s test suceess"%keyDict['p']) tdLog.info("taos -p %s test success"%keyDict['p'])
else: else:
tdLog.exit("taos -u %s -p %s"%(keyDict['u'], keyDict['p'])) tdLog.exit("taos -u %s -p %s"%(keyDict['u'], keyDict['p']))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册