提交 6b49352a 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 4fd2fb06
...@@ -100,7 +100,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in ...@@ -100,7 +100,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
*/ */
int32_t schedulerFetchRows(int64_t job, void **data); int32_t schedulerFetchRows(int64_t job, void **data);
int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param); void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param);
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
......
...@@ -1086,7 +1086,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc ...@@ -1086,7 +1086,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
tsem_wait(&pParam->sem); tsem_wait(&pParam->sem);
} }
if (setupOneRowPtr) { if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) {
doSetOneRowPtr(pResultInfo); doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1; pResultInfo->current += 1;
} }
......
...@@ -793,7 +793,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -793,7 +793,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
} }
} }
pRequest->code = schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
} }
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
......
...@@ -474,7 +474,7 @@ TEST(testCase, create_multiple_tables) { ...@@ -474,7 +474,7 @@ TEST(testCase, create_multiple_tables) {
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); pRes = taos_query(pConn, "create table if not exists t_2 using st1 tags(1)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
...@@ -482,7 +482,7 @@ TEST(testCase, create_multiple_tables) { ...@@ -482,7 +482,7 @@ TEST(testCase, create_multiple_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); pRes = taos_query(pConn, "create table if not exists t_3 using st1 tags(2)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
...@@ -590,6 +590,7 @@ TEST(testCase, generated_request_id_test) { ...@@ -590,6 +590,7 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup(phash); taosHashCleanup(phash);
} }
TEST(testCase, insert_test) { TEST(testCase, insert_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
...@@ -645,7 +646,7 @@ TEST(testCase, projection_query_tables) { ...@@ -645,7 +646,7 @@ TEST(testCase, projection_query_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 10000; i += 20) { for(int32_t i = 0; i < 1000; i += 20) {
char sql[1024] = {0}; char sql[1024] = {0};
sprintf(sql, sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" "insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
......
...@@ -1034,10 +1034,10 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt ...@@ -1034,10 +1034,10 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt
CTG_ERR_JRET(ctgLaunchJob(pJob)); CTG_ERR_JRET(ctgLaunchJob(pJob));
*jobId = pJob->refId; // NOTE: here the assignment of jobId is invalid, may over-write the true scheduler created query job.
// *jobId = pJob->refId;
_return: _return:
if (pJob) { if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, pJob->refId); taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
......
...@@ -21,9 +21,9 @@ ...@@ -21,9 +21,9 @@
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("acquire job:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("acquire jobId:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); }
FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release job:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); } FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release jobId:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); }
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan; pTask->plan = pPlan;
...@@ -1365,8 +1365,6 @@ void schFreeJobImpl(void *job) { ...@@ -1365,8 +1365,6 @@ void schFreeJobImpl(void *job) {
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
SSchResInfo *pRes, int64_t startTs, bool sync) { SSchResInfo *pRes, int64_t startTs, bool sync) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId); qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
} }
...@@ -1375,6 +1373,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_ ...@@ -1375,6 +1373,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
qDebug("QID:0x%" PRIx64 " jobId:0x%"PRIx64 " started", pDag->queryId, pJob->refId);
*job = pJob->refId; *job = pJob->refId;
SCH_ERR_JRET(schLaunchJob(pJob)); SCH_ERR_JRET(schLaunchJob(pJob));
...@@ -1386,7 +1385,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_ ...@@ -1386,7 +1385,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_
pJob->userCb = SCH_EXEC_CB; pJob->userCb = SCH_EXEC_CB;
} }
SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%"PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
_return: _return:
......
...@@ -102,7 +102,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { ...@@ -102,7 +102,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
int32_t code = 0; int32_t code = 0;
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
...@@ -115,28 +115,26 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { ...@@ -115,28 +115,26 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_RET(code); SCH_RET(code);
} }
int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) { void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) {
if (NULL == fp || NULL == param) { if (NULL == fp || NULL == param) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); fp(NULL, param, TSDB_CODE_QRY_INVALID_INPUT);
return;
} }
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
fp(NULL, param, TSDB_CODE_SCH_STATUS_ERROR); fp(NULL, param, TSDB_CODE_SCH_STATUS_ERROR);
return TSDB_CODE_SCH_STATUS_ERROR; return;
} }
pJob->attr.syncSchedule = false; pJob->attr.syncSchedule = false;
pJob->userRes.fetchFp = fp; pJob->userRes.fetchFp = fp;
pJob->userRes.userParam = param; pJob->userRes.userParam = param;
code = schAsyncFetchRows(pJob); /*code = */schAsyncFetchRows(pJob);
schReleaseJob(job); schReleaseJob(job);
SCH_RET(code);
} }
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
...@@ -173,7 +171,7 @@ _return: ...@@ -173,7 +171,7 @@ _return:
int32_t scheduleCancelJob(int64_t job) { int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
...@@ -187,7 +185,7 @@ int32_t scheduleCancelJob(int64_t job) { ...@@ -187,7 +185,7 @@ int32_t scheduleCancelJob(int64_t job) {
void schedulerFreeJob(int64_t job) { void schedulerFreeJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册