提交 52b63db9 编写于 作者: S Shengliang Guan

fix: invalid vnode ref while drop stream

上级 c1498e4a
...@@ -44,6 +44,8 @@ typedef struct STaosQset STaosQset; ...@@ -44,6 +44,8 @@ typedef struct STaosQset STaosQset;
typedef struct STaosQall STaosQall; typedef struct STaosQall STaosQall;
typedef struct { typedef struct {
void *ahandle; void *ahandle;
void *fp;
void *queue;
int32_t workerId; int32_t workerId;
int32_t threadNum; int32_t threadNum;
int64_t timestamp; int64_t timestamp;
...@@ -81,8 +83,8 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); ...@@ -81,8 +83,8 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle);
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue);
int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosGetQueueNumber(STaosQset *qset);
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo);
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo);
void taosResetQsetThread(STaosQset *qset, void *pItem); void taosResetQsetThread(STaosQset *qset, void *pItem);
extern int64_t tsRpcQueueMemoryAllowed; extern int64_t tsRpcQueueMemoryAllowed;
......
...@@ -242,13 +242,13 @@ static int32_t syncIOStopInternal(SSyncIO *io) { ...@@ -242,13 +242,13 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall = taosAllocateQall();
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall(); SQueueInfo qinfo = {0};
while (1) { while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, &qinfo);
sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs);
if (numOfMsgs <= 0) { if (numOfMsgs <= 0) {
break; break;
......
...@@ -115,7 +115,7 @@ bool taosQueueEmpty(STaosQueue *queue) { ...@@ -115,7 +115,7 @@ bool taosQueueEmpty(STaosQueue *queue) {
bool empty = false; bool empty = false;
taosThreadMutexLock(&queue->mutex); taosThreadMutexLock(&queue->mutex);
if (queue->head == NULL && queue->tail == NULL) { if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) {
empty = true; empty = true;
} }
taosThreadMutexUnlock(&queue->mutex); taosThreadMutexUnlock(&queue->mutex);
...@@ -397,7 +397,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { ...@@ -397,7 +397,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp) { int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) {
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -417,9 +417,10 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void ...@@ -417,9 +417,10 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
*ppItem = pNode->item; *ppItem = pNode->item;
if (ahandle) *ahandle = queue->ahandle; qinfo->ahandle = queue->ahandle;
if (itemFp) *itemFp = queue->itemFp; qinfo->fp = queue->itemFp;
if (ts) *ts = pNode->timestamp; qinfo->queue = queue;
qinfo->timestamp = pNode->timestamp;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
...@@ -440,7 +441,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void ...@@ -440,7 +441,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void
return code; return code;
} }
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) { int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) {
STaosQueue *queue; STaosQueue *queue;
int32_t code = 0; int32_t code = 0;
...@@ -461,13 +462,16 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand ...@@ -461,13 +462,16 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
qall->start = queue->head; qall->start = queue->head;
qall->numOfItems = queue->numOfItems; qall->numOfItems = queue->numOfItems;
code = qall->numOfItems; code = qall->numOfItems;
if (ahandle) *ahandle = queue->ahandle; qinfo->ahandle = queue->ahandle;
if (itemsFp) *itemsFp = queue->itemsFp; qinfo->fp = queue->itemsFp;
qinfo->queue = queue;
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
queue->memOfItems = 0; queue->memOfItems = 0;
uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems);
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int32_t j = 1; j < qall->numOfItems; ++j) { for (int32_t j = 1; j < qall->numOfItems; ++j) {
tsem_wait(&qset->sem); tsem_wait(&qset->sem);
......
...@@ -70,26 +70,24 @@ void tQWorkerCleanup(SQWorkerPool *pool) { ...@@ -70,26 +70,24 @@ void tQWorkerCleanup(SQWorkerPool *pool) {
static void *tQWorkerThreadFp(SQWorker *worker) { static void *tQWorkerThreadFp(SQWorker *worker) {
SQWorkerPool *pool = worker->pool; SQWorkerPool *pool = worker->pool;
FItem fp = NULL; SQueueInfo qinfo = {0};
void *msg = NULL;
void *msg = NULL; int32_t code = 0;
void *ahandle = NULL;
int32_t code = 0;
int64_t ts = 0;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ts, &ahandle, &fp) == 0) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break; break;
} }
if (fp != NULL) { if (qinfo.fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num, .timestamp = ts}; qinfo.workerId = worker->id;
(*fp)(&info, msg); qinfo.threadNum = pool->num;
(*((FItem)qinfo.fp))(&qinfo, msg);
} }
} }
...@@ -195,27 +193,26 @@ void tWWorkerCleanup(SWWorkerPool *pool) { ...@@ -195,27 +193,26 @@ void tWWorkerCleanup(SWWorkerPool *pool) {
static void *tWWorkerThreadFp(SWWorker *worker) { static void *tWWorkerThreadFp(SWWorker *worker) {
SWWorkerPool *pool = worker->pool; SWWorkerPool *pool = worker->pool;
FItems fp = NULL; SQueueInfo qinfo = {0};
void *msg = NULL;
void *msg = NULL; int32_t code = 0;
void *ahandle = NULL; int32_t numOfMsgs = 0;
int32_t numOfMsgs = 0;
int32_t qtype = 0;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
setThreadName(pool->name); setThreadName(pool->name);
uDebug("worker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset);
break; break;
} }
if (fp != NULL) { if (qinfo.fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; qinfo.workerId = worker->id;
(*fp)(&info, worker->qall, numOfMsgs); qinfo.threadNum = pool->num;
(*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册