提交 1599c404 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -37,7 +37,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId);
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId);
#ifdef __cplusplus
}
......
......@@ -1144,7 +1144,7 @@ static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
return -1;
}
......@@ -1179,7 +1179,7 @@ static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
return -1;
}
......@@ -1213,7 +1213,7 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj
}
col_id_t colId = pOld->pTags[tag].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
return -1;
}
......@@ -1295,7 +1295,7 @@ static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStb
}
col_id_t colId = pOld->pColumns[col].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
return -1;
}
......@@ -1329,7 +1329,7 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO
}
col_id_t colId = pOld->pColumns[col].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
return -1;
}
......
......@@ -72,13 +72,16 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
return strchr(topic, '.') + 1;
}
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SMqTopicObj *pTopic = NULL;
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
sdbRelease(pSdb, pTopic);
continue;
......@@ -95,14 +98,20 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) goto NEXT;
mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId, pTopic->ctbStbUid);
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
goto NEXT;
}
if (pCol->colId > 0 && pCol->colId == colId) {
sdbRelease(pSdb, pTopic);
nodesDestroyNode(pAst);
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
return -1;
}
mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId);
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
}
NEXT:
......
......@@ -67,8 +67,6 @@ struct STSmaStat {
struct SRSmaStat {
SSma *pSma;
int64_t commitAppliedVer; // vnode applied version for async commit
int64_t commitSubmitVer; // rsma submit version for async commit
int64_t submitVer; // latest submit version
int64_t refId; // shared by fetch tasks
int8_t triggerStat; // shared by fetch tasks
int8_t commitStat; // 0 not in committing, 1 in committing
......@@ -91,7 +89,6 @@ struct SSmaStat {
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
#define RSMA_COMMIT_STAT(r) (&(r)->commitStat)
#define RSMA_REF_ID(r) ((r)->refId)
#define RSMA_SUBMIT_VER(r) ((r)->submitVer)
struct SRSmaInfoItem {
void *taskInfo; // qTaskInfo_t
......@@ -223,13 +220,6 @@ struct STFInfo {
uint32_t ftype;
uint32_t fver;
int64_t fsize;
// specific fields
union {
struct {
int64_t submitVer;
} qTaskInfo;
};
};
enum {
......
......@@ -178,7 +178,6 @@ int32_t smaAsyncPostCommit(SSma* pSma);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level);
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
......
......@@ -146,7 +146,6 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
// step 3: perform persist task for qTaskInfo
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma));
......@@ -317,7 +316,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
// step 4: others
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
pRSmaStat->commitSubmitVer = pRSmaStat->submitVer;
return TSDB_CODE_SUCCESS;
}
......
......@@ -560,17 +560,6 @@ static void tdDestroySDataBlockArray(SArray *pArray) {
taosArrayDestroy(pArray);
}
int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) {
if (level == TSDB_RETENTION_L0) {
return pSma->pVnode->state.applied;
}
SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma);
SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv));
return atomic_load_64(&pRSmaStat->submitVer);
}
static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat,
int8_t blkType) {
SArray *pResult = NULL;
......@@ -615,13 +604,16 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%"PRIi64, SMA_VID(pSma),
suid, pItem->level, output->info.version);
taosMemoryFreeClear(pReq);
taosArrayClear(pResult);
} else if (terrno == 0) {
......@@ -908,12 +900,8 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) {
goto _err;
}
ASSERT(tFileInfo.qTaskInfo.submitVer > 0);
SSmaEnv *pRSmaEnv = pSma->pRSmaEnv;
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv);
atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer);
smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer);
SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
......@@ -1266,7 +1254,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
}
if (isFileCreated) {
tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->commitSubmitVer);
if (tdUpdateTFileHeader(&tFile) < 0) {
smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile),
tstrerror(terrno));
......@@ -1346,6 +1333,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK);
tdUnRefRSmaInfo(pSma, pRSmaInfo);
// atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE);
// taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
} break;
case TASK_TRIGGER_STAT_PAUSED: {
smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused",
......
......@@ -32,9 +32,6 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) {
tlen += taosEncodeFixedU32(buf, pInfo->ftype);
tlen += taosEncodeFixedU32(buf, pInfo->fver);
tlen += taosEncodeFixedI64(buf, pInfo->fsize);
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer);
}
return tlen;
}
......@@ -44,10 +41,6 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) {
buf = taosDecodeFixedU32(buf, &(pInfo->ftype));
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
buf = taosDecodeFixedI64(buf, &(pInfo->fsize));
// specific
if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) {
buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer));
}
return buf;
}
......
......@@ -2265,10 +2265,6 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
if (VND_IS_RSMA(pVnode)) {
return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
}
int64_t endVer = 0;
if (pCond->endVersion ==
-1) { // user not specified end version, set current maximum version of vnode as the endVersion
......
......@@ -3880,11 +3880,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key;
} else {
pInfo->min = ptsList[0];
pInfo->min = ptsList[start];
}
} else {
if (pCtx->start.key == INT64_MIN) {
pInfo->min = (pInfo->min > ptsList[0]) ? ptsList[0] : pInfo->min;
pInfo->min = (pInfo->min > ptsList[start]) ? ptsList[start] : pInfo->min;
} else {
pInfo->min = pCtx->start.key;
}
......
......@@ -787,6 +787,14 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return code;
}
static bool isPrimaryKeySort(SNodeList* pOrderByList) {
SNode* pExpr = ((SOrderByExprNode*)nodesListGetNode(pOrderByList, 0))->pExpr;
if (QUERY_NODE_COLUMN != nodeType(pExpr)) {
return false;
}
return PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId;
}
static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (NULL == pSelect->pOrderByList) {
return TSDB_CODE_SUCCESS;
......@@ -800,7 +808,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
pSort->node.resultDataOrder = pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL;
pSort->node.resultDataOrder = isPrimaryKeySort(pSelect->pOrderByList)
? (pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL)
: DATA_ORDER_LEVEL_NONE;
int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets);
if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) {
......
......@@ -215,6 +215,14 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
}
static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) {
if (1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
}
static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
......@@ -247,7 +255,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
case QUERY_NODE_LOGIC_PLAN_JOIN:
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return stbSplHasMultiTbScan(streamQuery, pNode);
return stbSplIsMultiTbScanChild(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
......@@ -969,8 +977,28 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code;
}
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
SNode* pPrimaryKey =
nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0)));
if (NULL == pPrimaryKey) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys);
}
return code;
}
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true);
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pMergeKeys = NULL;
if (pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) {
code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
......
......@@ -67,6 +67,8 @@ TEST_F(PlanPartitionByTest, withTimeLineFunc) {
useDb("root", "test");
run("SELECT TWA(c1) FROM st1 PARTITION BY c1");
run("SELECT MAVG(c1, 2) FROM st1 PARTITION BY c1");
}
TEST_F(PlanPartitionByTest, withSlimit) {
......
......@@ -3671,6 +3671,22 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
SNode *t = node->pLeft;
node->pLeft = node->pRight;
node->pRight = t;
switch (node->opType) {
case OP_TYPE_GREATER_THAN:
node->opType = OP_TYPE_LOWER_THAN;
break;
case OP_TYPE_LOWER_THAN:
node->opType = OP_TYPE_GREATER_THAN;
break;
case OP_TYPE_GREATER_EQUAL:
node->opType = OP_TYPE_LOWER_EQUAL;
break;
case OP_TYPE_LOWER_EQUAL:
node->opType = OP_TYPE_GREATER_EQUAL;
break;
default:
break;
}
}
if (OP_TYPE_IN == node->opType && QUERY_NODE_NODE_LIST != nodeType(node->pRight)) {
......
......@@ -396,6 +396,7 @@ typedef struct SDelayQueue {
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
void transDQCancel(SDelayQueue* queue, SDelayTask* task);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
/*
......
......@@ -27,7 +27,6 @@ typedef struct SCliConn {
SConnBuffer readBuf;
STransQueue cliMsgs;
queue q;
uint64_t expireTime;
STransCtx ctx;
bool broken; // link broken or not
......@@ -37,6 +36,7 @@ typedef struct SCliConn {
char* ip;
uint32_t port;
SDelayTask* task;
// debug and log info
struct sockaddr_in addr;
struct sockaddr_in localAddr;
......@@ -65,6 +65,7 @@ typedef struct SCliThrd {
queue msg;
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
SDelayQueue* timeoutQueue;
uint64_t nextTimeout; // next timeout
void* pTransInst; //
......@@ -92,9 +93,10 @@ static void* createConnPool(int size);
static void* destroyConnPool(void* pool);
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param);
// register timer in each thread to clear expire conn
static void cliTimeoutCb(uv_timer_t* handle);
// static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
// callback after read nbytes from socket
......@@ -184,7 +186,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \
} \
} while (0)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
......@@ -384,10 +386,6 @@ void cliHandleResp(SCliConn* conn) {
}
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
}
void cliHandleExcept(SCliConn* pConn) {
......@@ -441,30 +439,30 @@ void cliHandleExcept(SCliConn* pConn) {
transUnrefCliHandle(pConn);
}
void cliTimeoutCb(uv_timer_t* handle) {
SCliThrd* pThrd = handle->data;
STrans* pTransInst = pThrd->pTransInst;
int64_t currentTime = pThrd->nextTimeout;
tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
while (p != NULL) {
while (!QUEUE_IS_EMPTY(&p->conn)) {
queue* h = QUEUE_HEAD(&p->conn);
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
if (c->expireTime < currentTime) {
QUEUE_REMOVE(h);
transUnrefCliHandle(c);
} else {
break;
}
}
p = taosHashIterate((SHashObj*)pThrd->pool, p);
}
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
}
// void cliTimeoutCb(uv_timer_t* handle) {
// SCliThrd* pThrd = handle->data;
// STrans* pTransInst = pThrd->pTransInst;
// int64_t currentTime = pThrd->nextTimeout;
// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
//
// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
// while (p != NULL) {
// while (!QUEUE_IS_EMPTY(&p->conn)) {
// queue* h = QUEUE_HEAD(&p->conn);
// SCliConn* c = QUEUE_DATA(h, SCliConn, q);
// if (c->expireTime < currentTime) {
// QUEUE_REMOVE(h);
// transUnrefCliHandle(c);
// } else {
// break;
// }
// }
// p = taosHashIterate((SHashObj*)pThrd->pool, p);
// }
//
// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
// }
void* createConnPool(int size) {
// thread local, no lock
......@@ -506,6 +504,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
assert(h == &conn->q);
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
return conn;
}
static int32_t allocConnRef(SCliConn* conn, bool update) {
......@@ -537,6 +539,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
transReleaseExHandle(transGetRefMgt(), handle);
return 0;
}
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
......@@ -547,7 +550,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx);
......@@ -562,7 +564,13 @@ static void addConnToPool(void* pool, SCliConn* conn) {
assert(plist != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&plist->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&plist->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
......@@ -631,6 +639,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
uv_read_stop(conn->stream);
......@@ -997,6 +1007,8 @@ static SCliThrd* createThrdObj() {
pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue);
transDQCreate(pThrd->loop, &pThrd->timeoutQueue);
pThrd->quit = false;
return pThrd;
}
......@@ -1012,6 +1024,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg);
transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}
......@@ -1058,6 +1071,10 @@ static void doCloseIdleConn(void* param) {
STaskArg* arg = param;
SCliConn* conn = arg->param1;
SCliThrd* pThrd = arg->param2;
tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn);
conn->task = NULL;
cliDestroyConn(conn, true);
taosMemoryFree(arg);
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
......@@ -1248,11 +1265,17 @@ int transReleaseCliHandle(void* handle) {
if (pThrd == NULL) {
return -1;
}
STransMsg tmsg = {.info.handle = handle};
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
cmsg->type = Release;
STraceId* trace = &tmsg.info.traceId;
tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid);
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
return -1;
}
......
......@@ -480,7 +480,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
SDelayTask* task = container_of(minNode, SDelayTask, node);
STaskArg* arg = task->arg;
freeFunc(arg->param1);
if (freeFunc) freeFunc(arg->param1);
taosMemoryFree(arg);
taosMemoryFree(task);
......@@ -491,9 +491,16 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
void transDQCancel(SDelayQueue* queue, SDelayTask* task) {
uv_timer_stop(queue->timer);
if (heapSize(queue->heap) <= 0) return;
if (heapSize(queue->heap) <= 0) {
taosMemoryFree(task->arg);
taosMemoryFree(task);
return;
}
heapRemove(queue->heap, &task->node);
taosMemoryFree(task->arg);
taosMemoryFree(task);
if (heapSize(queue->heap) != 0) {
HeapNode* minNode = heapMin(queue->heap);
if (minNode != NULL) return;
......
......@@ -149,32 +149,34 @@ static void* transAcceptThread(void* arg);
static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
reallocConnRef(conn); \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
reallocConnRef(conn); \
tTrace("conn %p received release request", conn); \
\
STraceId traceId = head->traceId; \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
\
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = NULL}; \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
#define SRV_RELEASE_UV(loop) \
......
......@@ -62,7 +62,7 @@ python3 ./test.py -f 2-query/char_length.py -R
python3 ./test.py -f 2-query/check_tsdb.py
python3 ./test.py -f 2-query/check_tsdb.py -R
# python3 ./test.py -f 1-insert/update_data.py
python3 ./test.py -f 1-insert/update_data.py
python3 ./test.py -f 1-insert/delete_data.py
python3 ./test.py -f 2-query/db.py
......@@ -222,7 +222,7 @@ python3 ./test.py -f 7-tmq/tmqDropStbCtb.py
python3 ./test.py -f 7-tmq/tmqDropNtb.py
python3 ./test.py -f 7-tmq/tmqUdf.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
......
......@@ -26,6 +26,10 @@
#include "ttypes.h"
#include "tutil.h"
#ifdef WEBSOCKET
#include "taosws.h"
#endif
#define SHELL_MAX_HISTORY_SIZE 1000
#define SHELL_MAX_COMMAND_SIZE 1048586
#define SHELL_HISTORY_FILE ".taos_history"
......@@ -67,6 +71,12 @@ typedef struct {
int32_t pktNum;
int32_t displayWidth;
int32_t abort;
#ifdef WEBSOCKET
bool restful;
bool cloud;
char* dsn;
int32_t timeout;
#endif
} SShellArgs;
typedef struct {
......@@ -85,6 +95,10 @@ typedef struct {
TAOS* conn;
TdThread pid;
tsem_t cancelSem;
#ifdef WEBSOCKET
WS_TAOS* ws_conn;
bool stop_query;
#endif
} SShellObj;
// shellArguments.c
......@@ -95,7 +109,10 @@ int32_t shellReadCommand(char* command);
// shellEngine.c
int32_t shellExecute();
int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision);
void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields);
void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision);
void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision);
// shellUtil.c
int32_t shellCheckIntSize();
void shellPrintVersion();
......@@ -109,6 +126,14 @@ void shellExit();
// shellNettest.c
void shellTestNetWork();
#ifdef WEBSOCKET
void shellCheckConnectMode();
// shellWebsocket.c
int shell_conn_ws_server(bool first);
int32_t shell_run_websocket();
void shellRunSingleCommandWebsocketImp(char *command);
#endif
// shellMain.c
extern SShellObj shell;
......
......@@ -43,6 +43,12 @@
#define SHELL_VERSION "Print program version."
#define SHELL_EMAIL "<support@taosdata.com>"
#ifdef WEBSOCKET
#define SHELL_DSN "The dsn to use when connecting to cloud server."
#define SHELL_REST "Use restful mode when connecting."
#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 10."
#endif
static int32_t shellParseSingleOpt(int32_t key, char *arg);
void shellPrintHelp() {
......@@ -65,6 +71,11 @@ void shellPrintHelp() {
printf("%s%s%s%s\r\n", indent, "-s,", indent, SHELL_CMD);
printf("%s%s%s%s\r\n", indent, "-t,", indent, SHELL_STARTUP);
printf("%s%s%s%s\r\n", indent, "-u,", indent, SHELL_USER);
#ifdef WEBSOCKET
printf("%s%s%s%s\r\n", indent, "-E,", indent, SHELL_DSN);
printf("%s%s%s%s\r\n", indent, "-R,", indent, SHELL_REST);
printf("%s%s%s%s\r\n", indent, "-T,", indent, SHELL_TIMEOUT);
#endif
printf("%s%s%s%s\r\n", indent, "-w,", indent, SHELL_WIDTH);
printf("%s%s%s%s\r\n", indent, "-V,", indent, SHELL_VERSION);
printf("\r\n\r\nReport bugs to %s.\r\n", SHELL_EMAIL);
......@@ -95,6 +106,11 @@ static struct argp_option shellOptions[] = {
{"display-width", 'w', "WIDTH", 0, SHELL_WIDTH},
{"netrole", 'n', "NETROLE", 0, SHELL_NET_ROLE},
{"pktlen", 'l', "PKTLEN", 0, SHELL_PKG_LEN},
#ifdef WEBSOCKET
{"dsn", 'E', "DSN", 0, SHELL_DSN},
{"restful", 'R', 0, 0, SHELL_REST},
{"timeout", 'T', "SECONDS", 0, SHELL_TIMEOUT},
#endif
{"pktnum", 'N', "PKTNUM", 0, SHELL_PKT_NUM},
{0},
};
......@@ -120,9 +136,15 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
switch (key) {
case 'h':
pArgs->host = arg;
#ifdef WEBSOCKET
pArgs->cloud = false;
#endif
break;
case 'P':
pArgs->port = atoi(arg);
#ifdef WEBSOCKET
pArgs->cloud = false;
#endif
if (pArgs->port == 0) pArgs->port = -1;
break;
case 'u':
......@@ -137,6 +159,9 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
pArgs->is_gen_auth = true;
break;
case 'c':
#ifdef WEBSOCKET
pArgs->cloud = false;
#endif
pArgs->cfgdir = arg;
break;
case 'C':
......@@ -172,6 +197,18 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) {
case 'N':
pArgs->pktNum = atoi(arg);
break;
#ifdef WEBSOCKET
case 'R':
pArgs->restful = true;
break;
case 'E':
pArgs->dsn = arg;
pArgs->cloud = true;
break;
case 'T':
pArgs->timeout = atoi(arg);
break;
#endif
case 'V':
pArgs->is_version = true;
break;
......@@ -208,7 +245,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
}
if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' ||
key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N') {
key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N'
#ifdef WEBSOCKET
|| key[1] == 'E' || key[1] == 'T'
#endif
) {
if (i + 1 >= argc) {
fprintf(stderr, "option %s requires an argument\r\n", key);
return -1;
......@@ -221,7 +262,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
shellParseSingleOpt(key[1], val);
i++;
} else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' ||
key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1) {
key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1
#ifdef WEBSOCKET
||key[1] == 'R'
#endif
) {
shellParseSingleOpt(key[1], NULL);
} else {
fprintf(stderr, "invalid option %s\r\n", key);
......
......@@ -25,14 +25,9 @@ static int32_t shellRunSingleCommand(char *command);
static int32_t shellRunCommand(char *command);
static void shellRunSingleCommandImp(char *command);
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision);
static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length,
int32_t precision);
static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres);
static void shellPrintNChar(const char *str, int32_t length, int32_t width);
static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision);
static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision);
static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields);
static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql);
static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql);
static void shellReadHistory();
......@@ -94,8 +89,15 @@ int32_t shellRunSingleCommand(char *command) {
shellSourceFile(c_ptr);
return 0;
}
shellRunSingleCommandImp(command);
#ifdef WEBSOCKET
if (shell.args.restful || shell.args.cloud) {
shellRunSingleCommandWebsocketImp(command);
} else {
#endif
shellRunSingleCommandImp(command);
#ifdef WEBSOCKET
}
#endif
return 0;
}
......@@ -937,7 +939,16 @@ void *shellCancelHandler(void *arg) {
taosMsleep(10);
continue;
}
taos_kill_query(shell.conn);
#ifdef WEBSOCKET
if (shell.args.restful || shell.args.cloud) {
shell.stop_query = true;
} else {
#endif
taos_kill_query(shell.conn);
#ifdef WEBSOCKET
}
#endif
#ifdef WINDOWS
printf("\n%s", shell.info.promptHeader);
#endif
......@@ -981,16 +992,26 @@ int32_t shellExecute() {
fflush(stdout);
SShellArgs *pArgs = &shell.args;
if (shell.args.auth == NULL) {
shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port);
#ifdef WEBSOCKET
if (shell.args.restful || shell.args.cloud) {
if (shell_conn_ws_server(1)) {
return -1;
}
} else {
shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port);
}
if (shell.conn == NULL) {
fflush(stdout);
return -1;
#endif
if (shell.args.auth == NULL) {
shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port);
} else {
shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port);
}
if (shell.conn == NULL) {
fflush(stdout);
return -1;
}
#ifdef WEBSOCKET
}
#endif
shellReadHistory();
......@@ -1005,8 +1026,16 @@ int32_t shellExecute() {
if (pArgs->file[0] != 0) {
shellSourceFile(pArgs->file);
}
#ifdef WEBSOCKET
if (shell.args.restful || shell.args.cloud) {
ws_close(shell.ws_conn);
} else {
#endif
taos_close(shell.conn);
#ifdef WEBSOCKET
}
#endif
taos_close(shell.conn);
shellWriteHistory();
shellCleanupHistory();
return 0;
......@@ -1026,10 +1055,15 @@ int32_t shellExecute() {
taosSetSignal(SIGINT, shellQueryInterruptHandler);
shellGetGrantInfo();
#ifdef WEBSOCKET
if (!shell.args.restful && !shell.args.cloud) {
#endif
shellGetGrantInfo();
#ifdef WEBSOCKET
}
#endif
while (1) {
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn);
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, NULL);
taosThreadJoin(shell.pid, NULL);
taosThreadClear(&shell.pid);
}
......
......@@ -19,6 +19,11 @@
SShellObj shell = {0};
int main(int argc, char *argv[]) {
#ifdef WEBSOCKET
shell.args.timeout = 10;
shell.args.cloud = true;
#endif
if (shellCheckIntSize() != 0) {
return -1;
}
......@@ -41,7 +46,9 @@ int main(int argc, char *argv[]) {
shellPrintHelp();
return 0;
}
#ifdef WEBSOCKET
shellCheckConnectMode();
#endif
taos_init();
if (shell.args.is_dump_config) {
......
......@@ -121,6 +121,36 @@ void shellCheckServerStatus() {
}
} while (1);
}
#ifdef WEBSOCKET
void shellCheckConnectMode() {
if (shell.args.dsn) {
shell.args.cloud = true;
shell.args.restful = false;
return;
}
if (shell.args.cloud) {
shell.args.dsn = getenv("TDENGINE_CLOUD_DSN");
if (shell.args.dsn) {
shell.args.cloud = true;
shell.args.restful = false;
return;
}
if (shell.args.restful) {
if (!shell.args.host) {
shell.args.host = "localhost";
}
if (!shell.args.port) {
shell.args.port = 6041;
}
shell.args.dsn = taosMemoryCalloc(1, 1024);
snprintf(shell.args.dsn, 1024, "ws://%s:%d/rest/ws",
shell.args.host, shell.args.port);
}
shell.args.cloud = false;
return;
}
}
#endif
void shellExit() {
if (shell.conn != NULL) {
......@@ -129,4 +159,4 @@ void shellExit() {
}
taos_cleanup();
exit(EXIT_FAILURE);
}
\ No newline at end of file
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef WEBSOCKET
#include "taosws.h"
#include "shellInt.h"
int shell_conn_ws_server(bool first) {
shell.ws_conn = ws_connect_with_dsn(shell.args.dsn);
if (!shell.ws_conn) {
fprintf(stderr, "failed to connect %s, reason: %s\n",
shell.args.dsn, ws_errstr(NULL));
return -1;
}
if (first && shell.args.restful) {
fprintf(stdout, "successfully connect to %s\n\n",
shell.args.dsn);
} else if (first && shell.args.cloud) {
fprintf(stdout, "successfully connect to cloud service\n");
}
return 0;
}
static int horizontalPrintWebsocket(WS_RES* wres) {
const void* data = NULL;
int rows;
ws_fetch_block(wres, &data, &rows);
if (!rows) {
return 0;
}
int num_fields = ws_field_count(wres);
TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
int precision = ws_result_precision(wres);
int width[TSDB_MAX_COLUMNS];
for (int col = 0; col < num_fields; col++) {
width[col] = shellCalcColWidth(fields + col, precision);
}
shellPrintHeader(fields, width, num_fields);
int numOfRows = 0;
do {
uint8_t ty;
uint32_t len;
for (int i = 0; i < rows; i++) {
for (int j = 0; j < num_fields; j++) {
putchar(' ');
const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
shellPrintField((const char*)value, fields+j, width[j], len, precision);
putchar(' ');
putchar('|');
}
putchar('\r');
putchar('\n');
}
numOfRows += rows;
ws_fetch_block(wres, &data, &rows);
} while (rows && !shell.stop_query);
return numOfRows;
}
static int verticalPrintWebsocket(WS_RES* wres) {
int rows = 0;
const void* data = NULL;
ws_fetch_block(wres, &data, &rows);
if (!rows) {
return 0;
}
int num_fields = ws_field_count(wres);
TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
int precision = ws_result_precision(wres);
int maxColNameLen = 0;
for (int col = 0; col < num_fields; col++) {
int len = (int)strlen(fields[col].name);
if (len > maxColNameLen) {
maxColNameLen = len;
}
}
int numOfRows = 0;
do {
uint8_t ty;
uint32_t len;
for (int i = 0; i < rows; i++) {
printf("*************************** %d.row ***************************\n",
numOfRows + 1);
for (int j = 0; j < num_fields; j++) {
TAOS_FIELD* field = fields + j;
int padding = (int)(maxColNameLen - strlen(field->name));
printf("%*.s%s: ", padding, " ", field->name);
const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
shellPrintField((const char*)value, field, 0, len, precision);
putchar('\n');
}
numOfRows++;
}
ws_fetch_block(wres, &data, &rows);
} while (rows && !shell.stop_query);
return numOfRows;
}
static int dumpWebsocketToFile(const char* fname, WS_RES* wres) {
char fullname[PATH_MAX] = {0};
if (taosExpandDir(fname, fullname, PATH_MAX) != 0) {
tstrncpy(fullname, fname, PATH_MAX);
}
TdFilePtr pFile = taosOpenFile(fullname,
TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (pFile == NULL) {
fprintf(stderr, "failed to open file: %s\r\n", fullname);
return -1;
}
int rows = 0;
const void* data = NULL;
ws_fetch_block(wres, &data, &rows);
if (!rows) {
taosCloseFile(&pFile);
return 0;
}
int numOfRows = 0;
TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres);
int num_fields = ws_field_count(wres);
int precision = ws_result_precision(wres);
for (int col = 0; col < num_fields; col++) {
if (col > 0) {
taosFprintfFile(pFile, ",");
}
taosFprintfFile(pFile, "%s", fields[col].name);
}
taosFprintfFile(pFile, "\r\n");
do {
uint8_t ty;
uint32_t len;
numOfRows += rows;
for (int i = 0; i < rows; i++) {
for (int j = 0; j < num_fields; j++) {
if (j > 0) {
taosFprintfFile(pFile, ",");
}
const void *value = ws_get_value_in_block(wres, i, j, &ty, &len);
shellDumpFieldToFile(pFile, (const char*)value, fields + j, len, precision);
}
taosFprintfFile(pFile, "\r\n");
}
ws_fetch_block(wres, &data, &rows);
} while (rows && !shell.stop_query);
taosCloseFile(&pFile);
return numOfRows;
}
static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) {
int numOfRows = 0;
if (fname != NULL) {
numOfRows = dumpWebsocketToFile(fname, wres);
} else if (vertical) {
numOfRows = verticalPrintWebsocket(wres);
} else {
numOfRows = horizontalPrintWebsocket(wres);
}
*error_no = ws_errno(wres);
return numOfRows;
}
void shellRunSingleCommandWebsocketImp(char *command) {
int64_t st, et;
char *sptr = NULL;
char *cptr = NULL;
char *fname = NULL;
bool printMode = false;
if ((sptr = strstr(command, ">>")) != NULL) {
cptr = strstr(command, ";");
if (cptr != NULL) {
*cptr = '\0';
}
fname = sptr + 2;
while (*fname == ' ') fname++;
*sptr = '\0';
}
if ((sptr = strstr(command, "\\G")) != NULL) {
cptr = strstr(command, ";");
if (cptr != NULL) {
*cptr = '\0';
}
*sptr = '\0';
printMode = true; // When output to a file, the switch does not work.
}
if (!shell.ws_conn && shell_conn_ws_server(0)) {
return;
}
shell.stop_query = false;
st = taosGetTimestampUs();
WS_RES* res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout);
int code = ws_errno(res);
if (code != 0) {
et = taosGetTimestampUs();
fprintf(stderr, "\nDB: error: %s (%.6fs)\n", ws_errstr(res), (et - st)/1E6);
if (code == TSDB_CODE_WS_SEND_TIMEOUT || code == TSDB_CODE_WS_RECV_TIMEOUT) {
fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n");
} else if (code == TSDB_CODE_WS_INTERNAL_ERRO || code == TSDB_CODE_WS_CLOSED) {
fprintf(stderr, "TDengine server is down, will try to reconnect\n");
shell.ws_conn = NULL;
}
ws_free_result(res);
return;
}
if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\r\n\r\n");
fflush(stdout);
ws_free_result(res);
return;
}
int numOfRows = 0;
if (ws_is_update_query(res)) {
numOfRows = ws_affected_rows(res);
et = taosGetTimestampUs();
printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows,
(et - st)/1E6);
} else {
int error_no = 0;
numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode);
if (numOfRows < 0) {
ws_free_result(res);
return;
}
et = taosGetTimestampUs();
if (error_no == 0 && !shell.stop_query) {
printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows,
(et - st)/1E6);
} else {
printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows,
(et - st)/1E6);
}
}
printf("\n");
ws_free_result(res);
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册