提交 47b492b3 编写于 作者: dengyihao's avatar dengyihao

avoid race

上级 21f84693
...@@ -2180,43 +2180,56 @@ static void doFreeInsertSupporter(SSqlObj* pSqlObj) { ...@@ -2180,43 +2180,56 @@ static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
tfree(pSql->param); tfree(pSql->param);
} }
} }
static bool doCheckAllSubObj(SSqlObj* pSqlObj, int *numOfRows, int8_t *buildSchema) {
assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);
bool res = true;
for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) {
SSqlObj* pSql = pSqlObj->pSubs[i];
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
res = false;
if (pSql->cmd.submitSchema) {
*buildSchema = 1;
}
} else {
*numOfRows += pSql->res.numOfRows;
}
}
return res;
}
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param; SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql; SSqlObj* pParentObj = pSupporter->pSql;
SSqlObj* pSubObj = (SSqlObj*) tres;
// record the total inserted rows // record the total inserted rows
if (numOfRows > 0) { if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows; pSubObj->res.numOfRows = numOfRows;
}
if (taos_errno(tres) != TSDB_CODE_SUCCESS) {
SSqlObj* pSql = (SSqlObj*) tres;
assert(pSql != NULL && pSql->res.code == numOfRows);
pParentObj->res.code = pSql->res.code;
// set the flag in the parent sqlObj
if (pSql->cmd.submitSchema) {
pParentObj->cmd.submitSchema = 1;
}
} }
if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) { if (atomic_sub_fetch_32(&pParentObj->subState.numOfRemain, 1) > 0) {
return; return;
} }
if (0 != atomic_load_32(&pParentObj->subState.numOfRemain)) {
tscError("%p impossible", pParentObj);
}
int totalRows = 0;
int8_t submitSchema = 0;
bool res = doCheckAllSubObj(pParentObj, &totalRows, &submitSchema);
// restore user defined fp // restore user defined fp
pParentObj->fp = pParentObj->fetchFp; pParentObj->fp = pParentObj->fetchFp;
pParentObj->cmd.submitSchema = submitSchema;
pParentObj->res.numOfRows = totalRows;
int32_t numOfSub = pParentObj->subState.numOfSub; int32_t numOfSub = pParentObj->subState.numOfSub;
if (res == true) {
if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
doFreeInsertSupporter(pParentObj); doFreeInsertSupporter(pParentObj);
// todo remove this parameter in async callback function definition. // todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function // all data has been sent to vnode, call user function
int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows; //int32_t v = (pParentObj->res.code != TSDB_CODE_SUCCESS) ? pParentObj->res.code : (int32_t)pParentObj->res.numOfRows;
int32_t v = totalRows;
(*pParentObj->fp)(pParentObj->param, pParentObj, v); (*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else { } else {
if (!needRetryInsert(pParentObj, numOfSub)) { if (!needRetryInsert(pParentObj, numOfSub)) {
...@@ -2242,7 +2255,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2242,7 +2255,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj, tscError("%p Async insertion completed, total inserted:%d rows, numOfFailed:%d, numOfTotal:%d", pParentObj,
pParentObj->res.numOfRows, numOfFailed, numOfSub); totalRows, numOfFailed, numOfSub);
tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables); tscDebug("%p cleanup %d tableMeta in cache", pParentObj, pParentObj->cmd.numOfTables);
for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) { for(int32_t i = 0; i < pParentObj->cmd.numOfTables; ++i) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册