提交 a4ba5401 编写于 作者: H Haojun Liao

enh(stream): set the start version of all operators.

上级 497c9ea9
......@@ -91,6 +91,9 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor
int64_t qGetCheckpointVersion(qTaskInfo_t tinfo);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
......
......@@ -295,14 +295,11 @@ struct SStreamTask {
int16_t dispatchMsgType;
int8_t taskStatus;
int8_t schedStatus;
// node info
int32_t selfChildId;
int32_t nodeId;
SEpSet epSet;
int64_t recoverSnapVer;
int64_t startVer;
int32_t selfChildId;
int32_t nodeId;
SEpSet epSet;
int64_t recoverSnapVer;
int64_t startVer;
// fill history
int8_t fillHistory;
......@@ -340,15 +337,15 @@ struct SStreamTask {
// state backend
SStreamState* pState;
// do not serialize
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta;
};
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
......@@ -597,6 +594,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
......
......@@ -183,6 +183,10 @@ void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId)
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq);
#ifdef __cplusplus
}
#endif
......
......@@ -907,6 +907,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->startVer = ver;
pTask->pMeta = pTq->pStreamMeta;
// expand executor
if (pTask->fillHistory) {
......@@ -979,7 +980,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
streamSetupTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, child id %d, level %d", vgId, pTask->id.idStr, pTask->selfChildId, pTask->taskLevel);
tqInfo("vgId:%d expand stream task, s-task:%s, ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr,
pTask->startVer, pTask->selfChildId, pTask->taskLevel);
return 0;
}
......@@ -1370,16 +1372,6 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
return 0;
}
static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
STqOffset offset = {0};
tqOffsetResetToLog(&offset.val, ver);
tstrncpy(offset.subKey, pKey, tListLen(offset.subKey));
// keep the offset info in the offset store
tqOffsetWrite(pOffsetStore, &offset);
}
static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit,
const char* key, int64_t ver) {
doSaveTaskOffset(pOffsetStore, key, ver);
......@@ -1392,36 +1384,6 @@ static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTa
return TSDB_CODE_SUCCESS;
}
static void saveOffsetForAllTasks(STQ* pTq, SPackedData submit) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver);
}
}
}
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
void* pIter = NULL;
......@@ -1429,7 +1391,7 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory");
saveOffsetForAllTasks(pTq, submit);
saveOffsetForAllTasks(pTq, submit.ver);
return -1;
}
......@@ -1518,11 +1480,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
tqDoRestoreSourceStreamTasks(pTq);
return 0;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId);
if (pTask != NULL) {
if (pTask->taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
streamProcessRunReq(pTask);
} else if (pTask->taskStatus == TASK_STATUS_RESTORE) {
tqDebug("vgId:%d s-task:%s start to restore from last ck", vgId, pTask->id.idStr);
streamProcessRunReq(pTask);
} else {
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
}
......@@ -1683,8 +1648,10 @@ int32_t tqRestoreStreamTasks(STQ* pTq) {
return -1;
}
tqInfo("vgId:%d start to restore all stream tasks", vgId);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks);
tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks);
initOffsetForAllRestoreTasks(pTq);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = ALL_STREAM_TASKS_ID;
......
......@@ -16,10 +16,13 @@
#include "tq.h"
int tqCommit(STQ* pTq) {
#if 0
// stream meta commit does not be aligned to the vnode commit
if (streamMetaCommit(pTq->pStreamMeta) < 0) {
tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr());
return -1;
}
#endif
return tqOffsetCommitFile(pTq->pOffsetStore);
}
......@@ -15,25 +15,19 @@
#include "tq.h"
static int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList);
static int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList);
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually.
int tqDoRestoreSourceStreamTasks(STQ* pTq) {
// todo set the offset value from the previous check point offset
int64_t st = taosGetTimestampMs();
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pRestoreTasks);
tqInfo("vgId:%d start restoring stream tasks, total tasks:%d", vgId, numOfTasks);
while (1) {
SArray* pTaskList = taosArrayInit(4, POINTER_BYTES);
// check all restore tasks
restoreStreamTask(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList);
restoreStreamTaskImpl(pTq->pStreamMeta, pTq->pOffsetStore, pTaskList);
transferToNormalTask(pTq->pStreamMeta, pTaskList);
taosArrayDestroy(pTaskList);
......@@ -44,7 +38,7 @@ int tqDoRestoreSourceStreamTasks(STQ* pTq) {
}
int64_t et = taosGetTimestampMs();
tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", vgId, (et - st));
tqInfo("vgId:%d restoring task completed, elapsed time:%" PRId64 " sec.", TD_VID(pTq->pVnode), (et - st));
return 0;
}
......@@ -68,7 +62,7 @@ int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
return TSDB_CODE_SUCCESS;
}
int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) {
int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, SArray* pTaskList) {
// check all restore tasks
void* pIter = NULL;
......@@ -93,7 +87,8 @@ int32_t restoreStreamTask(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
if (tInputQueueIsFull(pTask)) {
tqDebug("s-task:%s input queue is full, do nothing" PRId64, pTask->id.idStr);
tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr);
taosMsleep(10);
continue;
}
......
......@@ -69,4 +69,76 @@ int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pO
}
return 0;
}
\ No newline at end of file
}
void initOffsetForAllRestoreTasks(STQ* pTq) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pRestoreTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->startVer);
}
}
}
void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, ver);
}
}
}
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
STqOffset offset = {0};
tqOffsetResetToLog(&offset.val, ver);
tstrncpy(offset.subKey, pKey, tListLen(offset.subKey));
// keep the offset info in the offset store
tqOffsetWrite(pOffsetStore, &offset);
}
......@@ -127,14 +127,9 @@ enum {
};
typedef struct {
// TODO remove prepareStatus
// STqOffsetVal prepareStatus; // for tmq
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
// int8_t returned;
int64_t snapshotVer;
// const SSubmitReq* pReq;
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int64_t snapshotVer;
SPackedData submit;
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
......@@ -144,6 +139,7 @@ typedef struct {
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
SStreamState* pState;
int64_t dataVersion;
} SStreamTaskInfo;
typedef struct {
......@@ -191,7 +187,6 @@ enum {
OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9,
// OP_EXEC_RECV = 0x11,
};
typedef struct SOperatorFpSet {
......@@ -560,6 +555,7 @@ typedef struct SStreamIntervalOperatorInfo {
uint64_t numOfDatapack;
SArray* pUpdated;
SSHashObj* pUpdatedMap;
int64_t dataVersion;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
......@@ -609,6 +605,7 @@ typedef struct SStreamSessionAggOperatorInfo {
bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pStUpdated;
int64_t dataVersion;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
......@@ -627,6 +624,7 @@ typedef struct SStreamStateAggOperatorInfo {
bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pSeUpdated;
int64_t dataVersion;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
......
......@@ -198,6 +198,12 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code;
}
int64_t qGetCheckpointVersion(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = tinfo;
return pTaskInfo->streamInfo.dataVersion;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
......
......@@ -2333,9 +2333,14 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return startPos;
}
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) {
pTaskInfo->streamInfo.dataVersion = version;
}
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
......@@ -2501,6 +2506,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion);
qDebug("===stream===clear semi operator");
} else {
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
......@@ -2774,6 +2780,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->dataVersion = 0;
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
......@@ -3124,6 +3131,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
int32_t rows = pSDataBlock->info.rows;
int32_t winRows = 0;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = NULL;
......@@ -3587,6 +3596,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL;
pInfo->pStUpdated = NULL;
pInfo->dataVersion = 0;
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
OP_NOT_OPENED, pInfo, pTaskInfo);
......@@ -3897,6 +3907,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t winRows = 0;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
......@@ -4113,6 +4126,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL;
pInfo->pSeUpdated = NULL;
pInfo->dataVersion = 0;
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
......@@ -4748,6 +4762,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
&pInfo->delKey);
setOperatorCompleted(pOperator);
streamStateCommit(pTaskInfo->streamInfo.pState);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion);
return NULL;
}
......
......@@ -21,8 +21,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
while(pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
qError("stream task wait for the end of fill history");
while (pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->taskStatus));
taosMsleep(2);
continue;
}
......@@ -236,7 +237,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
qDebug("stream task exec over, queue empty, task: %d", pTask->id.taskId);
qDebug("s-task:%s stream task exec over, queue empty", pTask->id.idStr);
break;
}
......@@ -284,7 +285,19 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, pInput, pRes);
qDebug("s-task:%s exec end", pTask->id.idStr);
int64_t ckVer = qGetCheckpointVersion(pTask->exec.pExecutor);
if (ckVer > pTask->startVer) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, ckVer);
pTask->startVer = ckVer;
streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
qError("failed to commit stream meta, since %s", terrstr());
return -1;
}
} else {
qDebug("s-task:%s exec end", pTask->id.idStr);
}
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
......@@ -333,6 +346,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
return -1;
}
// todo the task should be commit here
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
if (!taosQueueEmpty(pTask->inputQueue->queue)) {
......
......@@ -195,17 +195,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__DROPPING) {
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
} else {
taosRUnLockLatch(&pMeta->lock);
return NULL;
}
if (ppTask != NULL && (atomic_load_8(&((*ppTask)->taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return *ppTask;
}
taosRUnLockLatch(&pMeta->lock);
return NULL;
}
......@@ -219,6 +214,37 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
}
}
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
SStreamTask* pTask = NULL;
int32_t numOfRestored = taosHashGetSize(pMeta->pRestoreTasks);
if (numOfRestored > 0) {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t));
if (p != NULL) {
pTask = *p;
if (pTask != NULL && (atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
}
}
} else {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (p != NULL) {
pTask = *p;
if (pTask != NULL && atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING) {
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
}
}
}
taosRUnLockLatch(&pMeta->lock);
return NULL;
}
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
......
......@@ -218,7 +218,7 @@ STaosQueue *tAutoQWorkerAllocQueue(SAutoQWorkerPool *pool, void *ahandle, FItem
int32_t queueNum = taosGetQueueNumber(pool->qset);
int32_t curWorkerNum = taosArrayGetSize(pool->workers);
int32_t dstWorkerNum = ceil(queueNum * pool->ratio);
if (dstWorkerNum < 1) dstWorkerNum = 1;
if (dstWorkerNum < 2) dstWorkerNum = 2;
// spawn a thread to process queue
while (curWorkerNum < dstWorkerNum) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册