提交 6da9c663 编写于 作者: H Haojun Liao

[td-11818] update log, add sql in the log file. Remove a global configure parameter.

上级 d6d86514
...@@ -64,7 +64,6 @@ extern int8_t tsKeepOriginalColumnName; ...@@ -64,7 +64,6 @@ extern int8_t tsKeepOriginalColumnName;
extern int8_t tsDeadLockKillQuery; extern int8_t tsDeadLockKillQuery;
// client // client
extern int32_t tsMaxSQLStringLen;
extern int32_t tsMaxWildCardsLen; extern int32_t tsMaxWildCardsLen;
extern int32_t tsMaxRegexStringLen; extern int32_t tsMaxRegexStringLen;
extern int8_t tsTscEnableRecordSql; extern int8_t tsTscEnableRecordSql;
......
...@@ -864,13 +864,21 @@ typedef struct { ...@@ -864,13 +864,21 @@ typedef struct {
char desc[TSDB_STEP_DESC_LEN]; char desc[TSDB_STEP_DESC_LEN];
} SStartupReq; } SStartupReq;
/**
* The layout of the query message payload is as following:
* +--------------------+---------------------------------+
* |Sql statement | Physical plan |
* |(denoted by sqlLen) |(In JSON, denoted by contentLen) |
* +--------------------+---------------------------------+
*/
typedef struct SSubQueryMsg { typedef struct SSubQueryMsg {
SMsgHead header; SMsgHead header;
uint64_t sId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
int8_t taskType; int8_t taskType;
uint32_t contentLen; uint32_t sqlLen; // the query sql,
uint32_t phyLen;
char msg[]; char msg[];
} SSubQueryMsg; } SSubQueryMsg;
...@@ -1534,7 +1542,8 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* ...@@ -1534,7 +1542,8 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg*
tlen += taosEncodeFixedU64(buf, pMsg->sId); tlen += taosEncodeFixedU64(buf, pMsg->sId);
tlen += taosEncodeFixedU64(buf, pMsg->queryId); tlen += taosEncodeFixedU64(buf, pMsg->queryId);
tlen += taosEncodeFixedU64(buf, pMsg->taskId); tlen += taosEncodeFixedU64(buf, pMsg->taskId);
tlen += taosEncodeFixedU32(buf, pMsg->contentLen); tlen += taosEncodeFixedU32(buf, pMsg->sqlLen);
tlen += taosEncodeFixedU32(buf, pMsg->phyLen);
//tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); //tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
return tlen; return tlen;
} }
...@@ -1543,7 +1552,8 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { ...@@ -1543,7 +1552,8 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
buf = taosDecodeFixedU64(buf, &pMsg->sId); buf = taosDecodeFixedU64(buf, &pMsg->sId);
buf = taosDecodeFixedU64(buf, &pMsg->queryId); buf = taosDecodeFixedU64(buf, &pMsg->queryId);
buf = taosDecodeFixedU64(buf, &pMsg->taskId); buf = taosDecodeFixedU64(buf, &pMsg->taskId);
buf = taosDecodeFixedU32(buf, &pMsg->contentLen); buf = taosDecodeFixedU32(buf, &pMsg->sqlLen);
buf = taosDecodeFixedU32(buf, &pMsg->phyLen);
//buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); //buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
return buf; return buf;
} }
......
...@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); ...@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes); int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes);
/** /**
* Process the query job, generated according to the query physical plan. * Process the query job, generated according to the query physical plan.
...@@ -80,7 +80,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str ...@@ -80,7 +80,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** pJob); int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob);
/** /**
* Fetch query result from the remote query executor * Fetch query result from the remote query executor
......
...@@ -239,9 +239,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t ...@@ -239,9 +239,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// handle error and retry // handle error and retry
} else { } else {
...@@ -255,7 +256,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) ...@@ -255,7 +256,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return pRequest->code; return pRequest->code;
} }
return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob); return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob);
} }
...@@ -532,8 +533,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, ...@@ -532,8 +533,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
goto _return; goto _return;
} }
if (sqlLen > tsMaxSQLStringLen) { if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return; goto _return;
} }
...@@ -755,8 +756,8 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { ...@@ -755,8 +756,8 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
STscObj *pTscObj = (STscObj *)taos; STscObj *pTscObj = (STscObj *)taos;
if (sqlLen > (size_t) tsMaxSQLStringLen) { if (sqlLen > (size_t) TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL; return NULL;
} }
......
...@@ -78,7 +78,6 @@ int32_t tsCompressColData = -1; ...@@ -78,7 +78,6 @@ int32_t tsCompressColData = -1;
int32_t tsCompatibleModel = 1; int32_t tsCompatibleModel = 1;
// client // client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN; int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN;
int32_t tsMaxRegexStringLen = TSDB_REGEX_STRING_DEFAULT_LEN; int32_t tsMaxRegexStringLen = TSDB_REGEX_STRING_DEFAULT_LEN;
...@@ -594,16 +593,6 @@ static void doInitGlobalConfig(void) { ...@@ -594,16 +593,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg); taosAddConfigOption(cfg);
cfg.option = "maxSQLLength";
cfg.ptr = &tsMaxSQLStringLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MAX_SQL_LEN;
cfg.maxValue = TSDB_MAX_ALLOWED_SQL_LEN;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosAddConfigOption(cfg);
cfg.option = "maxWildCardsLength"; cfg.option = "maxWildCardsLength";
cfg.ptr = &tsMaxWildCardsLen; cfg.ptr = &tsMaxWildCardsLen;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -128,7 +128,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp ...@@ -128,7 +128,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp
if (NULL == pMsg) { if (NULL == pMsg) {
qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType)); qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1; return terrno;
} }
memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len); memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
......
...@@ -182,9 +182,9 @@ typedef struct SQWorkerMgmt { ...@@ -182,9 +182,9 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) #define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) #define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
......
...@@ -479,7 +479,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -479,7 +479,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
if (NULL == pRes) { if (NULL == pRes) {
QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds); QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds); dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == ctx->taskType) { if (TASK_TYPE_TEMP == ctx->taskType) {
...@@ -493,6 +492,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -493,6 +492,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
break; break;
} }
ASSERT(pRes->info.rows > 0);
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) { if (code) {
......
...@@ -282,18 +282,21 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -282,18 +282,21 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = be64toh(msg->sId); msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId); msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId); msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen); msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen);
uint64_t sId = msg->sId; uint64_t sId = msg->sId;
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg, .msgLen = msg->contentLen, .connection = pMsg}; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connection = pMsg};
QW_SCH_TASK_DLOG("processQuery start, node:%p", node); char* sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql);
tfree(sql);
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType)); QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
......
...@@ -113,7 +113,8 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) { ...@@ -113,7 +113,8 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1)); qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
qwtqueryMsg.sId = htobe64(1); qwtqueryMsg.sId = htobe64(1);
qwtqueryMsg.taskId = htobe64(1); qwtqueryMsg.taskId = htobe64(1);
qwtqueryMsg.contentLen = htonl(100); qwtqueryMsg.phyLen = htonl(100);
qwtqueryMsg.sqlLen = 0;
queryRpc->msgType = TDMT_VND_QUERY; queryRpc->msgType = TDMT_VND_QUERY;
queryRpc->pCont = &qwtqueryMsg; queryRpc->pCont = &qwtqueryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100; queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
......
...@@ -128,6 +128,7 @@ typedef struct SSchJob { ...@@ -128,6 +128,7 @@ typedef struct SSchJob {
int32_t errCode; int32_t errCode;
void *res; //TODO free it or not void *res; //TODO free it or not
int32_t resNumOfRows; int32_t resNumOfRows;
const char *sql;
SQueryProfileSummary summary; SQueryProfileSummary summary;
} SSchJob; } SSchJob;
...@@ -150,11 +151,11 @@ typedef struct SSchJob { ...@@ -150,11 +151,11 @@ typedef struct SSchJob {
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \ #define SCH_TASK_ELOG(param, ...) \
qError("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \ #define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \ #define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__) qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
...@@ -345,7 +345,6 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { ...@@ -345,7 +345,6 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
} }
pLevel = taosArrayGet(pJob->levels, i); pLevel = taosArrayGet(pJob->levels, i);
pLevel->level = i; pLevel->level = i;
plans = taosArrayGetP(pDag->pSubplans, i); plans = taosArrayGetP(pDag->pSubplans, i);
...@@ -375,7 +374,7 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { ...@@ -375,7 +374,7 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
SSchTask task = {0}; SSchTask task = {0};
SSchTask *pTask = &task; SSchTask *pTask = &task;
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));
void *p = taosArrayPush(pLevel->subTasks, &task); void *p = taosArrayPush(pLevel->subTasks, &task);
...@@ -388,8 +387,6 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { ...@@ -388,8 +387,6 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("task initialized, level:%d", pLevel->level);
} }
SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
...@@ -716,8 +713,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -716,8 +713,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0; int32_t code = 0;
SSchTask *pErrTask = pTask; SSchTask *pErrTask = pTask;
SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask));
code = schMoveTaskToSuccList(pJob, pTask, &moved); code = schMoveTaskToSuccList(pJob, pTask, &moved);
if (code && moved) { if (code && moved) {
SCH_ERR_RET(code); SCH_ERR_RET(code);
...@@ -739,7 +734,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -739,7 +734,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
if (taskDone < pTask->level->taskNum) { if (taskDone < pTask->level->taskNum) {
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) { } else if (taskDone > pTask->level->taskNum) {
SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum); SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
...@@ -1031,22 +1025,18 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t ...@@ -1031,22 +1025,18 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo); code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo);
if (code) { if (code) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType)); qDebug("QID:0x%"PRIx64 ",TID:0x%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
tfree(param); tfree(param);
tfree(pMsgSendInfo); tfree(pMsgSendInfo);
SCH_RET(code); SCH_RET(code);
} }
...@@ -1057,7 +1047,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1057,7 +1047,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
bool isCandidateAddr = false; bool isCandidateAddr = false;
if (NULL == addr) { if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
isCandidateAddr = true; isCandidateAddr = true;
} }
...@@ -1078,7 +1067,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1078,7 +1067,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
} }
case TDMT_VND_QUERY: { case TDMT_VND_QUERY: {
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen; uint32_t len = strlen(pJob->sql);
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
msg = calloc(1, msgSize); msg = calloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_TASK_ELOG("calloc %d failed", msgSize);
...@@ -1086,15 +1077,16 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1086,15 +1077,16 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
} }
SSubQueryMsg *pMsg = msg; SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId); pMsg->header.vgId = htonl(addr->nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId);
pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId);
pMsg->taskId = htobe64(pTask->taskId); pMsg->taskType = TASK_TYPE_TEMP;
pMsg->taskType = TASK_TYPE_TEMP; pMsg->phyLen = htonl(pTask->msgLen);
pMsg->contentLen = htonl(pTask->msgLen); pMsg->sqlLen = htonl(len);
memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
memcpy(pMsg->msg, pJob->sql, len);
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
break; break;
} }
...@@ -1205,7 +1197,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -1205,7 +1197,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen); SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} else { } else {
SCH_TASK_DLOG(" ===physical plan=== len:%d, %s", pTask->msgLen, pTask->msg); SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
} }
} }
...@@ -1214,7 +1206,6 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -1214,7 +1206,6 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
// NOTE: race condition: the task should be put into the hash table before send msg to server // NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
} }
...@@ -1281,7 +1272,7 @@ void schDropJobAllTasks(SSchJob *pJob) { ...@@ -1281,7 +1272,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks); schDropTaskInHashList(pJob, pJob->failTasks);
} }
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) { static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, const char* sql, bool syncSchedule) {
qDebug("QID:0x%"PRIx64" job started", pDag->queryId); qDebug("QID:0x%"PRIx64" job started", pDag->queryId);
if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) { if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) {
...@@ -1297,6 +1288,7 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru ...@@ -1297,6 +1288,7 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru
pJob->attr.syncSchedule = syncSchedule; pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport; pJob->transport = transport;
pJob->sql = sql;
if (pNodeList != NULL) { if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList); pJob->nodeList = taosArrayDup(pNodeList);
...@@ -1336,7 +1328,6 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru ...@@ -1336,7 +1328,6 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru
} }
pJob->status = JOB_TASK_STATUS_NOT_START; pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob)); SCH_ERR_JRET(schLaunchJob(pJob));
*(SSchJob **)job = pJob; *(SSchJob **)job = pJob;
...@@ -1347,15 +1338,12 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru ...@@ -1347,15 +1338,12 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru
} }
SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob)); SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
*(SSchJob **)job = NULL; *(SSchJob **)job = NULL;
schedulerFreeJob(pJob); schedulerFreeJob(pJob);
SCH_RET(code); SCH_RET(code);
} }
...@@ -1399,29 +1387,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -1399,29 +1387,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) { int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SSchJob *job = NULL; SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
pRes->code = atomic_load_32(&(*pJob)->errCode);
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true)); pRes->numOfRows = (*pJob)->resNumOfRows;
job = *pJob;
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** pJob) { int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, false)); SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1456,7 +1439,6 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { ...@@ -1456,7 +1439,6 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SSubplan *plan = taosArrayGetP(plans, i); SSubplan *plan = taosArrayGetP(plans, i);
tInfo.addr = plan->execNode; tInfo.addr = plan->execNode;
code = qSubPlanToString(plan, &msg, &msgLen); code = qSubPlanToString(plan, &msg, &msgLen);
...@@ -1472,16 +1454,16 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { ...@@ -1472,16 +1454,16 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
} }
SSubQueryMsg* pMsg = calloc(1, msgSize); SSubQueryMsg* pMsg = calloc(1, msgSize);
/*SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;*/
memcpy(pMsg->msg, msg, msgLen); memcpy(pMsg->msg, msg, msgLen);
pMsg->header.vgId = tInfo.addr.nodeId; pMsg->header.vgId = tInfo.addr.nodeId;
pMsg->sId = schMgmt.sId; pMsg->sId = schMgmt.sId;
pMsg->queryId = plan->id.queryId; pMsg->queryId = plan->id.queryId;
pMsg->taskId = schGenUUID(); pMsg->taskId = schGenUUID();
pMsg->taskType = TASK_TYPE_PERSISTENT; pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->contentLen = msgLen; pMsg->phyLen = msgLen;
pMsg->sqlLen = 0;
/*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/ /*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
tInfo.msg = pMsg; tInfo.msg = pMsg;
...@@ -1514,7 +1496,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { ...@@ -1514,7 +1496,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
int32_t msgSize = src->msg->contentLen + sizeof(*src->msg); int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);
STaskInfo info = {0}; STaskInfo info = {0};
info.addr = src->addr; info.addr = src->addr;
......
...@@ -378,7 +378,7 @@ void* schtRunJobThread(void *aa) { ...@@ -378,7 +378,7 @@ void* schtRunJobThread(void *aa) {
qnodeAddr.port = 6031; qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr); taosArrayPush(qnodeList, &qnodeAddr);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job); code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
assert(code == 0); assert(code == 0);
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
...@@ -539,7 +539,7 @@ TEST(queryTest, normalCase) { ...@@ -539,7 +539,7 @@ TEST(queryTest, normalCase) {
schtSetExecNode(); schtSetExecNode();
schtSetAsyncSendMsgToServer(); schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &pJob);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SSchJob *job = (SSchJob *)pJob; SSchJob *job = (SSchJob *)pJob;
...@@ -649,7 +649,7 @@ TEST(insertTest, normalCase) { ...@@ -649,7 +649,7 @@ TEST(insertTest, normalCase) {
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
SQueryResult res = {0}; SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res); code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, "insert into tb values(now,1)", &res);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20); ASSERT_EQ(res.numOfRows, 20);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册