未验证 提交 c14cb952 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #10067 from taosdata/feature/3.0_liaohj

[td-11818] update log, add sql in the log file.
......@@ -64,7 +64,6 @@ extern int8_t tsKeepOriginalColumnName;
extern int8_t tsDeadLockKillQuery;
// client
extern int32_t tsMaxSQLStringLen;
extern int32_t tsMaxWildCardsLen;
extern int32_t tsMaxRegexStringLen;
extern int8_t tsTscEnableRecordSql;
......
......@@ -876,13 +876,21 @@ typedef struct {
char desc[TSDB_STEP_DESC_LEN];
} SStartupReq;
/**
* The layout of the query message payload is as following:
* +--------------------+---------------------------------+
* |Sql statement | Physical plan |
* |(denoted by sqlLen) |(In JSON, denoted by contentLen) |
* +--------------------+---------------------------------+
*/
typedef struct SSubQueryMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int8_t taskType;
uint32_t contentLen;
uint32_t sqlLen; // the query sql,
uint32_t phyLen;
char msg[];
} SSubQueryMsg;
......@@ -1578,7 +1586,8 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg*
tlen += taosEncodeFixedU64(buf, pMsg->sId);
tlen += taosEncodeFixedU64(buf, pMsg->queryId);
tlen += taosEncodeFixedU64(buf, pMsg->taskId);
tlen += taosEncodeFixedU32(buf, pMsg->contentLen);
tlen += taosEncodeFixedU32(buf, pMsg->sqlLen);
tlen += taosEncodeFixedU32(buf, pMsg->phyLen);
//tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
return tlen;
}
......@@ -1587,7 +1596,8 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
buf = taosDecodeFixedU64(buf, &pMsg->sId);
buf = taosDecodeFixedU64(buf, &pMsg->queryId);
buf = taosDecodeFixedU64(buf, &pMsg->taskId);
buf = taosDecodeFixedU32(buf, &pMsg->contentLen);
buf = taosDecodeFixedU32(buf, &pMsg->sqlLen);
buf = taosDecodeFixedU32(buf, &pMsg->phyLen);
//buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
return buf;
}
......
......@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......@@ -80,7 +80,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, str
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** pJob);
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob);
/**
* Fetch query result from the remote query executor
......
......@@ -239,9 +239,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
if (code != TSDB_CODE_SUCCESS) {
// handle error and retry
} else {
......@@ -255,7 +256,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return pRequest->code;
}
return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob);
}
......@@ -527,8 +528,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
goto _return;
}
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
......@@ -771,8 +772,8 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
STscObj *pTscObj = (STscObj *)taos;
if (sqlLen > (size_t) tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
if (sqlLen > (size_t) TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
......
......@@ -78,7 +78,6 @@ int32_t tsCompressColData = -1;
int32_t tsCompatibleModel = 1;
// client
int32_t tsMaxSQLStringLen = TSDB_MAX_ALLOWED_SQL_LEN;
int32_t tsMaxWildCardsLen = TSDB_PATTERN_STRING_DEFAULT_LEN;
int32_t tsMaxRegexStringLen = TSDB_REGEX_STRING_DEFAULT_LEN;
......@@ -594,16 +593,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosAddConfigOption(cfg);
cfg.option = "maxSQLLength";
cfg.ptr = &tsMaxSQLStringLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MAX_SQL_LEN;
cfg.maxValue = TSDB_MAX_ALLOWED_SQL_LEN;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
taosAddConfigOption(cfg);
cfg.option = "maxWildCardsLength";
cfg.ptr = &tsMaxWildCardsLen;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......
......@@ -128,7 +128,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp
if (NULL == pMsg) {
qError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
return terrno;
}
memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
......
......@@ -182,9 +182,9 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
......
......@@ -479,7 +479,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
if (NULL == pRes) {
QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == ctx->taskType) {
......@@ -493,6 +492,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
break;
}
ASSERT(pRes->info.rows > 0);
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
if (code) {
......
......@@ -282,18 +282,21 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
msg->taskId = be64toh(msg->taskId);
msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg, .msgLen = msg->contentLen, .connection = pMsg};
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connection = pMsg};
QW_SCH_TASK_DLOG("processQuery start, node:%p", node);
char* sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql);
tfree(sql);
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
......
......@@ -113,7 +113,8 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
qwtqueryMsg.queryId = htobe64(atomic_add_fetch_64(&qwtTestQueryId, 1));
qwtqueryMsg.sId = htobe64(1);
qwtqueryMsg.taskId = htobe64(1);
qwtqueryMsg.contentLen = htonl(100);
qwtqueryMsg.phyLen = htonl(100);
qwtqueryMsg.sqlLen = 0;
queryRpc->msgType = TDMT_VND_QUERY;
queryRpc->pCont = &qwtqueryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
......
......@@ -128,6 +128,7 @@ typedef struct SSchJob {
int32_t errCode;
void *res; //TODO free it or not
int32_t resNumOfRows;
const char *sql;
SQueryProfileSummary summary;
} SSchJob;
......@@ -150,11 +151,11 @@ typedef struct SSchJob {
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
qError("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:%" PRId64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
......@@ -345,7 +345,6 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
}
pLevel = taosArrayGet(pJob->levels, i);
pLevel->level = i;
plans = taosArrayGetP(pDag->pSubplans, i);
......@@ -375,7 +374,7 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
SSchTask task = {0};
SSchTask *pTask = &task;
SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel));
void *p = taosArrayPush(pLevel->subTasks, &task);
......@@ -388,8 +387,6 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task initialized, level:%d", pLevel->level);
}
SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
......@@ -716,8 +713,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0;
SSchTask *pErrTask = pTask;
SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask));
code = schMoveTaskToSuccList(pJob, pTask, &moved);
if (code && moved) {
SCH_ERR_RET(code);
......@@ -739,7 +734,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
if (taskDone < pTask->level->taskNum) {
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) {
SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
......@@ -1031,22 +1025,18 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo);
if (code) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
SCH_ERR_JRET(code);
}
qDebug("QID:%"PRIx64 ",TID:%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
qDebug("QID:0x%"PRIx64 ",TID:0x%"PRIx64 " req msg sent, type:%d, %s", qId, tId, msgType, TMSG_INFO(msgType));
return TSDB_CODE_SUCCESS;
_return:
tfree(param);
tfree(pMsgSendInfo);
SCH_RET(code);
}
......@@ -1057,7 +1047,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
bool isCandidateAddr = false;
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
isCandidateAddr = true;
}
......@@ -1078,7 +1067,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
case TDMT_VND_QUERY: {
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen;
uint32_t len = strlen(pJob->sql);
msgSize = sizeof(SSubQueryMsg) + pTask->msgLen + len;
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
......@@ -1086,15 +1077,16 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->contentLen = htonl(pTask->msgLen);
memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len);
memcpy(pMsg->msg, pJob->sql, len);
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
break;
}
......@@ -1205,7 +1197,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, pTask->msgLen);
SCH_ERR_JRET(code);
} else {
SCH_TASK_DLOG(" ===physical plan=== len:%d, %s", pTask->msgLen, pTask->msg);
SCH_TASK_DLOG("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
......@@ -1214,7 +1206,6 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING);
}
......@@ -1281,7 +1272,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks);
}
int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** job, const char* sql, bool syncSchedule) {
qDebug("QID:0x%"PRIx64" job started", pDag->queryId);
if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) {
......@@ -1297,6 +1288,7 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru
pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport;
pJob->sql = sql;
if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList);
......@@ -1336,7 +1328,6 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru
}
pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob));
*(SSchJob **)job = pJob;
......@@ -1347,15 +1338,12 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryDag* pDag, stru
}
SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob));
return TSDB_CODE_SUCCESS;
_return:
*(SSchJob **)job = NULL;
schedulerFreeJob(pJob);
SCH_RET(code);
}
......@@ -1399,29 +1387,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, const char* sql, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
job = *pJob;
pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
pRes->code = atomic_load_32(&(*pJob)->errCode);
pRes->numOfRows = (*pJob)->resNumOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, struct SSchJob** pJob) {
int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDag, const char* sql, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, false));
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false));
return TSDB_CODE_SUCCESS;
}
......@@ -1456,7 +1439,6 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
for (int32_t i = 0; i < taskNum; ++i) {
SSubplan *plan = taosArrayGetP(plans, i);
tInfo.addr = plan->execNode;
code = qSubPlanToString(plan, &msg, &msgLen);
......@@ -1472,16 +1454,16 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
}
SSubQueryMsg* pMsg = calloc(1, msgSize);
/*SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;*/
memcpy(pMsg->msg, msg, msgLen);
pMsg->header.vgId = tInfo.addr.nodeId;
pMsg->sId = schMgmt.sId;
pMsg->queryId = plan->id.queryId;
pMsg->taskId = schGenUUID();
pMsg->sId = schMgmt.sId;
pMsg->queryId = plan->id.queryId;
pMsg->taskId = schGenUUID();
pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->contentLen = msgLen;
pMsg->phyLen = msgLen;
pMsg->sqlLen = 0;
/*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
tInfo.msg = pMsg;
......@@ -1514,7 +1496,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t msgSize = src->msg->contentLen + sizeof(*src->msg);
int32_t msgSize = src->msg->phyLen + sizeof(*src->msg);
STaskInfo info = {0};
info.addr = src->addr;
......
......@@ -378,7 +378,7 @@ void* schtRunJobThread(void *aa) {
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
assert(code == 0);
execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
......@@ -539,7 +539,7 @@ TEST(queryTest, normalCase) {
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &pJob);
ASSERT_EQ(code, 0);
SSchJob *job = (SSchJob *)pJob;
......@@ -649,7 +649,7 @@ TEST(insertTest, normalCase) {
pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob);
SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &res);
code = schedulerExecJob(mockPointer, qnodeList, &dag, &pInsertJob, "insert into tb values(now,1)", &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
......
......@@ -966,7 +966,7 @@ int isCommentLine(char *line) {
void source_file(TAOS *con, char *fptr) {
wordexp_t full_path;
int read_len = 0;
char *cmd = calloc(1, tsMaxSQLStringLen + 1);
char *cmd = calloc(1, TSDB_MAX_ALLOWED_SQL_LEN + 1);
size_t cmd_len = 0;
char *line = NULL;
size_t line_len = 0;
......@@ -998,7 +998,7 @@ void source_file(TAOS *con, char *fptr) {
}
while ((read_len = tgetline(&line, &line_len, f)) != -1) {
if (read_len >= tsMaxSQLStringLen) continue;
if (read_len >= TSDB_MAX_ALLOWED_SQL_LEN) continue;
line[--read_len] = '\0';
if (read_len == 0 || isCommentLine(line)) { // line starts with #
......@@ -1015,7 +1015,7 @@ void source_file(TAOS *con, char *fptr) {
memcpy(cmd + cmd_len, line, read_len);
printf("%s%s\n", PROMPT_HEADER, cmd);
shellRunCommand(con, cmd);
memset(cmd, 0, tsMaxSQLStringLen);
memset(cmd, 0, TSDB_MAX_ALLOWED_SQL_LEN);
cmd_len = 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册