提交 135836c7 编写于 作者: L Liu Jicong

fix(tmq): tq deserialize msg

上级 e1a5000e
...@@ -2563,6 +2563,12 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p ...@@ -2563,6 +2563,12 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withSchema);
buf = taosDecodeFixedI8(buf, &pRsp->withTag); buf = taosDecodeFixedI8(buf, &pRsp->withTag);
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
}
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
}
for (int32_t i = 0; i < pRsp->blockNum; i++) { for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = 0; int32_t bLen = 0;
...@@ -2572,20 +2578,14 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p ...@@ -2572,20 +2578,14 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
taosArrayPush(pRsp->blockDataLen, &bLen); taosArrayPush(pRsp->blockDataLen, &bLen);
taosArrayPush(pRsp->blockData, &data); taosArrayPush(pRsp->blockData, &data);
if (pRsp->withSchema) { if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
buf = taosDecodeSSchemaWrapper(buf, pSW); buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} else {
pRsp->blockSchema = NULL;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
char* name = NULL; char* name = NULL;
buf = taosDecodeString(buf, &name); buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name); taosArrayPush(pRsp->blockTbName, &name);
} else {
pRsp->blockTbName = NULL;
} }
} }
} }
......
...@@ -31,16 +31,14 @@ extern int32_t taosTmrThreads; ...@@ -31,16 +31,14 @@ extern int32_t taosTmrThreads;
void *taosTmrInit(int32_t maxTmr, int32_t resoultion, int32_t longest, const char *label); void *taosTmrInit(int32_t maxTmr, int32_t resoultion, int32_t longest, const char *label);
void taosTmrCleanUp(void *handle);
tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle); tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle);
bool taosTmrStop(tmr_h tmrId); bool taosTmrStop(tmr_h tmrId);
bool taosTmrStopA(tmr_h *timerId);
bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId); bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *handle, tmr_h *pTmrId);
void taosTmrCleanUp(void *handle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -81,6 +81,8 @@ typedef struct { ...@@ -81,6 +81,8 @@ typedef struct {
// rpc info // rpc info
int64_t reqId; int64_t reqId;
SRpcHandleInfo rpcInfo; SRpcHandleInfo rpcInfo;
tmr_h timerId;
int8_t tmrStopped;
// exec // exec
int8_t inputStatus; int8_t inputStatus;
int8_t execStatus; int8_t execStatus;
...@@ -164,13 +166,12 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); ...@@ -164,13 +166,12 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset // tqOffset
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*); STqOffsetStore* tqOffsetOpen(STqOffsetCfg*);
void STqOffsetClose(STqOffsetStore*); void tqOffsetClose(STqOffsetStore*);
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset); int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey); int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -174,6 +174,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -174,6 +174,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum); ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum); ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
if (rsp.withSchema) {
ASSERT(taosArrayGetSize(rsp.blockSchema) == rsp.blockNum);
}
rsp.rspOffset = fetchOffset; rsp.rspOffset = fetchOffset;
......
...@@ -30,7 +30,7 @@ struct STqOffsetStore { ...@@ -30,7 +30,7 @@ struct STqOffsetStore {
SHashObj* pHash; // SHashObj<subscribeKey, offset> SHashObj* pHash; // SHashObj<subscribeKey, offset>
}; };
STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) { STqOffsetStore* tqOffsetOpen(STqOffsetCfg* pCfg) {
STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore)); STqOffsetStore* pStore = taosMemoryMalloc(sizeof(STqOffsetStore));
if (pStore == NULL) { if (pStore == NULL) {
return NULL; return NULL;
......
...@@ -15,6 +15,11 @@ ...@@ -15,6 +15,11 @@
#include "tq.h" #include "tq.h"
void tqTmrRspFunc(void* param, void* tmrId) {
STqHandle* pHandle = (STqHandle*)param;
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
}
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
// 1. guard and set status executing // 1. guard and set status executing
// 2. check processedVer // 2. check processedVer
...@@ -50,12 +55,15 @@ int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) { ...@@ -50,12 +55,15 @@ int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) {
return 0; return 0;
} }
void tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer) { int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer,
int64_t timeout) {
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo)); memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
atomic_store_64(&pHandle->pushHandle.reqId, reqId); atomic_store_64(&pHandle->pushHandle.reqId, reqId);
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer); atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL); atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
// set timeout timer atomic_store_8(&pHandle->pushHandle.tmrStopped, 0);
taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId);
return 0;
} }
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
......
...@@ -69,6 +69,9 @@ int vnodeInit(int nthreads) { ...@@ -69,6 +69,9 @@ int vnodeInit(int nthreads) {
if (walInit() < 0) { if (walInit() < 0) {
return -1; return -1;
} }
if (tqInit() < 0) {
return -1;
}
return 0; return 0;
} }
...@@ -94,6 +97,9 @@ void vnodeCleanup() { ...@@ -94,6 +97,9 @@ void vnodeCleanup() {
taosMemoryFreeClear(vnodeGlobal.threads); taosMemoryFreeClear(vnodeGlobal.threads);
taosThreadCondDestroy(&(vnodeGlobal.hasTask)); taosThreadCondDestroy(&(vnodeGlobal.hasTask));
taosThreadMutexDestroy(&(vnodeGlobal.mutex)); taosThreadMutexDestroy(&(vnodeGlobal.mutex));
walCleanUp();
tqCleanUp();
} }
int vnodeScheduleTask(int (*execute)(void*), void* arg) { int vnodeScheduleTask(int (*execute)(void*), void* arg) {
...@@ -155,4 +161,4 @@ static void* loop(void* arg) { ...@@ -155,4 +161,4 @@ static void* loop(void* arg) {
} }
return NULL; return NULL;
} }
\ No newline at end of file
#include "qworker.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
#include "executor.h" #include "executor.h"
#include "planner.h" #include "planner.h"
#include "query.h" #include "query.h"
#include "qwInt.h" #include "qwInt.h"
#include "qwMsg.h" #include "qwMsg.h"
#include "qworker.h"
#include "tcommon.h" #include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
...@@ -406,7 +406,6 @@ int32_t qwDropTask(QW_FPARAMS_DEF) { ...@@ -406,7 +406,6 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
int32_t paramIdx = 0; int32_t paramIdx = 0;
int32_t newParamIdx = 0; int32_t newParamIdx = 0;
...@@ -430,11 +429,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { ...@@ -430,11 +429,10 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
*pParam = &gQwMgmt.param[paramIdx]; *pParam = &gQwMgmt.param[paramIdx];
} }
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
if (dbFName[0] && tbName[0]) { if (dbFName[0] && tbName[0]) {
...@@ -444,7 +442,6 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { ...@@ -444,7 +442,6 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
} }
} }
void qwCloseRef(void) { void qwCloseRef(void) {
taosWLockLatch(&gQwMgmt.lock); taosWLockLatch(&gQwMgmt.lock);
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
...@@ -454,13 +451,13 @@ void qwCloseRef(void) { ...@@ -454,13 +451,13 @@ void qwCloseRef(void) {
taosWUnLockLatch(&gQwMgmt.lock); taosWUnLockLatch(&gQwMgmt.lock);
} }
void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); } void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
void qwDestroyImpl(void *pMgmt) { void qwDestroyImpl(void *pMgmt) {
SQWorker *mgmt = (SQWorker *)pMgmt; SQWorker *mgmt = (SQWorker *)pMgmt;
taosTmrStopA(&mgmt->hbTimer); taosTmrStop(mgmt->hbTimer);
mgmt->hbTimer = NULL;
taosTmrCleanUp(mgmt->timer); taosTmrCleanUp(mgmt->timer);
// TODO STOP ALL QUERY // TODO STOP ALL QUERY
...@@ -527,10 +524,10 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { ...@@ -527,10 +524,10 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
switch (type) { switch (type) {
case QUERY_QUEUE: case QUERY_QUEUE:
pStat = &mgmt->stat.msgStat.waitTime[0]; pStat = &mgmt->stat.msgStat.waitTime[0];
return pStat->num ? (pStat->total/pStat->num) : 0; return pStat->num ? (pStat->total / pStat->num) : 0;
case FETCH_QUEUE: case FETCH_QUEUE:
pStat = &mgmt->stat.msgStat.waitTime[1]; pStat = &mgmt->stat.msgStat.waitTime[1];
return pStat->num ? (pStat->total/pStat->num) : 0; return pStat->num ? (pStat->total / pStat->num) : 0;
default: default:
qError("unsupported queue type %d", type); qError("unsupported queue type %d", type);
} }
...@@ -538,5 +535,3 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { ...@@ -538,5 +535,3 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
return -1; return -1;
} }
...@@ -23,19 +23,19 @@ ...@@ -23,19 +23,19 @@
#define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue. #define DUMP_SCHEDULER_TIME_WINDOW 30000 // every 30sec, take a snap shot of task queue.
typedef struct { typedef struct {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
tsem_t emptySem; tsem_t emptySem;
tsem_t fullSem; tsem_t fullSem;
TdThreadMutex queueMutex; TdThreadMutex queueMutex;
int32_t fullSlot; int32_t fullSlot;
int32_t emptySlot; int32_t emptySlot;
int32_t queueSize; int32_t queueSize;
int32_t numOfThreads; int32_t numOfThreads;
TdThread *qthread; TdThread *qthread;
SSchedMsg *queue; SSchedMsg *queue;
bool stop; bool stop;
void *pTmrCtrl; void *pTmrCtrl;
void *pTimer; void *pTimer;
} SSchedQueue; } SSchedQueue;
static void *taosProcessSchedQueue(void *param); static void *taosProcessSchedQueue(void *param);
...@@ -218,7 +218,8 @@ void taosCleanUpScheduler(void *param) { ...@@ -218,7 +218,8 @@ void taosCleanUpScheduler(void *param) {
taosThreadMutexDestroy(&pSched->queueMutex); taosThreadMutexDestroy(&pSched->queueMutex);
if (pSched->pTimer) { if (pSched->pTimer) {
taosTmrStopA(&pSched->pTimer); taosTmrStop(pSched->pTimer);
pSched->pTimer = NULL;
} }
if (pSched->queue) taosMemoryFree(pSched->queue); if (pSched->queue) taosMemoryFree(pSched->queue);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册