提交 7032e5a0 编写于 作者: S shenglian zhou

[TD-5235]<feature>:offload msg processing from rpc thread to tsc scheduler

上级 da3e7847
......@@ -484,7 +484,7 @@ int32_t loadTableMeta(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSm
size_t tagIndex = taosArrayGetSize(schema->tags) - 1;
taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex));
}
tscDebug("SML:0x%"PRIx64 "load table meta succeed. %s, columns number: %d, tag number: %d, precision: %d",
tscDebug("SML:0x%"PRIx64 " load table meta succeed. table name: %s, columns number: %d, tag number: %d, precision: %d",
info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision);
free(tableMeta); tableMeta = NULL;
return code;
......
......@@ -337,11 +337,16 @@ int tscSendMsgToServer(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
SRpcMsg* rpcMsg = pSchedMsg->ahandle;
SRpcEpSet* pEpSet = pSchedMsg->thandle;
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -359,17 +364,21 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
......@@ -393,13 +402,13 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// single table query error need to be handled here.
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || // change the retry procedure
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || // change the retry procedure
rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) {
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) {
// do nothing in case of super table subquery
} else {
pSql->retry += 1;
......@@ -422,6 +431,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
return;
}
}
......@@ -429,7 +440,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
pRes->rspLen = 0;
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
} else {
......@@ -473,12 +484,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
pRes->numOfRows += pMsg->affectedRows;
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
} else {
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
}
}
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
}
......@@ -499,6 +510,31 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
free(rpcMsg);
free(pEpSet);
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doProcessMsgFromServer;
SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg));
memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg));
rpcMsgCopy->pCont = rpcMallocCont(rpcMsg->contLen);
memcpy(rpcMsgCopy->pCont, rpcMsg->pCont, rpcMsg->contLen);
schedMsg.ahandle = (void*)rpcMsgCopy;
SRpcEpSet* pEpSetCopy = NULL;
if (pEpSet != NULL) {
pEpSetCopy = calloc(1, sizeof(SRpcEpSet));
memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet));
}
schedMsg.thandle = (void*)pEpSetCopy;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
int doBuildAndSendMsg(SSqlObj *pSql) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册