提交 eff9f9be 编写于 作者: D dapan1121

add sma update

上级 17f62750
......@@ -1136,8 +1136,8 @@ int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
typedef struct {
SArray* pArray; // Array of STableMetaRsp
STableIndexRsp* pIndexRsp;
SArray* pMetaRsp; // Array of STableMetaRsp
SArray* pIndexRsp; // Array of STableIndexRsp;
} SSTbHbRsp;
int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
......@@ -2514,6 +2514,8 @@ typedef struct {
} STableIndexInfo;
typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t suid;
int32_t version;
SArray* pIndex;
......
......@@ -105,9 +105,9 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return -1;
}
int32_t numOfBatchs = taosArrayGetSize(hbRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
STableMetaRsp *rsp = taosArrayGet(hbRsp.pArray, i);
int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
if (rsp->numOfColumns < 0) {
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
......@@ -124,6 +124,16 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
}
}
int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
catalogUpdateTableIndex(pCatalog, rsp);
}
taosArrayDestroy(hbRsp.pIndexRsp);
hbRsp.pIndexRsp = NULL;
tFreeSSTbHbRsp(&hbRsp);
return TSDB_CODE_SUCCESS;
}
......
......@@ -2437,6 +2437,8 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tbName) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
......@@ -2473,6 +2475,8 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
int32_t num = 0;
......@@ -2640,12 +2644,30 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i);
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
if (tEncodeI32(&encoder, numOfMeta) < 0) return -1;
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
if (tEncodeSTableMetaRsp(&encoder, pMetaRsp) < 0) return -1;
}
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
if (tEncodeCStr(&encoder, pIndexRsp->tbName) < 0) return -1;
if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1;
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -2671,20 +2693,53 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
if (tDecodeI32(&decoder, &numOfMeta) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(STableMetaRsp));
if (pRsp->pArray == NULL) {
pRsp->pMetaRsp = taosArrayInit(numOfMeta, sizeof(STableMetaRsp));
if (pRsp->pMetaRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfBatch; ++i) {
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp tableMetaRsp = {0};
if (tDecodeSTableMetaRsp(&decoder, &tableMetaRsp) < 0) return -1;
taosArrayPush(pRsp->pArray, &tableMetaRsp);
taosArrayPush(pRsp->pMetaRsp, &tableMetaRsp);
}
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
if (tDecodeI32(&decoder, &numOfIndex) < 0) return -1;
pRsp->pIndexRsp = taosArrayInit(numOfIndex, sizeof(STableIndexRsp));
if (pRsp->pIndexRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp tableIndexRsp = {0};
if (tDecodeCStrTo(&decoder, tableIndexRsp.tbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1;
if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
tableIndexRsp.pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == tableIndexRsp.pIndex) return -1;
STableIndexInfo info;
for (int32_t i = 0; i < num; ++i) {
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
if (NULL == taosArrayPush(tableIndexRsp.pIndex, &info)) {
taosMemoryFree(info.expr);
return -1;
}
}
}
taosArrayPush(pRsp->pIndexRsp, &tableIndexRsp);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -2693,14 +2748,33 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
void tFreeSTableMetaRsp(STableMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pSchemas); }
void tFreeSTableIndexRsp(void *info) {
if (NULL == info) {
return;
}
STableIndexRsp *pInfo = (STableIndexRsp *)info;
taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
}
void tFreeSSTbHbRsp(SSTbHbRsp *pRsp) {
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
for (int32_t i = 0; i < numOfBatch; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i);
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
tFreeSTableMetaRsp(pMetaRsp);
}
taosArrayDestroy(pRsp->pArray);
taosArrayDestroy(pRsp->pMetaRsp);
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
tFreeSTableIndexRsp(pIndexRsp);
}
taosArrayDestroy(pRsp->pIndexRsp);
}
int32_t tSerializeSShowRsp(void *buf, int32_t bufLen, SShowRsp *pRsp) {
......
......@@ -885,6 +885,8 @@ static int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp
return TSDB_CODE_SUCCESS;
}
strcpy(rsp->dbFName, pStb->db);
strcpy(rsp->tbName, pStb->name);
rsp->suid = pStb->uid;
rsp->version = pStb->smaVer;
mndReleaseStb(pMnode, pStb);
......
......@@ -1674,15 +1674,15 @@ _OVER:
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen) {
SSTbHbRsp hbRsp = {0};
hbRsp.pArray = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
if (hbRsp.pArray == NULL) {
hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
if (hbRsp.pMetaRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
hbRsp.pIndexRsp = taosMemoryCalloc(1, sizeof(STableIndexRsp));
hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
if (NULL == hbRsp.pIndexRsp) {
taosArrayDestroy(hbRsp.pArray);
taosArrayDestroy(hbRsp.pMetaRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -1700,12 +1700,12 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
metaRsp.numOfColumns = -1;
metaRsp.suid = pStbVersion->suid;
taosArrayPush(hbRsp.pArray, &metaRsp);
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
continue;
}
if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
taosArrayPush(hbRsp.pArray, &metaRsp);
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
} else {
tFreeSTableMetaRsp(&metaRsp);
}
......@@ -1713,11 +1713,19 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
if (pStbVersion->smaVer != smaVer) {
bool exist = false;
char tbFName[TSDB_TABLE_FNAME_LEN];
STableIndexRsp indexRsp = {0};
sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
int32_t code = mndGetTableSma(pMnode, tbFName, hbRsp.pIndexRsp, &exist);
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
if (code || !exist) {
taosMemoryFreeClear(hbRsp.pIndexRsp);
indexRsp.suid = pStbVersion->suid;
indexRsp.version = -1;
indexRsp.pIndex = NULL;
}
strcpy(indexRsp->dbFName, pStbVersion->dbFName);
strcpy(indexRsp->tbName, pStbVersion->stbName);
taosArrayPush(hbRsp.pIndexRsp, &indexRsp);
}
}
......
......@@ -319,8 +319,6 @@ typedef struct SCtgUpdateUserMsg {
typedef struct SCtgUpdateTbIndexMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
STableIndex* pIndex;
} SCtgUpdateTbIndexMsg;
......@@ -503,7 +501,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, SName* pName, STableIndex *pIndex, bool syncOp);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex *pIndex, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
......
......@@ -751,6 +751,30 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == pRsp) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
STableIndex* pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pIndex, pRsp, sizeof(STableIndex));
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
CTG_API_ENTER();
......@@ -1153,16 +1177,36 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo *pConn, const SNam
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pRes, NULL));
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pTableName, pRes));
if (*pRes) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(*pRes, &pInfo));
STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t code = 0;
CTG_ERR_JRET(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pIndex, NULL));
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, pTableName, pInfo, false));
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pIndex->pIndex, &pInfo));
*pRes = pInfo;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, pTableName, &pIndex, false));
CTG_API_LEAVE(code);
_return:
tFreeSTableIndexRsp(pIndex);
taosMemoryFree(pIndex);
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
*pRes = NULL;
CTG_API_LEAVE(code);
}
......
......@@ -876,10 +876,9 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
pTask->res = pInfo;
pTask->msgCtx.out = NULL;
SCtgTbIndexCtx* ctx = pTask->taskCtx;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, ctx->pName, pOut, false));
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
_return:
......
......@@ -838,7 +838,7 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, SName* pName, STableIndex *pIndex, bool syncOp) {
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_INDEX;
......@@ -851,19 +851,19 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, SName* pName, STableIndex *pInde
}
msg->pCtg = pCtg;
msg->pIndex = pIndex;
tNameGetFullDbName(pName, msg->dbFName);
strcpy(msg->tbName, pName->tname);
msg->pIndex = *pIndex;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
*pIndex = NULL;
return TSDB_CODE_SUCCESS;
_return:
taosArrayDestroyEx(pIndex, tFreeSTableIndexInfo);
taosArrayDestroyEx(*pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(*pIndex);
taosMemoryFreeClear(msg);
CTG_RET(code);
......@@ -1745,12 +1745,12 @@ int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
STableIndex* pIndex = msg->pIndex;
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, msg->dbFName, 0, &dbCache));
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
if (NULL == dbCache) {
CTG_ERR_JRET(code);
}
CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, msg->dbFName, msg->tbName, &pIndex));
CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));
_return:
......
......@@ -431,7 +431,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, SArray** out, SCtgTask* pTask) {
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
......@@ -448,7 +448,11 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
}
if (pTask) {
void* pOut = NULL;
void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
......
......@@ -484,17 +484,11 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexRsp *out = taosMemoryCalloc(1, sizeof(STableIndexRsp));
if (NULL == out) {
return TSDB_CODE_OUT_OF_MEMORY;
}
STableIndexRsp *out = (STableIndexRsp*)output;
if (tDeserializeSTableIndexRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
*(void **)output = out;
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册