未验证 提交 191637a6 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12672 from taosdata/feature/tq

test(stream): _wstartts should be reverse quoted
...@@ -106,8 +106,8 @@ int32_t create_topic() { ...@@ -106,8 +106,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -206,7 +206,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { ...@@ -206,7 +206,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
int32_t totalVgNum = pOutput->pSub->vgNum; int32_t totalVgNum = pOutput->pSub->vgNum;
mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); mInfo("mq rebalance: subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
...@@ -231,6 +231,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -231,6 +231,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("mq rebalance: remove vg %d from consumer %ld", pVgEp->vgId, consumerId);
} }
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// put into removed // put into removed
...@@ -250,6 +251,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -250,6 +251,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("mq rebalance: remove vg %d from unassigned", pVgEp->vgId);
} }
} }
...@@ -263,6 +265,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -263,6 +265,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
minVgCnt = totalVgNum / afterRebConsumerNum; minVgCnt = totalVgNum / afterRebConsumerNum;
imbConsumerNum = totalVgNum % afterRebConsumerNum; imbConsumerNum = totalVgNum % afterRebConsumerNum;
} }
mInfo("mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg", afterRebConsumerNum,
minVgCnt, imbConsumerNum);
// 4. first scan: remove consumer more than wanted, put to remove hash // 4. first scan: remove consumer more than wanted, put to remove hash
int32_t imbCnt = 0; int32_t imbCnt = 0;
...@@ -290,6 +294,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -290,6 +294,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("mq rebalance: remove vg %d from consumer %ld (first scan)", pVgEp->vgId, pConsumerEp->consumerId);
} }
imbCnt++; imbCnt++;
} }
...@@ -303,6 +308,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -303,6 +308,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("mq rebalance: remove vg %d from consumer %ld (first scan)", pVgEp->vgId, pConsumerEp->consumerId);
} }
} }
} }
...@@ -319,6 +325,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -319,6 +325,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *)); newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId); taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("mq rebalance: add new consumer %ld", consumerId);
} }
} }
...@@ -343,6 +350,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -343,6 +350,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vg %d to consumer %ld (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
} }
} }
...@@ -360,6 +368,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -360,6 +368,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vg %d to consumer %ld (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
} }
} else { } else {
// if all consumer is removed, put all vg into unassigned // if all consumer is removed, put all vg into unassigned
...@@ -372,6 +381,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -372,6 +381,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
ASSERT(pRebOutput->newConsumerId == -1); ASSERT(pRebOutput->newConsumerId == -1);
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput); taosArrayPush(pOutput->rebVgs, pRebOutput);
mInfo("mq rebalance: unassign vg %d (second scan)", pRebOutput->pVgEp->vgId);
} }
} }
......
...@@ -179,6 +179,7 @@ struct STQ { ...@@ -179,6 +179,7 @@ struct STQ {
SHashObj* pStreamTasks; SHashObj* pStreamTasks;
SVnode* pVnode; SVnode* pVnode;
SWal* pWal; SWal* pWal;
// TDB* pTdb;
}; };
typedef struct { typedef struct {
......
...@@ -32,6 +32,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { ...@@ -32,6 +32,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
/*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/
/*ASSERT(0);*/
/*}*/
#if 0 #if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
......
...@@ -106,7 +106,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { ...@@ -106,7 +106,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
pMsg->contentLen = pMsg->contentLen; pMsg->contentLen = pMsg->contentLen;
#endif #endif
qDebugL("stream task string %s", (const char*)msg); /*qDebugL("stream task string %s", (const char*)msg);*/
struct SSubplan* plan = NULL; struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(msg, &plan); int32_t code = qStringToSubplan(msg, &plan);
......
...@@ -5311,4 +5311,4 @@ int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t ke ...@@ -5311,4 +5311,4 @@ int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t ke
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);; pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -9,11 +9,11 @@ ...@@ -9,11 +9,11 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true};
SQWorkerMgmt gQwMgmt = { SQWorkerMgmt gQwMgmt = {
.lock = 0, .lock = 0,
.qwRef = -1, .qwRef = -1,
.qwNum = 0, .qwNum = 0,
}; };
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
...@@ -110,9 +110,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { ...@@ -110,9 +110,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_LOCK(QW_READ, &mgmt->schLock); QW_LOCK(QW_READ, &mgmt->schLock);
QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash)); /*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/
void * key = NULL; void *key = NULL;
size_t keyLen = 0; size_t keyLen = 0;
int32_t i = 0; int32_t i = 0;
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
...@@ -127,7 +127,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { ...@@ -127,7 +127,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash)); /*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/
} }
char *qwPhaseStr(int32_t phase) { char *qwPhaseStr(int32_t phase) {
...@@ -462,7 +462,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { ...@@ -462,7 +462,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
} }
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
SQWSchStatus * sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -499,7 +499,7 @@ _return: ...@@ -499,7 +499,7 @@ _return:
} }
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) {
SQWSchStatus * sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -550,11 +550,11 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -550,11 +550,11 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
SSDataBlock * pRes = NULL; SSDataBlock *pRes = NULL;
uint64_t useconds = 0; uint64_t useconds = 0;
int32_t i = 0; int32_t i = 0;
int32_t execNum = 0; int32_t execNum = 0;
qTaskInfo_t * taskHandle = &ctx->taskHandle; qTaskInfo_t *taskHandle = &ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle; DataSinkHandle sinkHandle = ctx->sinkHandle;
while (true) { while (true) {
...@@ -632,7 +632,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) ...@@ -632,7 +632,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
void * key = NULL; void *key = NULL;
size_t keyLen = 0; size_t keyLen = 0;
int32_t i = 0; int32_t i = 0;
STaskStatus status = {0}; STaskStatus status = {0};
...@@ -719,8 +719,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -719,8 +719,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
} }
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx * ctx = NULL; SQWTaskCtx *ctx = NULL;
SRpcHandleInfo *dropConnection = NULL; SRpcHandleInfo *dropConnection = NULL;
SRpcHandleInfo *cancelConnection = NULL; SRpcHandleInfo *cancelConnection = NULL;
...@@ -925,13 +925,13 @@ _return: ...@@ -925,13 +925,13 @@ _return:
} }
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
SSubplan* plan = NULL; SSubplan *plan = NULL;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL; DataSinkHandle sinkHandle = NULL;
SQWTaskCtx * ctx = NULL; SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
...@@ -944,7 +944,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -944,7 +944,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
ctx->ctrlConnInfo = qwMsg->connInfo; ctx->ctrlConnInfo = qwMsg->connInfo;
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); /*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -1055,10 +1055,10 @@ _return: ...@@ -1055,10 +1055,10 @@ _return:
} }
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWTaskCtx * ctx = NULL; SQWTaskCtx *ctx = NULL;
int32_t code = 0; int32_t code = 0;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
void * rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
bool queryEnd = false; bool queryEnd = false;
...@@ -1138,8 +1138,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1138,8 +1138,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0; int32_t code = 0;
int32_t dataLen = 0; int32_t dataLen = 0;
bool locked = false; bool locked = false;
SQWTaskCtx * ctx = NULL; SQWTaskCtx *ctx = NULL;
void * rsp = NULL; void *rsp = NULL;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
...@@ -1274,7 +1274,7 @@ _return: ...@@ -1274,7 +1274,7 @@ _return:
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
SQWSchStatus * sch = NULL; SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
...@@ -1300,7 +1300,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re ...@@ -1300,7 +1300,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
SQWSchStatus * sch = NULL; SQWSchStatus *sch = NULL;
if (qwMsg->code) { if (qwMsg->code) {
QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req)); QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
...@@ -1338,28 +1338,28 @@ _return: ...@@ -1338,28 +1338,28 @@ _return:
qwMsg->connInfo.handle = NULL; qwMsg->connInfo.handle = NULL;
} }
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
void qwProcessHbTimerEvent(void *param, void *tmrId) { void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWHbParam* hbParam = (SQWHbParam*)param; SQWHbParam *hbParam = (SQWHbParam *)param;
if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) {
return; return;
} }
int64_t refId = hbParam->refId; int64_t refId = hbParam->refId;
SQWorker *mgmt = qwAcquire(refId); SQWorker *mgmt = qwAcquire(refId);
if (NULL == mgmt) { if (NULL == mgmt) {
QW_DLOG("qwAcquire %" PRIx64 "failed", refId); QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
taosMemoryFree(param); taosMemoryFree(param);
return; return;
} }
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
int32_t taskNum = 0; int32_t taskNum = 0;
SQWHbInfo * rspList = NULL; SQWHbInfo *rspList = NULL;
int32_t code = 0; int32_t code = 0;
qwDbgDumpMgmtInfo(mgmt); qwDbgDumpMgmtInfo(mgmt);
...@@ -1383,7 +1383,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -1383,7 +1383,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
return; return;
} }
void * key = NULL; void *key = NULL;
size_t keyLen = 0; size_t keyLen = 0;
int32_t i = 0; int32_t i = 0;
...@@ -1413,29 +1413,27 @@ _return: ...@@ -1413,29 +1413,27 @@ _return:
for (int32_t j = 0; j < i; ++j) { for (int32_t j = 0; j < i; ++j) {
qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), /*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); /*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
tFreeSSchedulerHbRsp(&rspList[j].rsp); tFreeSSchedulerHbRsp(&rspList[j].rsp);
} }
taosMemoryFreeClear(rspList); taosMemoryFreeClear(rspList);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
qwRelease(refId); qwRelease(refId);
} }
void qwCloseRef(void) { void qwCloseRef(void) {
taosWLockLatch(&gQwMgmt.lock); taosWLockLatch(&gQwMgmt.lock);
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
taosCloseRef(gQwMgmt.qwRef); taosCloseRef(gQwMgmt.qwRef);
gQwMgmt.qwRef= -1; gQwMgmt.qwRef = -1;
} }
taosWUnLockLatch(&gQwMgmt.lock); taosWUnLockLatch(&gQwMgmt.lock);
} }
void qwDestroySchStatus(SQWSchStatus *pStatus) { void qwDestroySchStatus(SQWSchStatus *pStatus) { taosHashCleanup(pStatus->tasksHash); }
taosHashCleanup(pStatus->tasksHash);
}
void qwDestroyImpl(void *pMgmt) { void qwDestroyImpl(void *pMgmt) {
SQWorker *mgmt = (SQWorker *)pMgmt; SQWorker *mgmt = (SQWorker *)pMgmt;
...@@ -1454,12 +1452,12 @@ void qwDestroyImpl(void *pMgmt) { ...@@ -1454,12 +1452,12 @@ void qwDestroyImpl(void *pMgmt) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch = (SQWSchStatus *)pIter;
qwDestroySchStatus(sch); qwDestroySchStatus(sch);
pIter = taosHashIterate(mgmt->schHash, pIter); pIter = taosHashIterate(mgmt->schHash, pIter);
} }
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
taosMemoryFree(mgmt); taosMemoryFree(mgmt);
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
qwCloseRef(); qwCloseRef();
} }
...@@ -1467,7 +1465,7 @@ void qwDestroyImpl(void *pMgmt) { ...@@ -1467,7 +1465,7 @@ void qwDestroyImpl(void *pMgmt) {
int32_t qwOpenRef(void) { int32_t qwOpenRef(void) {
taosWLockLatch(&gQwMgmt.lock); taosWLockLatch(&gQwMgmt.lock);
if (gQwMgmt.qwRef < 0) { if (gQwMgmt.qwRef < 0) {
gQwMgmt.qwRef= taosOpenRef(100, qwDestroyImpl); gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl);
if (gQwMgmt.qwRef < 0) { if (gQwMgmt.qwRef < 0) {
taosWUnLockLatch(&gQwMgmt.lock); taosWUnLockLatch(&gQwMgmt.lock);
qError("init qworker ref failed"); qError("init qworker ref failed");
...@@ -1475,14 +1473,14 @@ int32_t qwOpenRef(void) { ...@@ -1475,14 +1473,14 @@ int32_t qwOpenRef(void) {
} }
} }
taosWUnLockLatch(&gQwMgmt.lock); taosWUnLockLatch(&gQwMgmt.lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
int32_t paramIdx = 0; int32_t paramIdx = 0;
int32_t newParamIdx = 0; int32_t newParamIdx = 0;
while (true) { while (true) {
paramIdx = atomic_load_32(&gQwMgmt.paramIdx); paramIdx = atomic_load_32(&gQwMgmt.paramIdx);
if (paramIdx == tListLen(gQwMgmt.param)) { if (paramIdx == tListLen(gQwMgmt.param)) {
...@@ -1490,7 +1488,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { ...@@ -1490,7 +1488,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
} else { } else {
newParamIdx = paramIdx + 1; newParamIdx = paramIdx + 1;
} }
if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) { if (paramIdx == atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, paramIdx, newParamIdx)) {
break; break;
} }
...@@ -1577,12 +1575,12 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW ...@@ -1577,12 +1575,12 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
SQWHbParam *param = NULL; SQWHbParam *param = NULL;
qwSetHbParam(mgmt->refId, &param); qwSetHbParam(mgmt->refId, &param);
mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void *)param, mgmt->timer);
if (NULL == mgmt->hbTimer) { if (NULL == mgmt->hbTimer) {
qError("start hb timer failed"); qError("start hb timer failed");
QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
*qWorkerMgmt = mgmt; *qWorkerMgmt = mgmt;
qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
...@@ -1599,9 +1597,9 @@ _return: ...@@ -1599,9 +1597,9 @@ _return:
taosTmrCleanUp(mgmt->timer); taosTmrCleanUp(mgmt->timer);
taosMemoryFreeClear(mgmt); taosMemoryFreeClear(mgmt);
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
} }
QW_RET(code); QW_RET(code);
} }
...@@ -1678,7 +1676,7 @@ int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64 ...@@ -1678,7 +1676,7 @@ int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64
} }
int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) { int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) {
SQWSchStatus * sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -1705,7 +1703,7 @@ int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId ...@@ -1705,7 +1703,7 @@ int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
} }
int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus * sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
......
...@@ -24,7 +24,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1); ...@@ -24,7 +24,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1); sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1); sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000 sleep 1000
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then if $rows != 4 then
print ======$rows print ======$rows
...@@ -137,7 +137,7 @@ endi ...@@ -137,7 +137,7 @@ endi
sql insert into t1 values(1648791223001,12,14,13,11.1); sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500 sleep 500
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then if $rows != 4 then
print ======$rows print ======$rows
...@@ -250,7 +250,7 @@ endi ...@@ -250,7 +250,7 @@ endi
sql insert into t1 values(1648791223002,12,14,13,11.1); sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100 sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
if $data11 != 2 then if $data11 != 2 then
...@@ -280,7 +280,7 @@ endi ...@@ -280,7 +280,7 @@ endi
sql insert into t1 values(1648791223003,12,14,13,11.1); sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100 sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
if $data11 != 3 then if $data11 != 3 then
...@@ -312,7 +312,7 @@ sql insert into t1 values(1648791223001,1,1,1,1.1); ...@@ -312,7 +312,7 @@ sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1); sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1); sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100 sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
if $data11 != 3 then if $data11 != 3 then
...@@ -344,7 +344,7 @@ sql insert into t1 values(1648791233003,3,2,3,2.1); ...@@ -344,7 +344,7 @@ sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1); sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1); sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100 sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2 # row 2
if $data21 != 2 then if $data21 != 2 then
...@@ -374,7 +374,7 @@ endi ...@@ -374,7 +374,7 @@ endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100 sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0 # row 0
if $data01 != 4 then if $data01 != 4 then
...@@ -404,7 +404,7 @@ endi ...@@ -404,7 +404,7 @@ endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100 sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
if $data11 != 4 then if $data11 != 4 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册