提交 94b26bfe 编写于 作者: H Haojun Liao

[td-225]

上级 2855bd73
...@@ -220,7 +220,7 @@ void tscExprDestroy(SArray* pExprInfo); ...@@ -220,7 +220,7 @@ void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num); int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta); void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id);
SColumn* tscColumnClone(const SColumn* src); SColumn* tscColumnClone(const SColumn* src);
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc); void tscColumnCopy(SColumn* pDest, const SColumn* pSrc);
...@@ -318,7 +318,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); ...@@ -318,7 +318,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql); bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql); bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta); void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
...@@ -356,7 +356,7 @@ char* strdup_throw(const char* str); ...@@ -356,7 +356,7 @@ char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id); void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -368,7 +368,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc ...@@ -368,7 +368,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent); void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam); void destroyTableNameList(SInsertStatementParam* pInsertParam);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta, uint64_t id);
/** /**
* free query result of the sql object * free query result of the sql object
......
...@@ -1922,7 +1922,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1922,7 +1922,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) { if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) {
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret)); tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true); tscResetSqlCmd(pCmd, true, pSql->self);
pSql->parseRetry++; pSql->parseRetry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) { if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
...@@ -1939,7 +1939,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1939,7 +1939,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) { if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret)); tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true); tscResetSqlCmd(pCmd, true, pSql->self);
pSql->parseRetry++; pSql->parseRetry++;
ret = tscValidateSqlInfo(pSql, &sqlInfo); ret = tscValidateSqlInfo(pSql, &sqlInfo);
......
...@@ -1694,7 +1694,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1694,7 +1694,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) { if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
SHashObj* hashList = pCmd->insertParam.pTableBlockHashList; SHashObj* hashList = pCmd->insertParam.pTableBlockHashList;
pCmd->insertParam.pTableBlockHashList = NULL; pCmd->insertParam.pTableBlockHashList = NULL;
tscResetSqlCmd(pCmd, false); tscResetSqlCmd(pCmd, false, pSql->self);
pCmd->insertParam.pTableBlockHashList = hashList; pCmd->insertParam.pTableBlockHashList = hashList;
} }
......
...@@ -8483,7 +8483,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8483,7 +8483,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) {
clearAllTableMetaInfo(pQueryInfo, false); clearAllTableMetaInfo(pQueryInfo, false, pSql->self);
pQueryInfo->numOfTables = 0; pQueryInfo->numOfTables = 0;
// parse the subquery in the first place // parse the subquery in the first place
......
...@@ -2581,7 +2581,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) { ...@@ -2581,7 +2581,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
int tscProcessDropTableRsp(SSqlObj *pSql) { int tscProcessDropTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); tscRemoveCachedTableMeta(pTableMetaInfo, pSql->self);
tfree(pTableMetaInfo->pTableMeta); tfree(pTableMetaInfo->pTableMeta);
return 0; return 0;
} }
...@@ -2967,11 +2967,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -2967,11 +2967,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
// remove stored tableMeta info in hash table // remove stored tableMeta info in hash table
tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); tscResetSqlCmd(pCmd, true, pSql->self);
tscResetSqlCmd(pCmd, true);
// pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
// pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
......
...@@ -142,6 +142,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -142,6 +142,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if(pSql == NULL) { if(pSql == NULL) {
return ; return ;
} }
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" add into timer", pSql->self); tscDebug("0x%"PRIx64" add into timer", pSql->self);
...@@ -186,7 +187,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -186,7 +187,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
} }
// launch stream computing in a new thread // launch stream computing in a new thread
SSchedMsg schedMsg = { 0 }; SSchedMsg schedMsg = {0};
schedMsg.fp = tscProcessStreamLaunchQuery; schedMsg.fp = tscProcessStreamLaunchQuery;
schedMsg.ahandle = pStream; schedMsg.ahandle = pStream;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
...@@ -194,6 +195,8 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { ...@@ -194,6 +195,8 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
} }
static void cbParseSql(void* param, TAOS_RES* res, int code);
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlStream *pStream = (SSqlStream *)param; SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) { if (tres == NULL || numOfRows < 0) {
...@@ -201,24 +204,26 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf ...@@ -201,24 +204,26 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self, tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay); pStream, numOfRows, retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0); SSqlObj* pSql = pStream->pSql;
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tfree(pTableMetaInfo->pTableMeta);
tscFreeSqlResult(pStream->pSql); tscFreeSqlResult(pSql);
tscFreeSubobj(pStream->pSql); tscFreeSubobj(pSql);
tfree(pStream->pSql->pSubs); tfree(pSql->pSubs);
pStream->pSql->subState.numOfSub = 0; pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self);
} else {
tscError("0x%"PRIx64" open stream failed, code:%s", pSql->self, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
free(pStream);
}
tscSetRetryTimer(pStream, pStream->pSql, retryDelay); // tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return; // return;
} }
taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param); taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param);
...@@ -555,7 +560,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -555,7 +560,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code; pSql->res.code = code;
tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code)); tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL); pStream->fp(pStream->param, NULL, NULL);
return; return;
} }
...@@ -582,9 +586,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -582,9 +586,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
// set stime with ltime if ltime > stime // set stime with ltime if ltime > stime
const char* dstTable = pStream->dstTable? pStream->dstTable: ""; const char* dstTable = pStream->dstTable? pStream->dstTable: "";
tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); tscDebug("0x%"PRIx64" CQ table %s ltime is %"PRId64, pSql->self, dstTable, pStream->ltime);
if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) { if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) {
tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime); tscWarn("0x%"PRIx64" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime > 0", pSql->self, dstTable, pStream->stime, pStream->ltime);
pStream->stime = pStream->ltime; pStream->stime = pStream->ltime;
} }
...@@ -592,7 +597,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { ...@@ -592,7 +597,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
tscAddIntoStreamList(pStream); tscAddIntoStreamList(pStream);
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer); taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self, tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self,
...@@ -659,10 +663,9 @@ void cbParseSql(void* param, TAOS_RES* res, int code) { ...@@ -659,10 +663,9 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
char sql[128] = ""; char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", pStream->dstTable); sprintf(sql, "select last_row(*) from %s;", pStream->dstTable);
taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param); taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
return ;
} }
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) { int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL; if (pObj == NULL || pObj->signature != pObj) return NULL;
...@@ -697,14 +700,12 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -697,14 +700,12 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pStream->param = param; pStream->param = param;
pStream->pSql = pSql; pStream->pSql = pSql;
pStream->cqhandle = cqhandle; pStream->cqhandle = cqhandle;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
tscSetStreamDestTable(pStream, dstTable); tscSetStreamDestTable(pStream, dstTable);
pSql->pStream = pStream; pSql->pStream = pStream;
pSql->param = pStream; pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self); tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
...@@ -725,14 +726,13 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -725,14 +726,13 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql->fp = cbParseSql; pSql->fp = cbParseSql;
pSql->fetchFp = cbParseSql; pSql->fetchFp = cbParseSql;
registerSqlObj(pSql); registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
cbParseSql(pStream, pSql, code); cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr); tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self);
} else { } else {
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
...@@ -743,7 +743,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -743,7 +743,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return pStream; return pStream;
} }
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
int64_t stime, void *param, void (*callback)(void *)) { int64_t stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL); return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL);
} }
......
...@@ -2718,17 +2718,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2718,17 +2718,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
int32_t code = pParentSql->res.code; int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
// remove the cached tableMeta and vgroup id list, and then parse the sql again // remove the cached tableMeta and vgroup id list, and then parse the sql again
SSqlCmd* pParentCmd = &pParentSql->cmd; tscResetSqlCmd( &pParentSql->cmd, true, pParentSql->self);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
tscResetSqlCmd(pParentCmd, true);
// pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
// pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++; pParentSql->retry++;
pParentSql->res.code = TSDB_CODE_SUCCESS;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry); tstrerror(code), pParentSql->retry);
...@@ -3143,7 +3136,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3143,7 +3136,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
numOfFailed += 1; numOfFailed += 1;
// clean up tableMeta in cache // clean up tableMeta in cache
tscFreeQueryInfo(&pSql->cmd, false); tscFreeQueryInfo(&pSql->cmd, false, pSql->self);
SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0);
tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
...@@ -3165,7 +3158,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3165,7 +3158,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
pParentObj->res.code = TSDB_CODE_SUCCESS; pParentObj->res.code = TSDB_CODE_SUCCESS;
tscResetSqlCmd(&pParentObj->cmd, false); tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons: // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
......
...@@ -1300,12 +1300,13 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -1300,12 +1300,13 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
} }
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id) {
if (pCmd == NULL) { if (pCmd == NULL) {
return; return;
} }
SQueryInfo* pQueryInfo = pCmd->pQueryInfo; SQueryInfo* pQueryInfo = pCmd->pQueryInfo;
while(pQueryInfo != NULL) { while(pQueryInfo != NULL) {
SQueryInfo* p = pQueryInfo->sibling; SQueryInfo* p = pQueryInfo->sibling;
...@@ -1314,7 +1315,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -1314,7 +1315,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
SQueryInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i); SQueryInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i);
freeQueryInfoImpl(pUpQueryInfo); freeQueryInfoImpl(pUpQueryInfo);
clearAllTableMetaInfo(pUpQueryInfo, removeMeta); clearAllTableMetaInfo(pUpQueryInfo, removeCachedMeta, id);
if (pUpQueryInfo->pQInfo != NULL) { if (pUpQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pUpQueryInfo->pQInfo); qDestroyQueryInfo(pUpQueryInfo->pQInfo);
pUpQueryInfo->pQInfo = NULL; pUpQueryInfo->pQInfo = NULL;
...@@ -1330,7 +1331,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -1330,7 +1331,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
} }
freeQueryInfoImpl(pQueryInfo); freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, removeMeta); clearAllTableMetaInfo(pQueryInfo, removeCachedMeta, id);
if (pQueryInfo->pQInfo != NULL) { if (pQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pQueryInfo->pQInfo); qDestroyQueryInfo(pQueryInfo->pQInfo);
...@@ -1359,7 +1360,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) { ...@@ -1359,7 +1360,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) {
tfree(pInsertParam->pTableNameList); tfree(pInsertParam->pTableNameList);
} }
void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
pCmd->command = 0; pCmd->command = 0;
pCmd->numOfCols = 0; pCmd->numOfCols = 0;
pCmd->count = 0; pCmd->count = 0;
...@@ -1373,20 +1374,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { ...@@ -1373,20 +1374,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
tfree(pCmd->insertParam.tagData.data); tfree(pCmd->insertParam.tagData.data);
pCmd->insertParam.tagData.dataLen = 0; pCmd->insertParam.tagData.dataLen = 0;
tscFreeQueryInfo(pCmd, clearCachedMeta); tscFreeQueryInfo(pCmd, clearCachedMeta, id);
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
// if (pCmd->pTableMetaMap != NULL) {
// STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
// while (p) {
// taosArrayDestroy(p->vgroupIdList);
// tfree(p->pTableMeta);
// p = taosHashIterate(pCmd->pTableMetaMap, p);
// }
//
// taosHashCleanup(pCmd->pTableMetaMap);
// pCmd->pTableMetaMap = NULL;
// }
} }
void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) { void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) {
...@@ -1501,7 +1490,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1501,7 +1490,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->self = 0; pSql->self = 0;
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
tscResetSqlCmd(pCmd, false); tscResetSqlCmd(pCmd, false, pSql->self);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize); memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload); tfree(pCmd->payload);
...@@ -3369,20 +3358,15 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) { ...@@ -3369,20 +3358,15 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) {
return pa; return pa;
} }
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id) {
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (removeMeta) { if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0}; tscRemoveCachedTableMeta(pTableMetaInfo, id);
tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
} }
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo); tscClearTableMetaInfo(pTableMetaInfo);
free(pTableMetaInfo);
} }
tfree(pQueryInfo->pTableMetaInfo); tfree(pQueryInfo->pTableMetaInfo);
...@@ -3449,10 +3433,12 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo) { ...@@ -3449,10 +3433,12 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo) {
} }
tfree(pTableMetaInfo->pTableMeta); tfree(pTableMetaInfo->pTableMeta);
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscColumnListDestroy(pTableMetaInfo->tagColList); tscColumnListDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL; pTableMetaInfo->tagColList = NULL;
free(pTableMetaInfo);
} }
void tscResetForNextRetrieve(SSqlRes* pRes) { void tscResetForNextRetrieve(SSqlRes* pRes) {
...@@ -3845,14 +3831,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3845,14 +3831,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor // todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self); tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
tscResetSqlCmd(&pParentSql->cmd, true, pParentSql->self);
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
tscResetSqlCmd(pParentCmd, true);
// pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
// pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++; pParentSql->retry++;
...@@ -3871,7 +3850,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3871,7 +3850,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
executeQuery(pParentSql, pQueryInfo); executeQuery(pParentSql, pQueryInfo);
return; return;
} }
...@@ -4995,7 +4974,7 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { ...@@ -4995,7 +4974,7 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
return info; return info;
} }
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) { void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) {
char fname[TSDB_TABLE_FNAME_LEN] = {0}; char fname[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, fname); tNameExtractFullName(&pTableMetaInfo->name, fname);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册