未验证 提交 7c65ac75 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #13495 from taosdata/feature/qnode

enh: remove offline scheduler info
...@@ -2485,6 +2485,32 @@ typedef struct { ...@@ -2485,6 +2485,32 @@ typedef struct {
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp); int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp); int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);
typedef struct {
char tbFName[TSDB_TABLE_FNAME_LEN];
} STableIndexReq;
int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq);
typedef struct {
int8_t intervalUnit;
int8_t slidingUnit;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId; // for stream
char* expr;
} STableIndexInfo;
typedef struct {
SArray* pIndex;
} STableIndexRsp;
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
typedef struct { typedef struct {
int8_t mqMsgType; int8_t mqMsgType;
int32_t code; int32_t code;
......
...@@ -130,6 +130,7 @@ enum { ...@@ -130,6 +130,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
......
...@@ -272,6 +272,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons ...@@ -272,6 +272,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
...@@ -280,7 +282,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); ...@@ -280,7 +282,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate); int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);
......
...@@ -2394,6 +2394,102 @@ int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp ...@@ -2394,6 +2394,102 @@ int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp
return 0; return 0;
} }
int32_t tSerializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tbFName) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSTableIndexReq(void *buf, int32_t bufLen, STableIndexReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tbFName) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSTableIndexInfo(SEncoder *pEncoder, STableIndexInfo* pInfo) {
if (tEncodeI8(pEncoder, pInfo->intervalUnit) < 0) return -1;
if (tEncodeI8(pEncoder, pInfo->slidingUnit) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->offset) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->sliding) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->dstTbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->dstVgId) < 0) return -1;
if (tEncodeCStr(pEncoder, pInfo->expr) < 0) return -1;
return 0;
}
int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo* pInfo = (STableIndexInfo*)taosArrayGet(pRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSTableIndexInfo(SDecoder *pDecoder, STableIndexInfo *pInfo) {
if (tDecodeI8(pDecoder, &pInfo->intervalUnit) < 0) return -1;
if (tDecodeI8(pDecoder, &pInfo->slidingUnit) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->offset) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->sliding) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->dstTbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->dstVgId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pInfo->expr) < 0) return -1;
return 0;
}
int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
pRsp->pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == pRsp->pIndex) return -1;
STableIndexInfo info;
for (int32_t i = 0; i < num; ++i) {
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
if (NULL == taosArrayPush(pRsp->pIndex, &info)) {
taosMemoryFree(info.expr);
return -1;
}
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
......
...@@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() { ...@@ -181,6 +181,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -309,6 +309,7 @@ typedef struct { ...@@ -309,6 +309,7 @@ typedef struct {
int8_t slidingUnit; int8_t slidingUnit;
int8_t timezone; int8_t timezone;
int32_t dstVgId; // for stream int32_t dstVgId; // for stream
int64_t dstTbUid;
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;
......
...@@ -40,6 +40,7 @@ static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpS ...@@ -40,6 +40,7 @@ static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpS
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
...@@ -59,6 +60,7 @@ int32_t mndInitSma(SMnode *pMnode) { ...@@ -59,6 +60,7 @@ int32_t mndInitSma(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_GET_TABLE_INDEX, mndProcessGetTbSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelGetNextSma);
...@@ -870,6 +872,55 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp ...@@ -870,6 +872,55 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return code; return code;
} }
static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) {
int32_t code = 0;
SSmaObj *pSma = NULL;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
STableIndexInfo info;
while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break;
if (pSma->stb[0] != indexReq->tbFName[0] || strcmp(pSma->stb, indexReq->tbFName)) {
continue;
}
info.intervalUnit = pSma->intervalUnit;
info.slidingUnit = pSma->slidingUnit;
info.interval = pSma->interval;
info.offset = pSma->offset;
info.sliding = pSma->sliding;
info.dstTbUid = pSma->dstTbUid;
info.dstVgId = pSma->dstVgId;
info.expr = taosMemoryMalloc(pSma->exprLen + 1);
if (info.expr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
sdbRelease(pSdb, pSma);
return code;
}
memcpy(info.expr, pSma->expr, pSma->exprLen);
info.expr[pSma->exprLen] = 0;
if (NULL == taosArrayPush(rsp->pIndex, &info)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
taosMemoryFree(info.expr);
sdbRelease(pSdb, pSma);
return code;
}
*exist = true;
sdbRelease(pSdb, pSma);
}
return code;
}
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) { static int32_t mndProcessGetSmaReq(SRpcMsg *pReq) {
SUserIndexReq indexReq = {0}; SUserIndexReq indexReq = {0};
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
...@@ -916,6 +967,59 @@ _OVER: ...@@ -916,6 +967,59 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
STableIndexReq indexReq = {0};
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
STableIndexRsp rsp = {0};
bool exist = false;
if (tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
rsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
if (NULL == rsp.pIndex) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
code = mndGetTableSma(pMnode, &indexReq, &rsp, &exist);
if (code) {
goto _OVER;
}
if (!exist) {
code = -1;
terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
} else {
int32_t contLen = tSerializeSTableIndexRsp(NULL, 0, &rsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
tSerializeSTableIndexRsp(pRsp, contLen, &rsp);
pReq->info.rsp = pRsp;
pReq->info.rspLen = contLen;
code = 0;
}
_OVER:
if (code != 0) {
mError("failed to get table index %s since %s", indexReq.tbFName, terrstr());
}
return code;
}
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
......
...@@ -310,22 +310,19 @@ typedef struct SCtgCacheOperation { ...@@ -310,22 +310,19 @@ typedef struct SCtgCacheOperation {
int32_t opId; int32_t opId;
void *data; void *data;
bool syncOp; bool syncOp;
uint64_t seqId; tsem_t rspSem;
} SCtgCacheOperation; } SCtgCacheOperation;
typedef struct SCtgQNode { typedef struct SCtgQNode {
SCtgCacheOperation op; SCtgCacheOperation *op;
struct SCtgQNode *next; struct SCtgQNode *next;
} SCtgQNode; } SCtgQNode;
typedef struct SCtgQueue { typedef struct SCtgQueue {
SRWLatch qlock; SRWLatch qlock;
uint64_t seqId;
uint64_t seqDone;
SCtgQNode *head; SCtgQNode *head;
SCtgQNode *tail; SCtgQNode *tail;
tsem_t reqSem; tsem_t reqSem;
tsem_t rspSem;
uint64_t qRemainNum; uint64_t qRemainNum;
} SCtgQueue; } SCtgQueue;
...@@ -493,6 +490,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu ...@@ -493,6 +490,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask); int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask); int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask); int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
......
...@@ -506,11 +506,6 @@ int32_t catalogInit(SCatalogCfg *cfg) { ...@@ -506,11 +506,6 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
} }
if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) {
qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
}
gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode)); gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == gCtgMgmt.queue.head) { if (NULL == gCtgMgmt.queue.head) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
...@@ -1141,6 +1136,17 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps ...@@ -1141,6 +1136,17 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL)); CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
} }
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL));
}
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) { int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) {
CTG_API_ENTER(); CTG_API_ENTER();
...@@ -1194,10 +1200,6 @@ void catalogDestroy(void) { ...@@ -1194,10 +1200,6 @@ void catalogDestroy(void) {
if (tsem_post(&gCtgMgmt.queue.reqSem)) { if (tsem_post(&gCtgMgmt.queue.reqSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} }
if (tsem_post(&gCtgMgmt.queue.rspSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1); taosUsleep(1);
......
...@@ -501,25 +501,6 @@ _return: ...@@ -501,25 +501,6 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void ctgWaitOpDone(SCtgCacheOperation *action) {
while (true) {
tsem_wait(&gCtgMgmt.queue.rspSem);
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem);
break;
}
if (gCtgMgmt.queue.seqDone >= action->seqId) {
break;
}
tsem_post(&gCtgMgmt.queue.rspSem);
sched_yield();
}
}
void ctgDequeue(SCtgCacheOperation **op) { void ctgDequeue(SCtgCacheOperation **op) {
SCtgQNode *orig = gCtgMgmt.queue.head; SCtgQNode *orig = gCtgMgmt.queue.head;
...@@ -530,7 +511,7 @@ void ctgDequeue(SCtgCacheOperation **op) { ...@@ -530,7 +511,7 @@ void ctgDequeue(SCtgCacheOperation **op) {
taosMemoryFreeClear(orig); taosMemoryFreeClear(orig);
*op = &node->op; *op = node->op;
} }
...@@ -541,9 +522,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { ...@@ -541,9 +522,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
operation->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1); if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0);
}
node->op = *operation; node->op = operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
gCtgMgmt.queue.tail->next = node; gCtgMgmt.queue.tail->next = node;
...@@ -558,7 +541,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { ...@@ -558,7 +541,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name);
if (operation->syncOp) { if (operation->syncOp) {
ctgWaitOpDone(operation); tsem_wait(&operation->rspSem);
taosMemoryFree(operation);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -567,7 +551,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { ...@@ -567,7 +551,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_CACHE;
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
...@@ -583,21 +569,24 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) ...@@ -583,21 +569,24 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
msg->dbId = dbId; msg->dbId = dbId;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_VGROUP;
op->syncOp = syncOp;
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
...@@ -612,15 +601,15 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) ...@@ -612,15 +601,15 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp)
msg->pCtg = pCtg; msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName)); strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
...@@ -628,7 +617,10 @@ _return: ...@@ -628,7 +617,10 @@ _return:
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) { int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_STB_META;
op->syncOp = syncOp;
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
...@@ -641,15 +633,15 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, ...@@ -641,15 +633,15 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
msg->dbId = dbId; msg->dbId = dbId;
msg->suid = suid; msg->suid = suid;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
...@@ -657,7 +649,10 @@ _return: ...@@ -657,7 +649,10 @@ _return:
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_META;
op->syncOp = syncOp;
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
...@@ -669,21 +664,24 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, ...@@ -669,21 +664,24 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
strncpy(msg->tbName, tbName, sizeof(msg->tbName)); strncpy(msg->tbName, tbName, sizeof(msg->tbName));
msg->dbId = dbId; msg->dbId = dbId;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) { int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_VGROUP, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VGROUP;
op->syncOp = syncOp;
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
...@@ -701,22 +699,25 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId ...@@ -701,22 +699,25 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
msg->dbId = dbId; msg->dbId = dbId;
msg->dbInfo = dbInfo; msg->dbInfo = dbInfo;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
ctgFreeVgInfo(dbInfo); ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(action.data); taosMemoryFreeClear(op->data);
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) { int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_TB_META, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_META;
op->syncOp = syncOp;
SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg)); SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
...@@ -731,9 +732,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy ...@@ -731,9 +732,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->output = output; msg->output = output;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -746,7 +747,9 @@ _return: ...@@ -746,7 +747,9 @@ _return:
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) { int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation operation= {.opId = CTG_OP_UPDATE_VG_EPSET}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VG_EPSET;
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
...@@ -758,9 +761,9 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp ...@@ -758,9 +761,9 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp
msg->vgId = vgId; msg->vgId = vgId;
msg->epSet = *pEpSet; msg->epSet = *pEpSet;
operation.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &operation)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -775,7 +778,10 @@ _return: ...@@ -775,7 +778,10 @@ _return:
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) { int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_UPDATE_USER, .syncOp = syncOp}; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_USER;
op->syncOp = syncOp;
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) { if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
...@@ -785,9 +791,9 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp ...@@ -785,9 +791,9 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->userAuth = *pAuth; msg->userAuth = *pAuth;
action.data = msg; op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action)); CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1607,6 +1613,39 @@ void ctgUpdateThreadUnexpectedStopped(void) { ...@@ -1607,6 +1613,39 @@ void ctgUpdateThreadUnexpectedStopped(void) {
if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
} }
void ctgCleanupCacheQueue(void) {
SCtgQNode *node = NULL;
SCtgQNode *nodeNext = NULL;
while (true) {
node = gCtgMgmt.queue.head->next;
while (node) {
if (node->op) {
taosMemoryFree(node->op->data);
if (node->op->syncOp) {
tsem_post(&node->op->rspSem);
} else {
taosMemoryFree(node->op);
}
}
nodeNext = node->next;
taosMemoryFree(node);
node = nodeNext;
}
if (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1);
} else {
break;
}
}
taosMemoryFreeClear(gCtgMgmt.queue.head);
gCtgMgmt.queue.tail = NULL;
}
void* ctgUpdateThreadFunc(void* param) { void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog"); setThreadName("catalog");
#ifdef WINDOWS #ifdef WINDOWS
...@@ -1622,7 +1661,8 @@ void* ctgUpdateThreadFunc(void* param) { ...@@ -1622,7 +1661,8 @@ void* ctgUpdateThreadFunc(void* param) {
} }
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
tsem_post(&gCtgMgmt.queue.rspSem); CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
ctgCleanupCacheQueue();
break; break;
} }
...@@ -1634,10 +1674,8 @@ void* ctgUpdateThreadFunc(void* param) { ...@@ -1634,10 +1674,8 @@ void* ctgUpdateThreadFunc(void* param) {
(*gCtgCacheOperation[operation->opId].func)(operation); (*gCtgCacheOperation[operation->opId].func)(operation);
gCtgMgmt.queue.seqDone = operation->seqId;
if (operation->syncOp) { if (operation->syncOp) {
tsem_post(&gCtgMgmt.queue.rspSem); tsem_post(&operation->rspSem);
} }
CTG_RT_STAT_INC(qDoneNum, 1); CTG_RT_STAT_INC(qDoneNum, 1);
......
...@@ -85,6 +85,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, ...@@ -85,6 +85,21 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got index from mnode, indexName:%s", target); qDebug("Got index from mnode, indexName:%s", target);
break; break;
} }
case TDMT_MND_GET_TABLE_INDEX: {
if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
CTG_ERR_RET(code);
}
qDebug("Got table index from mnode, tbFName:%s", target);
break;
}
case TDMT_MND_RETRIEVE_FUNC: { case TDMT_MND_RETRIEVE_FUNC: {
if (TSDB_CODE_SUCCESS != rspCode) { if (TSDB_CODE_SUCCESS != rspCode) {
qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target); qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
...@@ -412,6 +427,44 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo * ...@@ -412,6 +427,44 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask) {
char *msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void *)tbFName, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
CTG_ERR_RET(code);
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) { int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
......
...@@ -203,6 +203,24 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32 ...@@ -203,6 +203,24 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexReq indexReq = {0};
strcpy(indexReq.tbFName, input);
int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
void *pBuf = (*mallcFp)(bufLen);
tSerializeSTableIndexReq(pBuf, bufLen, &indexReq);
*msg = pBuf;
*msgLen = bufLen;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output; SUseDbOutput *pOut = output;
...@@ -459,26 +477,43 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) { ...@@ -459,26 +477,43 @@ int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
void initQueryModuleMsgHandle() { STableIndexRsp out = {0};
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) {
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; return TSDB_CODE_INVALID_MSG;
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; }
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; *(void **)output = out.pIndex;
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg;
return TSDB_CODE_SUCCESS;
}
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp; queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -356,7 +356,7 @@ int32_t qwOpenRef(void); ...@@ -356,7 +356,7 @@ int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam); void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SArray* pExpiredSch); void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
......
...@@ -539,8 +539,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { ...@@ -539,8 +539,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
} }
void qwClearExpiredSch(SArray* pExpiredSch) { void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) {
int32_t num = taosArrayGetSize(pExpiredSch);
for (int32_t i = 0; i < num; ++i) {
uint64_t *sId = taosArrayGet(pExpiredSch, i);
SQWSchStatus *pSch = NULL;
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) {
continue;
}
if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch);
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qError("sch %" PRIx64 "destroyed", *sId);
}
qwReleaseScheduler(QW_WRITE, mgmt);
}
} }
...@@ -790,9 +790,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -790,9 +790,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
} }
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo)); QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));
sch->hbBrokenTs = 0;
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
...@@ -912,7 +913,7 @@ _return: ...@@ -912,7 +913,7 @@ _return:
} }
if (taosArrayGetSize(pExpiredSch) > 0) { if (taosArrayGetSize(pExpiredSch) > 0) {
qwClearExpiredSch(pExpiredSch); qwClearExpiredSch(mgmt, pExpiredSch);
} }
taosMemoryFreeClear(rspList); taosMemoryFreeClear(rspList);
......
...@@ -207,6 +207,7 @@ typedef struct SSchJob { ...@@ -207,6 +207,7 @@ typedef struct SSchJob {
SArray *dataSrcTasks; // SArray<SQueryTask*> SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx; int32_t levelIdx;
SEpSet dataSrcEps; SEpSet dataSrcEps;
SHashObj *taskList;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
......
...@@ -63,6 +63,13 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN ...@@ -63,6 +63,13 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pN
if (pNodeList != NULL) { if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList); pJob->nodeList = taosArrayDup(pNodeList);
} }
pJob->taskList =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
...@@ -486,23 +493,26 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { ...@@ -486,23 +493,26 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_SET_JOB_TYPE(pJob, plan->subplanType); SCH_SET_JOB_TYPE(pJob, plan->subplanType);
SSchTask task = {0}; SSchTask task = {0};
SSchTask *pTask = &task;
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));
void *p = taosArrayPush(pLevel->subTasks, &task); SSchTask *pTask = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) { if (NULL == pTask) {
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_ERR_JRET(schRecordQueryDataSrc(pJob, p)); SCH_ERR_JRET(schRecordQueryDataSrc(pJob, pTask));
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
++pJob->taskNum; ++pJob->taskNum;
} }
...@@ -1276,14 +1286,10 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas ...@@ -1276,14 +1286,10 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
} }
int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) {
schGetTaskFromList(pJob->execTasks, taskId, pTask); schGetTaskFromList(pJob->taskList, taskId, pTask);
if (NULL == *pTask) { if (NULL == *pTask) {
schGetTaskFromList(pJob->succTasks, taskId, pTask); SCH_JOB_ELOG("task not found in job task list, taskId:%" PRIx64, taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
if (NULL == *pTask) {
SCH_JOB_ELOG("task not found in execList & succList, taskId:%" PRIx64, taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1445,7 +1451,8 @@ void schFreeJobImpl(void *job) { ...@@ -1445,7 +1451,8 @@ void schFreeJobImpl(void *job) {
taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks); taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks); taosHashCleanup(pJob->succTasks);
taosHashCleanup(pJob->taskList);
taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList); taosArrayDestroy(pJob->nodeList);
taosArrayDestroy(pJob->dataSrcTasks); taosArrayDestroy(pJob->dataSrcTasks);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册