diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index ab27d7f00c4a1c1d55b67240d7d7e641f69e3e6e..43e886faf54c3f3b92fd604369d0e0286dab6133 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -46,11 +46,12 @@ typedef struct SStmtTableCache { void* boundTags; } SStmtTableCache; -typedef struct SQueryFields { +typedef struct SStmtQueryResInfo { TAOS_FIELD* fields; TAOS_FIELD* userFields; uint32_t numOfCols; -} SQueryFields; + int32_t precision; +} SStmtQueryResInfo; typedef struct SStmtBindInfo { bool needParse; @@ -72,17 +73,17 @@ typedef struct SStmtExecInfo { } SStmtExecInfo; typedef struct SStmtSQLInfo { - STMT_TYPE type; - STMT_STATUS status; - bool autoCreate; - uint64_t runTimes; - SHashObj* pTableCache; //SHash - SQuery* pQuery; - char* sqlStr; - int32_t sqlLen; - SArray* nodeList; - SQueryPlan* pQueryPlan; - SQueryFields fields; + STMT_TYPE type; + STMT_STATUS status; + bool autoCreate; + uint64_t runTimes; + SHashObj* pTableCache; //SHash + SQuery* pQuery; + char* sqlStr; + int32_t sqlLen; + SArray* nodeList; + SQueryPlan* pQueryPlan; + SStmtQueryResInfo queryRes; } SStmtSQLInfo; typedef struct STscStmt { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index ca6a11a668aedcf797b31fe6d0f1d615ba2290f9..aad70120561689e46f48cac3b968aaa1a4be2af6 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -74,17 +74,44 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) { } int32_t stmtBackupQueryFields(STscStmt* pStmt) { - SQueryFields *pFields = &pStmt->sql.fields; - int32_t size = pFields->numOfCols * sizeof(TAOS_FIELD); + SStmtQueryResInfo *pRes = &pStmt->sql.queryRes; + pRes->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols; + pRes->precision = pStmt->exec.pRequest->body.resInfo.precision; - pFields->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols; - pFields->fields = taosMemoryMalloc(size); - pFields->userFields = taosMemoryMalloc(size); - if (NULL == pFields->fields || NULL == pFields->userFields) { + int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD); + pRes->fields = taosMemoryMalloc(size); + pRes->userFields = taosMemoryMalloc(size); + if (NULL == pRes->fields || NULL == pRes->userFields) { STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY); } - memcpy(pFields->fields, pStmt->exec.pRequest->body.resInfo.fields, size); - memcpy(pFields->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); + memcpy(pRes->fields, pStmt->exec.pRequest->body.resInfo.fields, size); + memcpy(pRes->userFields, pStmt->exec.pRequest->body.resInfo.userFields, size); + + return TSDB_CODE_SUCCESS; +} + +int32_t stmtRestoreQueryFields(STscStmt* pStmt) { + SStmtQueryResInfo *pRes = &pStmt->sql.queryRes; + int32_t size = pRes->numOfCols * sizeof(TAOS_FIELD); + + pStmt->exec.pRequest->body.resInfo.numOfCols = pRes->numOfCols; + pStmt->exec.pRequest->body.resInfo.precision = pRes->precision; + + if (NULL == pStmt->exec.pRequest->body.resInfo.fields) { + pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(size); + if (NULL == pStmt->exec.pRequest->body.resInfo.fields) { + STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY); + } + memcpy(pStmt->exec.pRequest->body.resInfo.fields, pRes->fields, size); + } + + if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { + pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(size); + if (NULL == pStmt->exec.pRequest->body.resInfo.userFields) { + STMT_ERR_RET(TSDB_CODE_TSC_OUT_OF_MEMORY); + } + memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pRes->userFields, size); + } return TSDB_CODE_SUCCESS; } @@ -235,6 +262,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) { } int32_t stmtCleanSQLInfo(STscStmt* pStmt) { + taosMemoryFree(pStmt->sql.queryRes.fields); + taosMemoryFree(pStmt->sql.queryRes.userFields); taosMemoryFree(pStmt->sql.sqlStr); qDestroyQuery(pStmt->sql.pQuery); qDestroyQueryPlan(pStmt->sql.pQueryPlan); @@ -497,6 +526,8 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) { pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag; pStmt->exec.pRequest->body.pDag = NULL; STMT_ERR_RET(stmtBackupQueryFields(pStmt)); + } else { + STMT_ERR_RET(stmtRestoreQueryFields(pStmt)); } STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId)); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5083104039a71e75df1c7a803d2f2e8b10391ed3..b96444bebcc7d9df5eff88d7dc878b9fa170be5f 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -47,7 +47,7 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowRetrieveFp)(SNodeMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); -typedef struct SQWorkerMgmt SQHandle; +typedef struct SQWorker SQHandle; typedef struct { const char *name; diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 307bf3efb8913d4fae4966797835e432ebb051c5..c18a43c4fb20df1b939ee0d9f0ef8813abba58c6 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -29,7 +29,7 @@ extern "C" { #endif -typedef struct SQWorkerMgmt SQHandle; +typedef struct SQWorker SQHandle; typedef struct SQnode { int32_t qndId; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b5066799ca172711c3511e93cbe59520fc5093db..b11f9def7ce01999451327d6d4d860a4a3877807 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -52,7 +52,7 @@ typedef struct STsdb STsdb; typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; -typedef struct SQWorkerMgmt SQHandle; +typedef struct SQWorker SQHandle; #define VNODE_META_DIR "meta" #define VNODE_TSDB_DIR "tsdb" diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 042b914927732edb7b8214f2976a230c1f883e8f..52d1980166c9b25a6347f308c8ec2446dc58a5bc 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -206,6 +206,8 @@ static int32_t cpdMergeCond(SNode** pDst, SNode** pSrc) { if (NULL == pLogicCond) { return TSDB_CODE_OUT_OF_MEMORY; } + pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL; + pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pLogicCond->condType = LOGIC_COND_TYPE_AND; int32_t code = nodesListMakeAppend(&pLogicCond->pParameterList, *pSrc); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index de73f1db0b496fa6fe865831177e150464ef916e..4f2f3febaf4997b8c6b01df4db336b7ef9e2de39 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -23,6 +23,7 @@ extern "C" { #include "qworker.h" #include "tlockfree.h" #include "ttimer.h" +#include "tref.h" #define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000 @@ -85,6 +86,11 @@ typedef struct SQWMsg { SQWConnInfo connInfo; } SQWMsg; +typedef struct SQWHbParam { + int32_t qwrId; + int64_t refId; +} SQWHbParam; + typedef struct SQWHbInfo { SSchedulerHbRsp rsp; SQWConnInfo connInfo; @@ -137,7 +143,8 @@ typedef struct SQWSchStatus { } SQWSchStatus; // Qnode/Vnode level task management -typedef struct SQWorkerMgmt { +typedef struct SQWorker { + int64_t refId; SQWorkerCfg cfg; int8_t nodeType; int32_t nodeId; @@ -148,9 +155,15 @@ typedef struct SQWorkerMgmt { SHashObj *schHash; // key: schedulerId, value: SQWSchStatus SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx SMsgCb msgCb; +} SQWorker; + +typedef struct SQWorkerMgmt { + SRWLatch lock; + int32_t qwRef; + int32_t qwNum; } SQWorkerMgmt; -#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId +#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId #define QW_IDS() sId, qId, tId, rId #define QW_FPARAMS() mgmt, QW_IDS() @@ -209,13 +222,13 @@ typedef struct SQWorkerMgmt { } \ } while (0) -#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__) -#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__) +#define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__) +#define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__) -#define QW_DUMP(param, ...) \ +#define QW_DUMP(_param, ...) \ do { \ if (gQWDebug.dumpEnable) { \ - qDebug("QW:%p " param, mgmt, __VA_ARGS__); \ + qDebug("QW:%p " _param, mgmt, __VA_ARGS__); \ } \ } while (0) @@ -282,6 +295,14 @@ typedef struct SQWorkerMgmt { } \ } while (0) + +extern SQWorkerMgmt gQwMgmt; + +FORCE_INLINE SQWorker *qwAcquire(int64_t refId) { return (SQWorker *)taosAcquireRef(atomic_load_32(&gQwMgmt.qwRef), refId); } + +FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQwMgmt.qwRef, refId); } + + #ifdef __cplusplus } #endif diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index fb1950d16bb6dafcbf8c44537e94a33d51b58cf3..fbd9ce91bc96369ef6d21f63b0094efe05a33b93 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -28,7 +28,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); -int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); +int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); @@ -41,10 +41,10 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); -int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); +int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); -int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn); +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b51bc5083e826eeeda453f6a5eb2fd7de51aec84..c94dd29ea15d152e6831a2de6dfd72eeb53b5806 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -10,6 +10,11 @@ #include "tname.h" SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true}; +SQWorkerMgmt gQwMgmt = { + .lock = 0, + .qwRef = -1, + .qwNum = 0, +}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -98,7 +103,7 @@ _return: void qwDbgDumpSchInfo(SQWSchStatus *sch, int32_t i) {} -void qwDbgDumpMgmtInfo(SQWorkerMgmt *mgmt) { +void qwDbgDumpMgmtInfo(SQWorker *mgmt) { if (!gQWDebug.dumpEnable) { return; } @@ -186,7 +191,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { return TSDB_CODE_SUCCESS; } -int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) { +int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -213,7 +218,7 @@ int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) { return TSDB_CODE_SUCCESS; } -int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { +int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { while (true) { QW_LOCK(rwType, &mgmt->schLock); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); @@ -240,15 +245,15 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, return TSDB_CODE_SUCCESS; } -int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD); } -int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { +int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR); } -void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } +void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { char id[sizeof(qId) + sizeof(tId)] = {0}; @@ -384,7 +389,7 @@ int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), fal int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); } -void qwReleaseTaskCtx(SQWorkerMgmt *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } +void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { // Note: free/kill may in RC @@ -606,7 +611,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_RET(code); } -int32_t qwGenerateSchHbRsp(SQWorkerMgmt *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { +int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) { int32_t taskNum = 0; hbInfo->connInfo = sch->hbConnInfo; @@ -1262,7 +1267,7 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } -int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { +int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; SQWSchStatus * sch = NULL; @@ -1288,7 +1293,7 @@ int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq QW_RET(TSDB_CODE_SUCCESS); } -int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { +int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; SQWSchStatus * sch = NULL; @@ -1333,7 +1338,20 @@ _return: } void qwProcessHbTimerEvent(void *param, void *tmrId) { - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)param; + SQWHbParam* hbParam = (SQWHbParam*)param; + if (hbParam->qwrId != atomic_load_32(&gQwMgmt.qwRef)) { + taosMemoryFree(param); + return; + } + + int64_t refId = hbParam->refId; + SQWorker *mgmt = qwAcquire(refId); + if (NULL == mgmt) { + QW_DLOG("qwAcquire %" PRIx64 "failed", refId); + taosMemoryFree(param); + return; + } + SQWSchStatus *sch = NULL; int32_t taskNum = 0; SQWHbInfo * rspList = NULL; @@ -1347,6 +1365,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { if (schNum <= 0) { QW_UNLOCK(QW_READ, &mgmt->schLock); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + qwRelease(refId); return; } @@ -1355,6 +1374,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { QW_UNLOCK(QW_READ, &mgmt->schLock); QW_ELOG("calloc %d SQWHbInfo failed", schNum); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + qwRelease(refId); return; } @@ -1396,18 +1416,72 @@ _return: taosMemoryFreeClear(rspList); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); + qwRelease(refId); +} + +void qwCloseRef(void) { + taosWLockLatch(&gQwMgmt.lock); + if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { + taosCloseRef(gQwMgmt.qwRef); + gQwMgmt.qwRef= -1; + } + taosWUnLockLatch(&gQwMgmt.lock); } +void qwDestroyImpl(void *pMgmt) { + SQWorker *mgmt = (SQWorker *)pMgmt; + + taosTmrStopA(&mgmt->hbTimer); + taosTmrCleanUp(mgmt->timer); + + // TODO STOP ALL QUERY + + // TODO FREE ALL + + taosHashCleanup(mgmt->ctxHash); + taosHashCleanup(mgmt->schHash); + + taosMemoryFree(mgmt); + + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + + qwCloseRef(); +} + +int32_t qwOpenRef(void) { + taosWLockLatch(&gQwMgmt.lock); + if (gQwMgmt.qwRef < 0) { + gQwMgmt.qwRef= taosOpenRef(100, qwDestroyImpl); + if (gQwMgmt.qwRef < 0) { + taosWUnLockLatch(&gQwMgmt.lock); + qError("init qworker ref failed"); + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + taosWUnLockLatch(&gQwMgmt.lock); + + return TSDB_CODE_SUCCESS; +} + + int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t code = 0; - SQWorkerMgmt *mgmt = taosMemoryCalloc(1, sizeof(SQWorkerMgmt)); + atomic_add_fetch_32(&gQwMgmt.qwNum, 1); + + int32_t code = qwOpenRef(); + if (code) { + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + QW_RET(code); + } + + SQWorker *mgmt = taosMemoryCalloc(1, sizeof(SQWorker)); if (NULL == mgmt) { - qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); + qError("calloc %d failed", (int32_t)sizeof(SQWorker)); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1449,16 +1523,30 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, mgmt, mgmt->timer); - if (NULL == mgmt->hbTimer) { - qError("start hb timer failed"); - QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; mgmt->msgCb = *pMsgCb; + mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); + if (mgmt->refId < 0) { + qError("taosAddRef qw failed, error:%s", tstrerror(terrno)); + QW_ERR_JRET(terrno); + } + + SQWHbParam *param = taosMemoryMalloc(sizeof(SQWHbParam)); + if (NULL == param) { + qError("malloc hb param failed, error:%s", tstrerror(terrno)); + QW_ERR_JRET(terrno); + } + param->qwrId = gQwMgmt.qwRef; + param->refId = mgmt->refId; + + mgmt->hbTimer = taosTmrStart(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, (void*)param, mgmt->timer); + if (NULL == mgmt->hbTimer) { + qError("start hb timer failed"); + QW_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + *qWorkerMgmt = mgmt; qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); @@ -1467,13 +1555,17 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW _return: - taosHashCleanup(mgmt->schHash); - taosHashCleanup(mgmt->ctxHash); - - taosTmrCleanUp(mgmt->timer); - - taosMemoryFreeClear(mgmt); + if (mgmt->refId >= 0) { + qwRelease(mgmt->refId); + } else { + taosHashCleanup(mgmt->schHash); + taosHashCleanup(mgmt->ctxHash); + taosTmrCleanUp(mgmt->timer); + taosMemoryFreeClear(mgmt); + atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); + } + QW_RET(code); } @@ -1482,22 +1574,14 @@ void qWorkerDestroy(void **qWorkerMgmt) { return; } - SQWorkerMgmt *mgmt = *qWorkerMgmt; - - taosTmrStopA(&mgmt->hbTimer); - taosTmrCleanUp(mgmt->timer); - - // TODO STOP ALL QUERY - - // TODO FREE ALL - - taosHashCleanup(mgmt->ctxHash); - taosHashCleanup(mgmt->schHash); + SQWorker *mgmt = *qWorkerMgmt; - taosMemoryFreeClear(*qWorkerMgmt); + if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) { + qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId); + } } -int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { +int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { /* SQWSchStatus *sch = NULL; int32_t taskNum = 0; @@ -1544,7 +1628,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs return TSDB_CODE_SUCCESS; } -int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { +int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus *sch = NULL; /* @@ -1557,7 +1641,7 @@ int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui return TSDB_CODE_SUCCESS; } -int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) { +int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t *taskStatus) { SQWSchStatus * sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -1584,7 +1668,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_RET(code); } -int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { +int32_t qwCancelTask(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus * sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 64df4f02bf7e826589f8b2b3d61f9972328dacd4..454db7fb951006cd10d233d99766ca04f9a57721 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -319,7 +319,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { return TSDB_CODE_SUCCESS; } -int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) { +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn) { SSchedulerHbReq req = {0}; req.header.vgId = mgmt->nodeId; req.sId = sId; @@ -362,7 +362,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -404,7 +404,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; bool needStop = false; SQWTaskCtx *handles = NULL; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -435,7 +435,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SResReadyReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task ready msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -477,7 +477,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; msg->sId = htobe64(msg->sId); uint64_t sId = msg->sId; @@ -498,7 +498,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } SResFetchReq *msg = pMsg->pCont; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid fetch msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -538,7 +538,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -578,7 +578,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; STaskDropReq *msg = pMsg->pCont; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); @@ -620,7 +620,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSchedulerHbReq req = {0}; - SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (NULL == pMsg->pCont) { QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen); diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index b1ab5253addda28221b88929e45677438d29f790..28d27050f09e118ddba9a6f50e7aec9c6ddd9067 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -188,8 +188,6 @@ CaseCtrl gCaseCtrl = { .caseRunIdx = -1, // .caseRunNum = -1, - .bindColTypeNum = tListLen(bindColTypeList), - .bindColTypeList = bindColTypeList, .caseIdx = 22, .caseNum = 1, .caseRunNum = 1, @@ -318,7 +316,7 @@ void generateInsertSQL(BindData *data) { len += sprintf(data->sql + len, "ubigdata"); break; default: - printf("invalid col type:%d", data->pBind[c].buffer_type); + printf("!!!invalid col type:%d", data->pBind[c].buffer_type); exit(1); } } @@ -336,7 +334,7 @@ void generateInsertSQL(BindData *data) { len += sprintf(data->sql + len, ")"); if (gCaseCtrl.printStmtSql) { - printf("SQL: %s\n", data->sql); + printf("\tSQL: %s\n", data->sql); } } @@ -358,7 +356,7 @@ void bpAppendOperatorParam(BindData *data, int32_t *len, int32_t dataType) { } break; default: - printf("invalid paramNum:%d\n", pInfo->paramNum); + printf("!!!invalid paramNum:%d\n", pInfo->paramNum); exit(1); } } @@ -414,7 +412,7 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) { len += sprintf(data->sql + len, "ubigdata"); break; default: - printf("invalid col type:%d", data->pBind[c].buffer_type); + printf("!!!invalid col type:%d", data->pBind[c].buffer_type); exit(1); } @@ -423,7 +421,7 @@ void generateQuerySQL(BindData *data, int32_t tblIdx) { } if (gCaseCtrl.printStmtSql) { - printf("SQL: %s\n", data->sql); + printf("\tSTMT SQL: %s\n", data->sql); } } @@ -551,7 +549,7 @@ int32_t prepareColData(BindData *data, int32_t bindIdx, int32_t rowIdx, int32_t data->pBind[bindIdx].is_null = data->isNull ? (data->isNull + rowIdx) : NULL; break; default: - printf("invalid col type:%d", dataType); + printf("!!!invalid col type:%d", dataType); exit(1); } @@ -709,7 +707,7 @@ void bpFetchRows(TAOS_RES *result, bool printr, int32_t *rows) { if (printr) { memset(temp, 0, sizeof(temp)); taos_print_row(temp, row, fields, num_fields); - printf("[%s]\n", temp); + printf("\t[%s]\n", temp); } } } @@ -718,7 +716,7 @@ void bpExecQuery(TAOS * taos, char* sql, bool printr, int32_t *rows) { TAOS_RES *result = taos_query(taos, sql); int code = taos_errno(result); if (code != 0) { - printf("failed to query table, reason:%s\n", taos_errstr(result)); + printf("!!!failed to query table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -791,7 +789,7 @@ int32_t bpAppendValueString(char *buf, int type, void *value, int32_t valueLen, break; default: - printf("invalid data type:%d\n", type); + printf("!!!invalid data type:%d\n", type); exit(1); } } @@ -803,13 +801,13 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { if (gCurCase->bindRowNum > 1) { if (0 == (n++%2)) { if (taos_stmt_bind_param_batch(stmt, bind)) { - printf("taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } else { for (int32_t i = 0; i < gCurCase->bindColNum; ++i) { if (taos_stmt_bind_single_param_batch(stmt, bind++, i)) { - printf("taos_stmt_bind_single_param_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_single_param_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -817,12 +815,12 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { } else { if (0 == (n++%2)) { if (taos_stmt_bind_param_batch(stmt, bind)) { - printf("taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_param_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } else { if (taos_stmt_bind_param(stmt, bind)) { - printf("taos_stmt_bind_param error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_bind_param error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -834,12 +832,12 @@ int32_t bpBindParam(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { void bpCheckIsInsert(TAOS_STMT *stmt, int32_t insert) { int32_t isInsert = 0; if (taos_stmt_is_insert(stmt, &isInsert)) { - printf("taos_stmt_is_insert error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_is_insert error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (insert != isInsert) { - printf("is insert failed\n"); + printf("!!!is insert failed\n"); exit(1); } } @@ -847,12 +845,12 @@ void bpCheckIsInsert(TAOS_STMT *stmt, int32_t insert) { void bpCheckParamNum(TAOS_STMT *stmt) { int32_t num = 0; if (taos_stmt_num_params(stmt, &num)) { - printf("taos_stmt_num_params error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_num_params error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (gCurCase->bindColNum != num) { - printf("is insert failed\n"); + printf("!!!is insert failed\n"); exit(1); } } @@ -861,7 +859,7 @@ void bpCheckAffectedRows(TAOS_STMT *stmt, int32_t times) { int32_t rows = taos_stmt_affected_rows(stmt); int32_t insertNum = gCurCase->rowNum * gCurCase->tblNum * times; if (insertNum != rows) { - printf("affected rows %d mis-match with insert num %d\n", rows, insertNum); + printf("!!!affected rows %d mis-match with insert num %d\n", rows, insertNum); exit(1); } } @@ -869,7 +867,7 @@ void bpCheckAffectedRows(TAOS_STMT *stmt, int32_t times) { void bpCheckAffectedRowsOnce(TAOS_STMT *stmt, int32_t expectedNum) { int32_t rows = taos_stmt_affected_rows_once(stmt); if (expectedNum != rows) { - printf("affected rows %d mis-match with expected num %d\n", rows, expectedNum); + printf("!!!affected rows %d mis-match with expected num %d\n", rows, expectedNum); exit(1); } } @@ -904,16 +902,16 @@ void bpCheckQueryResult(TAOS_STMT *stmt, TAOS *taos, char *stmtSql, TAOS_MULTI_B } if (gCaseCtrl.printQuerySql) { - printf("Query SQL: %s\n", sql); + printf("\tQuery SQL: %s\n", sql); } bpExecQuery(taos, sql, gCaseCtrl.printRes, &sqlResNum); if (sqlResNum != stmtResNum) { - printf("sql res num %d mis-match stmt res num %d\n", sqlResNum, stmtResNum); + printf("!!!sql res num %d mis-match stmt res num %d\n", sqlResNum, stmtResNum); exit(1); } - printf("sql res num match stmt res num %d\n", stmtResNum); + printf("***sql res num match stmt res num %d\n", stmtResNum); } /* prepare [settbname [bind add]] exec */ @@ -923,7 +921,7 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -936,7 +934,7 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -951,14 +949,14 @@ int insertMBSETest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -978,7 +976,7 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -993,7 +991,7 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1007,14 +1005,14 @@ int insertMBSETest2(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1033,7 +1031,7 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1046,7 +1044,7 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1061,13 +1059,13 @@ int insertMBMETest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1087,7 +1085,7 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1100,7 +1098,7 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1115,12 +1113,12 @@ int insertMBMETest2(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1141,7 +1139,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1154,7 +1152,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1169,7 +1167,7 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1179,12 +1177,12 @@ int insertMBMETest3(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1206,7 +1204,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1221,7 +1219,7 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1235,12 +1233,12 @@ int insertMBMETest4(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1264,7 +1262,7 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1277,7 +1275,7 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) { sprintf(buf, "t%d", t); code = taos_stmt_set_tbname(stmt, buf); if (code != 0){ - printf("taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_set_tbname error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1292,13 +1290,13 @@ int insertMPMETest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } } @@ -1328,7 +1326,7 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) { int code = taos_stmt_prepare(stmt, data.sql, 0); if (code != 0){ - printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -1344,12 +1342,12 @@ int querySUBTTest1(TAOS_STMT *stmt, TAOS *taos) { } if (taos_stmt_add_batch(stmt)) { - printf("taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_add_batch error:%s\n", taos_stmt_errstr(stmt)); exit(1); } if (taos_stmt_execute(stmt) != 0) { - printf("taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_execute error:%s\n", taos_stmt_errstr(stmt)); exit(1); } @@ -4249,10 +4247,10 @@ void prepareCheckResultImpl(TAOS * taos, char *tname, bool printr, int expec if (rows == expected) { if (!silent) { - printf("%d rows are fetched as expected from %s\n", rows, tname); + printf("***%d rows are fetched as expected from %s\n", rows, tname); } } else { - printf("!!!expect %d rows, but %d rows are fetched from %s\n", expected, rows, tname); + printf("!!!expect rows %d mis-match rows %d fetched from %s\n", expected, rows, tname); exit(1); } } @@ -4302,7 +4300,7 @@ int sql_perf1(TAOS *taos) { result = taos_query(taos, sql[i]); int code = taos_errno(result); if (code != 0) { - printf("failed to query table, reason:%s\n", taos_errstr(result)); + printf("%d failed to query table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4539,7 +4537,7 @@ void generateCreateTableSQL(char *buf, int32_t tblIdx, int32_t colNum, int32_t * blen += sprintf(buf + blen, ")"); if (gCaseCtrl.printCreateTblSql) { - printf("Create Table SQL:%s\n", buf); + printf("\tCreate Table SQL:%s\n", buf); } } @@ -4553,7 +4551,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) { result = taos_query(taos, "create database demo keep 36500"); code = taos_errno(result); if (code != 0) { - printf("failed to create database, reason:%s\n", taos_errstr(result)); + printf("!!!failed to create database, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4570,7 +4568,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) { result = taos_query(taos, buf); code = taos_errno(result); if (code != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(result)); + printf("!!!failed to create table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4583,7 +4581,7 @@ void prepare(TAOS *taos, int32_t colNum, int32_t *colList, int prepareStb) { result = taos_query(taos, buf); code = taos_errno(result); if (code != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(result)); + printf("!!!failed to create table, reason:%s\n", taos_errstr(result)); taos_free_result(result); exit(1); } @@ -4654,7 +4652,7 @@ int32_t runCase(TAOS *taos, int32_t caseIdx, int32_t caseRunIdx, bool silent) { stmt = taos_stmt_init(taos); if (NULL == stmt) { - printf("taos_stmt_init failed, error:%s\n", taos_stmt_errstr(stmt)); + printf("!!!taos_stmt_init failed, error:%s\n", taos_stmt_errstr(stmt)); exit(1); }