diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 66b3093a8d88bc17c785e1217268894e01aee18b..f1f7b3cfbcfc671985c306387c6407d1fd2a45f2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3880,11 +3880,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { if (pCtx->end.key != INT64_MIN) { pInfo->min = pCtx->end.key; } else { - pInfo->min = ptsList[0]; + pInfo->min = ptsList[start]; } } else { if (pCtx->start.key == INT64_MIN) { - pInfo->min = (pInfo->min > ptsList[0]) ? ptsList[0] : pInfo->min; + pInfo->min = (pInfo->min > ptsList[start]) ? ptsList[start] : pInfo->min; } else { pInfo->min = pCtx->start.key; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index de9f16f1717e90739c80d4ae07b63271a0b45ab7..6a30f32f2b81ff8f83137a8c5c99dc017f6697e1 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -787,6 +787,14 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return code; } +static bool isPrimaryKeySort(SNodeList* pOrderByList) { + SNode* pExpr = ((SOrderByExprNode*)nodesListGetNode(pOrderByList, 0))->pExpr; + if (QUERY_NODE_COLUMN != nodeType(pExpr)) { + return false; + } + return PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId; +} + static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (NULL == pSelect->pOrderByList) { return TSDB_CODE_SUCCESS; @@ -800,7 +808,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pSort->groupSort = pSelect->groupSort; pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; - pSort->node.resultDataOrder = pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL; + pSort->node.resultDataOrder = isPrimaryKeySort(pSelect->pOrderByList) + ? (pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL) + : DATA_ORDER_LEVEL_NONE; int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 9e844dd6a2ca8564764b1556250d0304ea09571b..8586234b7e28342b0e51890306a37c4b22e56487 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -215,6 +215,14 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); } +static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) { + if (1 != LIST_LENGTH(pNode->pChildren)) { + return false; + } + SNode* pChild = nodesListGetNode(pNode->pChildren, 0); + return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); +} + static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; if (WINDOW_TYPE_INTERVAL == pWindow->winType) { @@ -247,7 +255,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); case QUERY_NODE_LOGIC_PLAN_PARTITION: - return stbSplHasMultiTbScan(streamQuery, pNode); + return stbSplIsMultiTbScanChild(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_AGG: return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: @@ -969,8 +977,28 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } +static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) { + SNode* pPrimaryKey = + nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0))); + if (NULL == pPrimaryKey) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys); + } + return code; +} + static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true); + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pMergeKeys = NULL; + if (pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) { + code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pMergeKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true); + } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); diff --git a/source/libs/planner/test/planPartByTest.cpp b/source/libs/planner/test/planPartByTest.cpp index 256960a15ef70c6c5484b20a7022ff2e8f79abd9..48ab1e1ac2cc5cf9f8b836b9d7940df047019f6e 100644 --- a/source/libs/planner/test/planPartByTest.cpp +++ b/source/libs/planner/test/planPartByTest.cpp @@ -67,6 +67,8 @@ TEST_F(PlanPartitionByTest, withTimeLineFunc) { useDb("root", "test"); run("SELECT TWA(c1) FROM st1 PARTITION BY c1"); + + run("SELECT MAVG(c1, 2) FROM st1 PARTITION BY c1"); } TEST_F(PlanPartitionByTest, withSlimit) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 3fa6344009b50ccb89f27b6d56ce9b24d282ed16..9fd4e483c28d1e6a0c08618e98bd5a30aae93807 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -396,6 +396,7 @@ typedef struct SDelayQueue { int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +void transDQCancel(SDelayQueue* queue, SDelayTask* task); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9de8c273d95d29fb66b467a1013052953d9d51ce..638067e7ef8e2b9bf4ecbafc4f4b5570b00537a8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,7 +27,6 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; queue q; - uint64_t expireTime; STransCtx ctx; bool broken; // link broken or not @@ -37,6 +36,7 @@ typedef struct SCliConn { char* ip; uint32_t port; + SDelayTask* task; // debug and log info struct sockaddr_in addr; struct sockaddr_in localAddr; @@ -65,6 +65,7 @@ typedef struct SCliThrd { queue msg; TdThreadMutex msgMtx; SDelayQueue* delayQueue; + SDelayQueue* timeoutQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // @@ -92,9 +93,10 @@ static void* createConnPool(int size); static void* destroyConnPool(void* pool); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); +static void doCloseIdleConn(void* param); // register timer in each thread to clear expire conn -static void cliTimeoutCb(uv_timer_t* handle); +// static void cliTimeoutCb(uv_timer_t* handle); // alloc buf for recv static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // callback after read nbytes from socket @@ -184,7 +186,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd = (SCliThrd*)(exh)->pThrd; \ } \ } while (0) -#define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ @@ -384,10 +386,6 @@ void cliHandleResp(SCliConn* conn) { } uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); - // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { - // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); - } } void cliHandleExcept(SCliConn* pConn) { @@ -441,30 +439,30 @@ void cliHandleExcept(SCliConn* pConn) { transUnrefCliHandle(pConn); } -void cliTimeoutCb(uv_timer_t* handle) { - SCliThrd* pThrd = handle->data; - STrans* pTransInst = pThrd->pTransInst; - int64_t currentTime = pThrd->nextTimeout; - tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); - - SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); - while (p != NULL) { - while (!QUEUE_IS_EMPTY(&p->conn)) { - queue* h = QUEUE_HEAD(&p->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, q); - if (c->expireTime < currentTime) { - QUEUE_REMOVE(h); - transUnrefCliHandle(c); - } else { - break; - } - } - p = taosHashIterate((SHashObj*)pThrd->pool, p); - } - - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); - uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); -} +// void cliTimeoutCb(uv_timer_t* handle) { +// SCliThrd* pThrd = handle->data; +// STrans* pTransInst = pThrd->pTransInst; +// int64_t currentTime = pThrd->nextTimeout; +// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); +// +// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); +// while (p != NULL) { +// while (!QUEUE_IS_EMPTY(&p->conn)) { +// queue* h = QUEUE_HEAD(&p->conn); +// SCliConn* c = QUEUE_DATA(h, SCliConn, q); +// if (c->expireTime < currentTime) { +// QUEUE_REMOVE(h); +// transUnrefCliHandle(c); +// } else { +// break; +// } +// } +// p = taosHashIterate((SHashObj*)pThrd->pool, p); +// } +// +// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); +// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); +// } void* createConnPool(int size) { // thread local, no lock @@ -506,6 +504,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); assert(h == &conn->q); + + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + return conn; } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -537,6 +539,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { transReleaseExHandle(transGetRefMgt(), handle); return 0; } + static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; @@ -547,7 +550,6 @@ static void addConnToPool(void* pool, SCliConn* conn) { allocConnRef(conn, true); STrans* pTransInst = thrd->pTransInst; - conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); cliReleaseUnfinishedMsg(conn); transQueueClear(&conn->cliMsgs); transCtxCleanup(&conn->ctx); @@ -562,7 +564,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { assert(plist != NULL); QUEUE_INIT(&conn->q); QUEUE_PUSH(&plist->conn, &conn->q); + assert(!QUEUE_IS_EMPTY(&plist->conn)); + + STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + arg->param1 = conn; + arg->param2 = thrd; + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; @@ -631,6 +639,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; + if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { uv_read_stop(conn->stream); @@ -997,6 +1007,8 @@ static SCliThrd* createThrdObj() { pThrd->pool = createConnPool(4); transDQCreate(pThrd->loop, &pThrd->delayQueue); + transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + pThrd->quit = false; return pThrd; } @@ -1012,6 +1024,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsg); + transDQDestroy(pThrd->timeoutQueue, NULL); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -1058,6 +1071,10 @@ static void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; SCliThrd* pThrd = arg->param2; + tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); + conn->task = NULL; + cliDestroyConn(conn, true); + taosMemoryFree(arg); } static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { @@ -1248,11 +1265,17 @@ int transReleaseCliHandle(void* handle) { if (pThrd == NULL) { return -1; } + STransMsg tmsg = {.info.handle = handle}; - SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); + + SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg; cmsg->type = Release; + STraceId* trace = &tmsg.info.traceId; + tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid); + if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { return -1; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 34849df2b277c0a770677b28a4bd6056666febfd..6ac75a75e15aa068ce5cbed3aa17c14583a74ad7 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -480,7 +480,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { SDelayTask* task = container_of(minNode, SDelayTask, node); STaskArg* arg = task->arg; - freeFunc(arg->param1); + if (freeFunc) freeFunc(arg->param1); taosMemoryFree(arg); taosMemoryFree(task); @@ -491,9 +491,16 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { void transDQCancel(SDelayQueue* queue, SDelayTask* task) { uv_timer_stop(queue->timer); - if (heapSize(queue->heap) <= 0) return; + if (heapSize(queue->heap) <= 0) { + taosMemoryFree(task->arg); + taosMemoryFree(task); + return; + } heapRemove(queue->heap, &task->node); + taosMemoryFree(task->arg); + taosMemoryFree(task); + if (heapSize(queue->heap) != 0) { HeapNode* minNode = heapMin(queue->heap); if (minNode != NULL) return; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3fb947bdba975055e0fad34c8e92d1683fb94b10..9b898474776a10a299c05ea5cc59e9bf5f1f9a71 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -149,32 +149,34 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \ - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - reallocConnRef(conn); \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - if (conn->regArg.init) { \ - tTrace("conn %p release, notify server app", conn); \ - STrans* pTransInst = conn->pTransInst; \ - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ - memset(&conn->regArg, 0, sizeof(conn->regArg)); \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + reallocConnRef(conn); \ + tTrace("conn %p received release request", conn); \ + \ + STraceId traceId = head->traceId; \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + \ + STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = NULL}; \ + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + if (conn->regArg.init) { \ + tTrace("conn %p release, notify server app", conn); \ + STrans* pTransInst = conn->pTransInst; \ + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ + memset(&conn->regArg, 0, sizeof(conn->regArg)); \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \ diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index ea100ae0d3d16838c2c8f683780458f0d9eabb0a..277fdf7afb226a49de00312b3240c70106ae4a4d 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -18,7 +18,7 @@ class TDTestCase: def __init__(self): self.snapshot = 0 self.vgroups = 4 - self.ctbNum = 1000 + self.ctbNum = 500 self.rowsPerTbl = 1000 def init(self, conn, logSql): @@ -38,9 +38,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, + 'ctbNum': 500, 'rowsPerTbl': 1000, - 'batchNum': 400, + 'batchNum': 500, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 3, 'showMsg': 1, @@ -83,9 +83,9 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, + 'ctbNum': 500, 'rowsPerTbl': 1000, - 'batchNum': 1000, + 'batchNum': 500, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'pollDelay': 5, 'showMsg': 1, @@ -156,7 +156,7 @@ class TDTestCase: 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'ctbPrefix': 'ctb', 'ctbStartIdx': 0, - 'ctbNum': 1000, + 'ctbNum': 500, 'rowsPerTbl': 1000, 'batchNum': 1000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index c8ddc8d3c2721127704e5a26d7727be72788efb7..20cffed486a5ad592c92a4b1e259170764f46c03 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -539,16 +539,30 @@ class TMQCom: insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});' tsql.execute(insert_sql) - def waitSubscriptionExit(self, tsql, max_wait_count=20): + def waitSubscriptionExit(self, tsql, topicName): wait_cnt = 0 - while (wait_cnt < max_wait_count): + while True: + exit_flag = 1 tsql.query("show subscriptions") - if tsql.getRows() == 0: - break - else: - time.sleep(2) - wait_cnt += 1 + rows = tsql.getRows() + for idx in range (rows): + if tsql.getData(idx, 0) != topicName: + continue + if tsql.getData(idx, 3) == None: + continue + else: + time.sleep(0.5) + wait_cnt += 1 + exit_flag = 0 + break + + if exit_flag == 1: + break + + tsql.query("show subscriptions") + tdLog.info("show subscriptions:") + tdLog.info(tsql.queryResult) tdLog.info("wait subscriptions exit for %d s"%wait_cnt) def close(self): diff --git a/tests/system-test/7-tmq/tmqDropNtb.py b/tests/system-test/7-tmq/tmqDropNtb.py index 5d58c38690d48c6952977691b5aa97c531a1ca12..e1f5794ce278d05cf316365fff0a3ffa91f58e27 100644 --- a/tests/system-test/7-tmq/tmqDropNtb.py +++ b/tests/system-test/7-tmq/tmqDropNtb.py @@ -103,7 +103,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) @@ -196,7 +196,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) diff --git a/tests/system-test/7-tmq/tmqDropStbCtb.py b/tests/system-test/7-tmq/tmqDropStbCtb.py index e6783a281566621fb62fa32cdacee6bd62e4c669..f86d1295f4ff774473e56079d5b3d00b6ae0d2d1 100644 --- a/tests/system-test/7-tmq/tmqDropStbCtb.py +++ b/tests/system-test/7-tmq/tmqDropStbCtb.py @@ -51,7 +51,7 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl - tmqCom.initConsumerTable() + # tmqCom.initConsumerTable() tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) @@ -98,6 +98,8 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() + # again create one new stb1 paraDict["stbName"] = 'stb1' paraDict['ctbPrefix'] = 'ctb1n_' @@ -157,7 +159,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) @@ -191,6 +193,8 @@ class TDTestCase: paraDict['ctbNum'] = self.ctbNum paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.initConsumerTable() + # again create one new stb1 paraDict["stbName"] = 'stb2' paraDict['ctbPrefix'] = 'ctb2n_' @@ -209,9 +213,9 @@ class TDTestCase: tdSql.execute("create topic %s as database %s" %(topicFromDb, paraDict['dbName'])) if self.snapshot == 0: - consumerId = 0 + consumerId = 2 elif self.snapshot == 1: - consumerId = 1 + consumerId = 3 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2) topicList = topicFromDb @@ -246,7 +250,7 @@ class TDTestCase: tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") - tmqCom.waitSubscriptionExit(tdSql) + tmqCom.waitSubscriptionExit(tdSql, topicFromDb) tdSql.query("drop topic %s"%topicFromDb) tdLog.info("success dorp topic: %s"%topicFromDb) diff --git a/tests/system-test/7-tmq/tmqUpdateWithConsume.py b/tests/system-test/7-tmq/tmqUpdateWithConsume.py new file mode 100644 index 0000000000000000000000000000000000000000..4f21beffc40e825ce143bf23060f566048ec16b4 --- /dev/null +++ b/tests/system-test/7-tmq/tmqUpdateWithConsume.py @@ -0,0 +1,190 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.vgroups = 4 + self.ctbNum = 100 + self.rowsPerTbl = 1000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + # update to half tables + paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + consumerId = 0 + + if self.snapshot == 0: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/2 + 1)) + elif self.snapshot == 1: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (2)) + + topicList = topicFromStb1 + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + # update to half data + paraDict["startTs"] = paraDict["startTs"] + int(self.rowsPerTbl / 2) + tmqCom.asyncInsertDataByInterlace(paraDict) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) + if self.snapshot == 0: + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + elif self.snapshot == 1: + if not (totalConsumeRows < expectrowcnt and totalConsumeRows >= totalRowsInserted): + tdLog.exit("tmq consume rows error!") + + # tmqCom.checkFileContent(consumerId, queryString) + + tdSql.query("drop topic %s"%topicFromStb1) + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.ctbNum = 1 + self.snapshot = 0 + self.prepareTestEnv() + self.tmqCase1() + + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.prepareTestEnv() + self.snapshot = 1 + self.tmqCase1() + + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.ctbNum = 100 + self.snapshot = 0 + self.prepareTestEnv() + self.tmqCase1() + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.prepareTestEnv() + self.snapshot = 1 + self.tmqCase1() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 410a7e210f10b73903f8a089076b3f22745c13fa..37b6db3f3c11cc3b77f216f98e955a4df329835a 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -209,17 +209,20 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py -#python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py +python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py +python3 ./test.py -f 7-tmq/tmqUpdateWithConsume.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot1.py python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py python3 ./test.py -f 7-tmq/tmqDelete-multiCtb.py python3 ./test.py -f 7-tmq/tmqDropStb.py +python3 ./test.py -f 7-tmq/tmqDropStbCtb.py +python3 ./test.py -f 7-tmq/tmqDropNtb.py python3 ./test.py -f 7-tmq/tmqUdf.py # python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py -# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py +python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py # python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py diff --git a/tools/shell/inc/shellInt.h b/tools/shell/inc/shellInt.h index 825866e16361b35ca86abeac62ff5f765ce67a10..358377f8049f4b8aaf5c1499a21e1e566dac5490 100644 --- a/tools/shell/inc/shellInt.h +++ b/tools/shell/inc/shellInt.h @@ -26,6 +26,10 @@ #include "ttypes.h" #include "tutil.h" +#ifdef WEBSOCKET +#include "taosws.h" +#endif + #define SHELL_MAX_HISTORY_SIZE 1000 #define SHELL_MAX_COMMAND_SIZE 1048586 #define SHELL_HISTORY_FILE ".taos_history" @@ -67,6 +71,12 @@ typedef struct { int32_t pktNum; int32_t displayWidth; int32_t abort; +#ifdef WEBSOCKET + bool restful; + bool cloud; + char* dsn; + int32_t timeout; +#endif } SShellArgs; typedef struct { @@ -85,6 +95,10 @@ typedef struct { TAOS* conn; TdThread pid; tsem_t cancelSem; +#ifdef WEBSOCKET + WS_TAOS* ws_conn; + bool stop_query; +#endif } SShellObj; // shellArguments.c @@ -95,7 +109,10 @@ int32_t shellReadCommand(char* command); // shellEngine.c int32_t shellExecute(); - +int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); +void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); +void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); +void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision); // shellUtil.c int32_t shellCheckIntSize(); void shellPrintVersion(); @@ -109,6 +126,14 @@ void shellExit(); // shellNettest.c void shellTestNetWork(); +#ifdef WEBSOCKET +void shellCheckConnectMode(); +// shellWebsocket.c +int shell_conn_ws_server(bool first); +int32_t shell_run_websocket(); +void shellRunSingleCommandWebsocketImp(char *command); +#endif + // shellMain.c extern SShellObj shell; diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index 466aa523903b6d8ae3270a6cfd7cae0e39d69257..88ef46c5d64317ef55129e0cc27a7daa449d6e3f 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -43,6 +43,12 @@ #define SHELL_VERSION "Print program version." #define SHELL_EMAIL "" +#ifdef WEBSOCKET +#define SHELL_DSN "The dsn to use when connecting to cloud server." +#define SHELL_REST "Use restful mode when connecting." +#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 10." +#endif + static int32_t shellParseSingleOpt(int32_t key, char *arg); void shellPrintHelp() { @@ -65,6 +71,11 @@ void shellPrintHelp() { printf("%s%s%s%s\r\n", indent, "-s,", indent, SHELL_CMD); printf("%s%s%s%s\r\n", indent, "-t,", indent, SHELL_STARTUP); printf("%s%s%s%s\r\n", indent, "-u,", indent, SHELL_USER); +#ifdef WEBSOCKET + printf("%s%s%s%s\r\n", indent, "-E,", indent, SHELL_DSN); + printf("%s%s%s%s\r\n", indent, "-R,", indent, SHELL_REST); + printf("%s%s%s%s\r\n", indent, "-T,", indent, SHELL_TIMEOUT); +#endif printf("%s%s%s%s\r\n", indent, "-w,", indent, SHELL_WIDTH); printf("%s%s%s%s\r\n", indent, "-V,", indent, SHELL_VERSION); printf("\r\n\r\nReport bugs to %s.\r\n", SHELL_EMAIL); @@ -95,6 +106,11 @@ static struct argp_option shellOptions[] = { {"display-width", 'w', "WIDTH", 0, SHELL_WIDTH}, {"netrole", 'n', "NETROLE", 0, SHELL_NET_ROLE}, {"pktlen", 'l', "PKTLEN", 0, SHELL_PKG_LEN}, +#ifdef WEBSOCKET + {"dsn", 'E', "DSN", 0, SHELL_DSN}, + {"restful", 'R', 0, 0, SHELL_REST}, + {"timeout", 'T', "SECONDS", 0, SHELL_TIMEOUT}, +#endif {"pktnum", 'N', "PKTNUM", 0, SHELL_PKT_NUM}, {0}, }; @@ -120,9 +136,15 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { switch (key) { case 'h': pArgs->host = arg; +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif break; case 'P': pArgs->port = atoi(arg); +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif if (pArgs->port == 0) pArgs->port = -1; break; case 'u': @@ -137,6 +159,9 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { pArgs->is_gen_auth = true; break; case 'c': +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif pArgs->cfgdir = arg; break; case 'C': @@ -172,6 +197,18 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { case 'N': pArgs->pktNum = atoi(arg); break; +#ifdef WEBSOCKET + case 'R': + pArgs->restful = true; + break; + case 'E': + pArgs->dsn = arg; + pArgs->cloud = true; + break; + case 'T': + pArgs->timeout = atoi(arg); + break; +#endif case 'V': pArgs->is_version = true; break; @@ -208,7 +245,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { } if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' || - key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N') { + key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N' +#ifdef WEBSOCKET + || key[1] == 'E' || key[1] == 'T' +#endif + ) { if (i + 1 >= argc) { fprintf(stderr, "option %s requires an argument\r\n", key); return -1; @@ -221,7 +262,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { shellParseSingleOpt(key[1], val); i++; } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' || - key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1) { + key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1 +#ifdef WEBSOCKET + ||key[1] == 'R' +#endif + ) { shellParseSingleOpt(key[1], NULL); } else { fprintf(stderr, "invalid option %s\r\n", key); diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 56bc1ed6cc98468d87516236ef08b99aaa653936..a2310ea9c931afb648f2ba9dab2ed4c0ab6dcf69 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -25,14 +25,9 @@ static int32_t shellRunSingleCommand(char *command); static int32_t shellRunCommand(char *command); static void shellRunSingleCommandImp(char *command); static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision); -static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, - int32_t precision); static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); static void shellPrintNChar(const char *str, int32_t length, int32_t width); -static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); -static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); -static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); static void shellReadHistory(); @@ -94,8 +89,15 @@ int32_t shellRunSingleCommand(char *command) { shellSourceFile(c_ptr); return 0; } - - shellRunSingleCommandImp(command); +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + shellRunSingleCommandWebsocketImp(command); + } else { +#endif + shellRunSingleCommandImp(command); +#ifdef WEBSOCKET + } +#endif return 0; } @@ -937,7 +939,16 @@ void *shellCancelHandler(void *arg) { taosMsleep(10); continue; } - taos_kill_query(shell.conn); + +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + shell.stop_query = true; + } else { +#endif + taos_kill_query(shell.conn); +#ifdef WEBSOCKET + } +#endif #ifdef WINDOWS printf("\n%s", shell.info.promptHeader); #endif @@ -981,16 +992,26 @@ int32_t shellExecute() { fflush(stdout); SShellArgs *pArgs = &shell.args; - if (shell.args.auth == NULL) { - shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port); +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + if (shell_conn_ws_server(1)) { + return -1; + } } else { - shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port); - } - - if (shell.conn == NULL) { - fflush(stdout); - return -1; +#endif + if (shell.args.auth == NULL) { + shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port); + } else { + shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port); + } + + if (shell.conn == NULL) { + fflush(stdout); + return -1; + } +#ifdef WEBSOCKET } +#endif shellReadHistory(); @@ -1005,8 +1026,16 @@ int32_t shellExecute() { if (pArgs->file[0] != 0) { shellSourceFile(pArgs->file); } +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + ws_close(shell.ws_conn); + } else { +#endif + taos_close(shell.conn); +#ifdef WEBSOCKET + } +#endif - taos_close(shell.conn); shellWriteHistory(); shellCleanupHistory(); return 0; @@ -1026,10 +1055,15 @@ int32_t shellExecute() { taosSetSignal(SIGINT, shellQueryInterruptHandler); - shellGetGrantInfo(); - +#ifdef WEBSOCKET + if (!shell.args.restful && !shell.args.cloud) { +#endif + shellGetGrantInfo(); +#ifdef WEBSOCKET + } +#endif while (1) { - taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn); + taosThreadCreate(&shell.pid, NULL, shellThreadLoop, NULL); taosThreadJoin(shell.pid, NULL); taosThreadClear(&shell.pid); } diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 6672cee367b04d1a45460a87502c86480d65969b..703533f8a9d0772ccb466e48dbdf3319b020f8d3 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -19,6 +19,11 @@ SShellObj shell = {0}; int main(int argc, char *argv[]) { +#ifdef WEBSOCKET + shell.args.timeout = 10; + shell.args.cloud = true; +#endif + if (shellCheckIntSize() != 0) { return -1; } @@ -41,7 +46,9 @@ int main(int argc, char *argv[]) { shellPrintHelp(); return 0; } - +#ifdef WEBSOCKET + shellCheckConnectMode(); +#endif taos_init(); if (shell.args.is_dump_config) { diff --git a/tools/shell/src/shellUtil.c b/tools/shell/src/shellUtil.c index e96e3d361940dc1f3f981a74d7122d364e72216c..e5e61e0b24d4a93031b8a502882d65badd56e192 100644 --- a/tools/shell/src/shellUtil.c +++ b/tools/shell/src/shellUtil.c @@ -121,6 +121,36 @@ void shellCheckServerStatus() { } } while (1); } +#ifdef WEBSOCKET +void shellCheckConnectMode() { + if (shell.args.dsn) { + shell.args.cloud = true; + shell.args.restful = false; + return; + } + if (shell.args.cloud) { + shell.args.dsn = getenv("TDENGINE_CLOUD_DSN"); + if (shell.args.dsn) { + shell.args.cloud = true; + shell.args.restful = false; + return; + } + if (shell.args.restful) { + if (!shell.args.host) { + shell.args.host = "localhost"; + } + if (!shell.args.port) { + shell.args.port = 6041; + } + shell.args.dsn = taosMemoryCalloc(1, 1024); + snprintf(shell.args.dsn, 1024, "ws://%s:%d/rest/ws", + shell.args.host, shell.args.port); + } + shell.args.cloud = false; + return; + } +} +#endif void shellExit() { if (shell.conn != NULL) { @@ -129,4 +159,4 @@ void shellExit() { } taos_cleanup(); exit(EXIT_FAILURE); -} \ No newline at end of file +} diff --git a/tools/shell/src/shellWebsocket.c b/tools/shell/src/shellWebsocket.c new file mode 100644 index 0000000000000000000000000000000000000000..fee2325c34c0ee3fe5f753575a1e9eea8cb0866f --- /dev/null +++ b/tools/shell/src/shellWebsocket.c @@ -0,0 +1,260 @@ + +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef WEBSOCKET +#include "taosws.h" +#include "shellInt.h" + +int shell_conn_ws_server(bool first) { + shell.ws_conn = ws_connect_with_dsn(shell.args.dsn); + if (!shell.ws_conn) { + fprintf(stderr, "failed to connect %s, reason: %s\n", + shell.args.dsn, ws_errstr(NULL)); + return -1; + } + if (first && shell.args.restful) { + fprintf(stdout, "successfully connect to %s\n\n", + shell.args.dsn); + } else if (first && shell.args.cloud) { + fprintf(stdout, "successfully connect to cloud service\n"); + } + return 0; +} + +static int horizontalPrintWebsocket(WS_RES* wres) { + const void* data = NULL; + int rows; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + return 0; + } + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int precision = ws_result_precision(wres); + + int width[TSDB_MAX_COLUMNS]; + for (int col = 0; col < num_fields; col++) { + width[col] = shellCalcColWidth(fields + col, precision); + } + + shellPrintHeader(fields, width, num_fields); + + int numOfRows = 0; + do { + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + putchar(' '); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellPrintField((const char*)value, fields+j, width[j], len, precision); + putchar(' '); + putchar('|'); + } + putchar('\r'); + putchar('\n'); + } + numOfRows += rows; + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + return numOfRows; +} + +static int verticalPrintWebsocket(WS_RES* wres) { + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + return 0; + } + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int precision = ws_result_precision(wres); + + int maxColNameLen = 0; + for (int col = 0; col < num_fields; col++) { + int len = (int)strlen(fields[col].name); + if (len > maxColNameLen) { + maxColNameLen = len; + } + } + int numOfRows = 0; + do { + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + printf("*************************** %d.row ***************************\n", + numOfRows + 1); + for (int j = 0; j < num_fields; j++) { + TAOS_FIELD* field = fields + j; + int padding = (int)(maxColNameLen - strlen(field->name)); + printf("%*.s%s: ", padding, " ", field->name); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellPrintField((const char*)value, field, 0, len, precision); + putchar('\n'); + } + numOfRows++; + } + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + return numOfRows; +} + +static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { + char fullname[PATH_MAX] = {0}; + if (taosExpandDir(fname, fullname, PATH_MAX) != 0) { + tstrncpy(fullname, fname, PATH_MAX); + } + + TdFilePtr pFile = taosOpenFile(fullname, + TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + if (pFile == NULL) { + fprintf(stderr, "failed to open file: %s\r\n", fullname); + return -1; + } + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + taosCloseFile(&pFile); + return 0; + } + int numOfRows = 0; + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int num_fields = ws_field_count(wres); + int precision = ws_result_precision(wres); + for (int col = 0; col < num_fields; col++) { + if (col > 0) { + taosFprintfFile(pFile, ","); + } + taosFprintfFile(pFile, "%s", fields[col].name); + } + taosFprintfFile(pFile, "\r\n"); + do { + uint8_t ty; + uint32_t len; + numOfRows += rows; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + if (j > 0) { + taosFprintfFile(pFile, ","); + } + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellDumpFieldToFile(pFile, (const char*)value, fields + j, len, precision); + } + taosFprintfFile(pFile, "\r\n"); + } + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + taosCloseFile(&pFile); + return numOfRows; +} + +static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) { + int numOfRows = 0; + if (fname != NULL) { + numOfRows = dumpWebsocketToFile(fname, wres); + } else if (vertical) { + numOfRows = verticalPrintWebsocket(wres); + } else { + numOfRows = horizontalPrintWebsocket(wres); + } + *error_no = ws_errno(wres); + return numOfRows; +} + +void shellRunSingleCommandWebsocketImp(char *command) { + int64_t st, et; + char *sptr = NULL; + char *cptr = NULL; + char *fname = NULL; + bool printMode = false; + + if ((sptr = strstr(command, ">>")) != NULL) { + cptr = strstr(command, ";"); + if (cptr != NULL) { + *cptr = '\0'; + } + + fname = sptr + 2; + while (*fname == ' ') fname++; + *sptr = '\0'; + } + + if ((sptr = strstr(command, "\\G")) != NULL) { + cptr = strstr(command, ";"); + if (cptr != NULL) { + *cptr = '\0'; + } + + *sptr = '\0'; + printMode = true; // When output to a file, the switch does not work. + } + + if (!shell.ws_conn && shell_conn_ws_server(0)) { + return; + } + + shell.stop_query = false; + st = taosGetTimestampUs(); + + WS_RES* res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout); + int code = ws_errno(res); + if (code != 0) { + et = taosGetTimestampUs(); + fprintf(stderr, "\nDB: error: %s (%.6fs)\n", ws_errstr(res), (et - st)/1E6); + if (code == TSDB_CODE_WS_SEND_TIMEOUT || code == TSDB_CODE_WS_RECV_TIMEOUT) { + fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n"); + } else if (code == TSDB_CODE_WS_INTERNAL_ERRO || code == TSDB_CODE_WS_CLOSED) { + fprintf(stderr, "TDengine server is down, will try to reconnect\n"); + shell.ws_conn = NULL; + } + ws_free_result(res); + return; + } + + if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { + fprintf(stdout, "Database changed.\r\n\r\n"); + fflush(stdout); + ws_free_result(res); + return; + } + + int numOfRows = 0; + if (ws_is_update_query(res)) { + numOfRows = ws_affected_rows(res); + et = taosGetTimestampUs(); + printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows, + (et - st)/1E6); + } else { + int error_no = 0; + numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode); + if (numOfRows < 0) { + ws_free_result(res); + return; + } + et = taosGetTimestampUs(); + if (error_no == 0 && !shell.stop_query) { + printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, + (et - st)/1E6); + } else { + printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows, + (et - st)/1E6); + } + } + printf("\n"); + ws_free_result(res); +} +#endif diff --git a/tools/taosws-rs b/tools/taosws-rs index c5fded266d3b10508e38bf3285bb7ecf798bc343..9de599dc5293e9c90bc00bc4a03f8b91ba756bc3 160000 --- a/tools/taosws-rs +++ b/tools/taosws-rs @@ -1 +1 @@ -Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343 +Subproject commit 9de599dc5293e9c90bc00bc4a03f8b91ba756bc3