未验证 提交 4f88eef7 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #5065 from taosdata/hotfix/TD-2844

[TD-2844]taosd crashes when creating stream
......@@ -50,10 +50,13 @@ typedef struct {
void *dbConn;
void *tmrCtrl;
pthread_mutex_t mutex;
int32_t delete;
int32_t cqObjNum;
} SCqContext;
typedef struct SCqObj {
tmr_h tmrId;
int64_t rid;
uint64_t uid;
int32_t tid; // table ID
int32_t rowSize; // bytes of a row
......@@ -69,6 +72,84 @@ typedef struct SCqObj {
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
int32_t cqObjRef = -1;
void cqRmFromList(SCqObj *pObj) {
//LOCK in caller
SCqContext *pContext = pObj->pContext;
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pContext->pHead = pObj->next;
}
if (pObj->next) {
pObj->next->prev = pObj->prev;
}
}
void cqFree(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle;
SCqContext *pContext = pObj->pContext;
int32_t delete = 0;
pthread_mutex_lock(&pContext->mutex);
// free the resources associated
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
} else {
taosTmrStop(pObj->tmrId);
pObj->tmrId = 0;
}
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->dstTable);
free(pObj->sqlStr);
free(pObj);
pContext->cqObjNum--;
if (pContext->cqObjNum <= 0 && pContext->delete) {
delete = 1;
}
pthread_mutex_unlock(&pContext->mutex);
if (delete) {
pthread_mutex_unlock(&pContext->mutex);
pthread_mutex_destroy(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
}
}
void cqCreateRef() {
int32_t ref = atomic_load_32(&cqObjRef);
if (ref == -1) {
ref = taosOpenRef(4096, cqFree);
if (atomic_val_compare_exchange_32(&cqObjRef, -1, ref) != -1) {
taosCloseRef(ref);
}
}
}
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
if (tsEnableStream == 0) {
return NULL;
......@@ -79,6 +160,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
return NULL;
}
cqCreateRef();
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
tstrncpy(pContext->user, pCfg->user, sizeof(pContext->user));
......@@ -97,6 +180,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
pthread_mutex_init(&pContext->mutex, NULL);
cDebug("vgId:%d, CQ is opened", pContext->vgId);
return pContext;
......@@ -109,30 +193,30 @@ void cqClose(void *handle) {
SCqContext *pContext = handle;
if (handle == NULL) return;
pContext->delete = 1;
// stop all CQs
cqStop(pContext);
// free all resources
pthread_mutex_lock(&pContext->mutex);
SCqObj *pObj = pContext->pHead;
while (pObj) {
SCqObj *pTemp = pObj;
pObj = pObj->next;
tdFreeSchema(pTemp->pSchema);
tfree(pTemp->sqlStr);
free(pTemp);
}
pthread_mutex_unlock(&pContext->mutex);
int64_t rid = 0;
pthread_mutex_destroy(&pContext->mutex);
while (1) {
pthread_mutex_lock(&pContext->mutex);
taosTmrCleanUp(pContext->tmrCtrl);
pContext->tmrCtrl = NULL;
SCqObj *pObj = pContext->pHead;
if (pObj) {
cqRmFromList(pObj);
cDebug("vgId:%d, CQ is closed", pContext->vgId);
free(pContext);
rid = pObj->rid;
} else {
pthread_mutex_unlock(&pContext->mutex);
break;
}
pthread_mutex_unlock(&pContext->mutex);
taosRemoveRef(cqObjRef, rid);
}
}
void cqStart(void *handle) {
......@@ -191,7 +275,8 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
return NULL;
}
SCqContext *pContext = handle;
int64_t rid = 0;
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL;
......@@ -213,32 +298,36 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
if (pContext->pHead) pContext->pHead->prev = pObj;
pContext->pHead = pObj;
pContext->cqObjNum++;
pObj->rid = taosAddRef(cqObjRef, pObj);
cqCreateStream(pContext, pObj);
rid = pObj->rid;
pthread_mutex_unlock(&pContext->mutex);
return pObj;
return (void *)rid;
}
void cqDrop(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle;
SCqContext *pContext = pObj->pContext;
pthread_mutex_lock(&pContext->mutex);
if (pObj->prev) {
pObj->prev->next = pObj->next;
} else {
pContext->pHead = pObj->next;
}
if (pObj->next) {
pObj->next->prev = pObj->prev;
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle);
if (pObj == NULL) {
return;
}
SCqContext *pContext = pObj->pContext;
pthread_mutex_lock(&pContext->mutex);
cqRmFromList(pObj);
// free the resources associated
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
......@@ -248,17 +337,18 @@ void cqDrop(void *handle) {
pObj->tmrId = 0;
}
cInfo("vgId:%d, id:%d CQ:%s is dropped", pContext->vgId, pObj->tid, pObj->sqlStr);
tdFreeSchema(pObj->pSchema);
free(pObj->dstTable);
free(pObj->sqlStr);
free(pObj);
pthread_mutex_unlock(&pContext->mutex);
taosRemoveRef(cqObjRef, (int64_t)handle);
taosReleaseRef(cqObjRef, (int64_t)handle);
}
static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
SCqObj* pObj = (SCqObj*)param;
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
SCqContext* pContext = pObj->pContext;
SSqlObj* pSql = (SSqlObj*)result;
if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) {
......@@ -267,10 +357,16 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {
pthread_mutex_lock(&pContext->mutex);
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
taosReleaseRef(cqObjRef, (int64_t)param);
}
static void cqProcessCreateTimer(void *param, void *tmrId) {
SCqObj* pObj = (SCqObj*)param;
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
SCqContext* pContext = pObj->pContext;
if (pContext->dbConn == NULL) {
......@@ -281,6 +377,8 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
}
taosReleaseRef(cqObjRef, (int64_t)param);
}
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
......@@ -288,13 +386,13 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pContext->dbConn == NULL) {
cDebug("vgId:%d, create dbConn after 1000 ms", pContext->vgId);
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, pObj, pContext->tmrCtrl);
pObj->tmrId = taosTmrStart(cqProcessCreateTimer, 1000, (void *)pObj->rid, pContext->tmrCtrl);
return;
}
pObj->tmrId = 0;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL);
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, (void *)pObj->rid, NULL);
// TODO the pObj->pStream may be released if error happens
if (pObj->pStream) {
......@@ -308,18 +406,28 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
}
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj *pObj = (SCqObj *)param;
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)param);
if (pObj == NULL) {
return;
}
if (tres == NULL && row == NULL) {
taos_close_stream(pObj->pStream);
pObj->pStream = NULL;
taosReleaseRef(cqObjRef, (int64_t)param);
return;
}
SCqContext *pContext = pObj->pContext;
STSchema *pSchema = pObj->pSchema;
if (pObj->pStream == NULL) return;
if (pObj->pStream == NULL) {
taosReleaseRef(cqObjRef, (int64_t)param);
return;
}
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
......@@ -370,5 +478,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
// write into vnode write queue
pContext->cqWrite(pContext->vgId, pHead, TAOS_QTYPE_CQ, NULL);
free(buffer);
taosReleaseRef(cqObjRef, (int64_t)param);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册