提交 c9ce01c8 编写于 作者: H Haojun Liao

Merge branch 'main' into feature/3_liaohj

......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e04f39b
GIT_TAG 22627d7
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -1772,6 +1772,7 @@ typedef struct {
SArray* pTags; // array of SField
// 3.0.20
int64_t checkpointFreq; // ms
int64_t deleteMark;
int8_t igUpdate;
} SCMCreateStreamReq;
......
......@@ -110,6 +110,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
#if 0
char* streamStateSessionDump(SStreamState* pState);
char* streamStateIntervalDump(SStreamState* pState);
#endif
#ifdef __cplusplus
......
......@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
tEndEncode(&encoder);
......@@ -5487,6 +5488,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
tEndDecode(&decoder);
......
......@@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
pObj->igCheckUpdate = pCreate->igUpdate;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
......@@ -346,6 +347,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
};
......
......@@ -101,7 +101,8 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
}
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
ASSERT(0);
taosMemoryFree(fname);
return NULL;
}
taosMemoryFree(fname);
return pStore;
......
......@@ -99,6 +99,38 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
return TSDB_CODE_SUCCESS;
}
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
int32_t numOfTables = p->numOfTables;
if (suid != 0) {
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
} else {
for (int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = p->pTableList[i].uid;
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
if (p->pSchema != NULL) {
break;
}
tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
}
// all queried tables have been dropped already, return immediately.
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL;
......@@ -117,11 +149,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
return TSDB_CODE_SUCCESS;
}
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
p->pTableList = pTableIdList;
p->numOfTables = numOfTables;
int32_t code = setTableSchema(p, suid, idstr);
if (code != TSDB_CODE_SUCCESS) {
tsdbCacherowsReaderClose(p);
return code;
}
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
if (p->transferBuf == NULL) {
tsdbCacherowsReaderClose(p);
......
......@@ -21,12 +21,34 @@
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);
#define WAIT_TIME_MILI_SEC 50
int vnodeBegin(SVnode *pVnode) {
// alloc buffer pool
int32_t nTry = 0;
taosThreadMutexLock(&pVnode->mutex);
while (pVnode->pPool == NULL) {
taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex);
vInfo("vgId:%d no free buffer pool on %d try, wait %d ms...", TD_VID(pVnode), ++nTry, WAIT_TIME_MILI_SEC);
struct timeval tv;
struct timespec ts;
taosGetTimeOfDay(&tv);
ts.tv_nsec = tv.tv_usec * 1000 + WAIT_TIME_MILI_SEC * 1000000;
if (ts.tv_nsec > 999999999l) {
ts.tv_sec = tv.tv_sec + 1;
ts.tv_nsec -= 1000000000l;
} else {
ts.tv_sec = tv.tv_sec;
}
int32_t rc = taosThreadCondTimedWait(&pVnode->poolNotEmpty, &pVnode->mutex, &ts);
if (rc && rc != ETIMEDOUT) {
terrno = TAOS_SYSTEM_ERROR(rc);
taosThreadMutexUnlock(&pVnode->mutex);
return -1;
}
}
pVnode->inUse = pVnode->pPool;
......@@ -70,7 +92,7 @@ int vnodeShouldCommit(SVnode *pVnode) {
}
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
int64_t nowMs = taosGetMonoTimestampMs();
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
......
......@@ -312,10 +312,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
walApplyVer(pVnode->pWal, version);
/*vInfo("vgId:%d, push msg begin", pVnode->config.vgId);*/
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
// commit if need
if (needCommit) {
......@@ -1019,7 +1022,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, statis.nBatchInsert);
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, statis.nBatchInsertSuccess);
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
vDebug("vgId:%d %s done, index:%" PRId64, TD_VID(pVnode), __func__, version);
return 0;
}
......
......@@ -149,6 +149,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
// check if it is a group by tbname
if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
blockDataCleanup(pInfo->pBufferredRes);
taosArrayClear(pInfo->pUidList);
......@@ -207,6 +211,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
size_t totalGroups = tableListGetOutputGroups(pTableList);
while (pInfo->currentGroupIndex < totalGroups) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
STableKeyInfo* pList = NULL;
int32_t num = 0;
......@@ -215,8 +223,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList);
continue;
}
taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
......
......@@ -2752,6 +2752,10 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
pOperator);
if (pBlock != NULL) {
......
......@@ -119,8 +119,8 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL
pRowSup->groupId = groupId;
}
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey,
int32_t pos, int32_t order, int64_t* pData) {
FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
int32_t order, int64_t* pData) {
int32_t forwardRows = 0;
if (order == TSDB_ORDER_ASC) {
......@@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
if (NULL == pr) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
if (pr->closed) {
......@@ -1318,11 +1318,11 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
}
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
if (NULL == pResult) {
return;
}
SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
......@@ -4816,6 +4816,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
taosHashCleanup(pUpdatedMap);
#if 0
char* pBuf = streamStateIntervalDump(pInfo->pState);
qDebug("===stream===interval state%s", pBuf);
taosMemoryFree(pBuf);
#endif
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single interval delete");
......
......@@ -374,7 +374,7 @@ static void handleInt8Col(const void* data, int32_t start, int32_t numOfRows, SM
pBuf->v = i8VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
pBuf->v = ((int8_t*)data)[0];
pBuf->v = ((int8_t*)data)[start];
}
if (signVal) {
......@@ -408,7 +408,7 @@ static void handleInt16Col(const void* data, int32_t start, int32_t numOfRows, S
pBuf->v = i16VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
pBuf->v = ((int16_t*)data)[0];
pBuf->v = ((int16_t*)data)[start];
}
if (signVal) {
......@@ -442,7 +442,7 @@ static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, S
pBuf->v = i32VectorCmpAVX2(data, numOfRows, isMinFunc, signVal);
} else {
if (!pBuf->assign) {
pBuf->v = ((int32_t*)data)[0];
pBuf->v = ((int32_t*)data)[start];
}
if (signVal) {
......@@ -472,7 +472,7 @@ static void handleInt32Col(const void* data, int32_t start, int32_t numOfRows, S
static void handleInt64Col(const void* data, int32_t start, int32_t numOfRows, SMinmaxResInfo* pBuf, bool isMinFunc,
bool signVal) {
if (!pBuf->assign) {
pBuf->v = ((int64_t*)data)[0];
pBuf->v = ((int64_t*)data)[start];
}
if (signVal) {
......@@ -506,7 +506,7 @@ static void handleFloatCol(SColumnInfoData* pCol, int32_t start, int32_t numOfRo
*val = floatVectorCmpAVX(pData, numOfRows, isMinFunc);
} else {
if (!pBuf->assign) {
*val = pData[0];
*val = pData[start];
}
if (isMinFunc) { // min
......@@ -537,7 +537,7 @@ static void handleDoubleCol(SColumnInfoData* pCol, int32_t start, int32_t numOfR
*val = (double)doubleVectorCmpAVX(pData, numOfRows, isMinFunc);
} else {
if (!pBuf->assign) {
*val = pData[0];
*val = pData[start];
}
if (isMinFunc) { // min
......
......@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C).
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
subtable_opt(A) ::= . { A = NULL; }
......
......@@ -666,6 +666,9 @@ static uint8_t getPrecisionFromCurrStmt(SNode* pCurrStmt, uint8_t defaultVal) {
if (isSetOperator(pCurrStmt)) {
return ((SSetOperator*)pCurrStmt)->precision;
}
if (NULL != pCurrStmt && QUERY_NODE_CREATE_STREAM_STMT == nodeType(pCurrStmt)) {
return getPrecisionFromCurrStmt(((SCreateStreamStmt*)pCurrStmt)->pQuery, defaultVal);
}
return defaultVal;
}
......@@ -5512,16 +5515,6 @@ static bool crossTableWithUdaf(SSelectStmt* pSelect) {
}
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
if (NULL != pStmt->pOptions->pWatermark &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
return pCxt->errCode;
}
if (NULL != pStmt->pOptions->pDelay &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pDelay))) {
return pCxt->errCode;
}
if (NULL == pStmt->pQuery) {
return TSDB_CODE_SUCCESS;
}
......@@ -5714,6 +5707,17 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
return code;
}
static int32_t translateStreamOptions(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
pCxt->pCurrStmt = (SNode*)pStmt;
SStreamOptions* pOptions = pStmt->pOptions;
if ((NULL != pOptions->pWatermark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pWatermark))) ||
(NULL != pOptions->pDeleteMark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pDeleteMark))) ||
(NULL != pOptions->pDelay && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pDelay)))) {
return pCxt->errCode;
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
......@@ -5735,10 +5739,16 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
}
}
if (TSDB_CODE_SUCCESS == code) {
code = translateStreamOptions(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
pReq->triggerType = pStmt->pOptions->triggerType;
pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0);
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->deleteMark =
(NULL != pStmt->pOptions->pDeleteMark ? ((SValueNode*)pStmt->pOptions->pDeleteMark)->datum.i : 0);
pReq->fillHistory = pStmt->pOptions->fillHistory;
pReq->igExpired = pStmt->pOptions->ignoreExpired;
pReq->igUpdate = pStmt->pOptions->ignoreUpdate;
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -258,7 +258,6 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
int32_t streamLoadTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
......
......@@ -107,8 +107,6 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
}
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
szPage = szPage < 0 ? 4096 : szPage;
pages = pages < 0 ? 256 : pages;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -128,6 +126,28 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024);
}
char cfgPath[1030];
sprintf(cfgPath, "%s/cfg", statePath);
char cfg[1024];
memset(cfg, 0, 1024);
TdFilePtr pCfgFile = taosOpenFile(cfgPath, TD_FILE_READ);
if (pCfgFile != NULL) {
int64_t size;
taosFStatFile(pCfgFile, &size, NULL);
taosReadFile(pCfgFile, cfg, size);
sscanf(cfg, "%d\n%d\n", &szPage, &pages);
} else {
taosMulModeMkDir(statePath, 0755);
pCfgFile = taosOpenFile(cfgPath, TD_FILE_WRITE | TD_FILE_CREATE);
szPage = szPage < 0 ? 4096 : szPage;
pages = pages < 0 ? 256 : pages;
sprintf(cfg, "%d\n%d\n", szPage, pages);
taosWriteFile(pCfgFile, cfg, strlen(cfg));
}
taosCloseFile(&pCfgFile);
if (tdbOpen(statePath, szPage, pages, &pState->pTdbState->db, 1) < 0) {
goto _err;
}
......@@ -879,4 +899,47 @@ char* streamStateSessionDump(SStreamState* pState) {
streamStateFreeCur(pCur);
return dumpBuf;
}
char* streamStateIntervalDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
if (tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL) < 0) {
streamStateFreeCur(pCur);
return NULL;
}
tdbTbcMoveToFirst(pCur->pCur);
SWinKey key = {0};
void* buf = NULL;
int32_t bufSize = 0;
int32_t code = streamStateGetKVByCur(pCur, &key, (const void **)&buf, &bufSize);
if (code != 0) {
streamStateFreeCur(pCur);
return NULL;
}
int32_t size = 2048;
char* dumpBuf = taosMemoryCalloc(size, 1);
int64_t len = 0;
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
// len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
while (1) {
tdbTbcMoveToNext(pCur->pCur);
key = (SWinKey){0};
code = streamStateGetKVByCur(pCur, &key, NULL, 0);
if (code != 0) {
streamStateFreeCur(pCur);
return dumpBuf;
}
len += snprintf(dumpBuf + len, size - len, "||s:%15" PRId64 ",", key.ts);
// len += snprintf(dumpBuf + len, size - len, "e:%15" PRId64 ",", key.win.ekey);
len += snprintf(dumpBuf + len, size - len, "g:%15" PRId64 "||", key.groupId);
}
streamStateFreeCur(pCur);
return dumpBuf;
}
#endif
......@@ -549,6 +549,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/mavg.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/min.py
......@@ -834,6 +835,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2
......@@ -930,6 +932,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3
......@@ -1027,6 +1030,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 4
......
此差异已折叠。
......@@ -32,6 +32,8 @@ void shellShowOnScreen(SShellCmd* cmd);
void shellInsertChar(SShellCmd* cmd, char* c, int size);
void shellInsertStr(SShellCmd* cmd, char* str, int size);
bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* p, int32_t len);
char* tireSearchWord(int type, char* pre);
bool updateTireValue(int type, bool autoFill) ;
typedef struct SAutoPtr {
STire* p;
......@@ -60,23 +62,22 @@ SWords shellCommands[] = {
{"alter database <db_name> <alter_db_options> <anyword> <alter_db_options> <anyword> <alter_db_options> <anyword> "
"<alter_db_options> <anyword> <alter_db_options> <anyword> ;",
0, 0, NULL},
{"alter dnode <dnode_id> balance ", 0, 0, NULL},
{"alter dnode <dnode_id> resetlog;", 0, 0, NULL},
{"alter dnode <dnode_id> debugFlag 141;", 0, 0, NULL},
{"alter dnode <dnode_id> monitor 1;", 0, 0, NULL},
{"alter all dnodes monitor ", 0, 0, NULL},
{"alter alldnodes balance ", 0, 0, NULL},
{"alter alldnodes resetlog;", 0, 0, NULL},
{"alter alldnodes debugFlag 141;", 0, 0, NULL},
{"alter alldnodes monitor 1;", 0, 0, NULL},
{"alter dnode <dnode_id> \"resetlog\";", 0, 0, NULL},
{"alter dnode <dnode_id> \"debugFlag\" \"141\";", 0, 0, NULL},
{"alter dnode <dnode_id> \"monitor\" \"0\";", 0, 0, NULL},
{"alter dnode <dnode_id> \"monitor\" \"1\";", 0, 0, NULL},
{"alter all dnodes \"resetlog\";", 0, 0, NULL},
{"alter all dnodes \"debugFlag\" \"141\";", 0, 0, NULL},
{"alter all dnodes \"monitor\" \"0\";", 0, 0, NULL},
{"alter all dnodes \"monitor\" \"1\";", 0, 0, NULL},
{"alter table <tb_name> <tb_actions> <anyword> ;", 0, 0, NULL},
{"alter table modify column", 0, 0, NULL},
{"alter local resetlog;", 0, 0, NULL},
{"alter local DebugFlag 143;", 0, 0, NULL},
{"alter local cDebugFlag 143;", 0, 0, NULL},
{"alter local uDebugFlag 143;", 0, 0, NULL},
{"alter local rpcDebugFlag 143;", 0, 0, NULL},
{"alter local tmrDebugFlag 143;", 0, 0, NULL},
{"alter local \"resetlog\";", 0, 0, NULL},
{"alter local \"DebugFlag\" \"143\";", 0, 0, NULL},
{"alter local \"cDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local \"uDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local \"rpcDebugFlag\" \"143\";", 0, 0, NULL},
{"alter local \"tmrDebugFlag\" \"143\";", 0, 0, NULL},
{"alter topic", 0, 0, NULL},
{"alter user <user_name> <user_actions> <anyword> ;", 0, 0, NULL},
// 20
......@@ -108,6 +109,7 @@ SWords shellCommands[] = {
{"drop topic <topic_name> ;", 0, 0, NULL},
{"drop stream <stream_name> ;", 0, 0, NULL},
{"explain select", 0, 0, NULL}, // 44 append sub sql
{"flush database <db_name> ;", 0, 0, NULL},
{"help;", 0, 0, NULL},
{"grant all on <anyword> to <user_name> ;", 0, 0, NULL},
{"grant read on <anyword> to <user_name> ;", 0, 0, NULL},
......@@ -121,7 +123,6 @@ SWords shellCommands[] = {
{"revoke read on <anyword> from <user_name> ;", 0, 0, NULL},
{"revoke write on <anyword> from <user_name> ;", 0, 0, NULL},
{"select * from <all_table>", 0, 0, NULL},
{"select _block_dist() from <all_table> \\G;", 0, 0, NULL},
{"select client_version();", 0, 0, NULL},
// 60
{"select current_user();", 0, 0, NULL},
......@@ -247,7 +248,7 @@ char* db_options[] = {"keep ",
"wal_retention_size ",
"wal_segment_size "};
char* alter_db_options[] = {"keep ", "cachemodel ", "cachesize ", "wal_fsync_period ", "wal_level "};
char* alter_db_options[] = {"cachemodel ", "replica ", "keep ", "cachesize ", "wal_fsync_period ", "wal_level "};
char* data_types[] = {"timestamp", "int",
"int unsigned", "varchar(16)",
......@@ -262,6 +263,14 @@ char* key_tags[] = {"tags("};
char* key_select[] = {"select "};
char* key_systable[] = {
"ins_dnodes", "ins_mnodes", "ins_modules", "ins_qnodes", "ins_snodes", "ins_cluster",
"ins_databases", "ins_functions", "ins_indexes", "ins_stables", "ins_tables", "ins_tags",
"ins_users", "ins_grants", "ins_vgroups", "ins_configs", "ins_dnode_variables", "ins_topics",
"ins_subscriptions", "ins_streams", "ins_stream_tasks", "ins_vnodes", "ins_user_privileges", "perf_connections",
"perf_queries", "perf_consumers", "perf_trans", "perf_apps"};
//
// ------- gobal variant define ---------
//
......@@ -293,8 +302,9 @@ bool waitAutoFill = false;
#define WT_VAR_TBOPTION 16
#define WT_VAR_USERACTION 17
#define WT_VAR_KEYSELECT 18
#define WT_VAR_SYSTABLE 19
#define WT_VAR_CNT 19
#define WT_VAR_CNT 20
#define WT_FROM_DB_MAX 6 // max get content from db
#define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1)
......@@ -327,19 +337,19 @@ int cntDel = 0; // delete byte count after next press tab
// show auto tab introduction
void printfIntroduction() {
printf(" ****************************** Tab Completion **********************************\n");
printf(" * The TDengine CLI supports tab completion for a variety of items, *\n");
printf(" * including database names, table names, function names and keywords. *\n");
printf(" * The full list of shortcut keys is as follows: *\n");
printf(" * [ TAB ] ...... complete the current word *\n");
printf(" * ...... if used on a blank line, display all valid commands *\n");
printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n");
printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n");
printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n");
printf(" * [ Ctrl + L ] ...... clear the entire screen *\n");
printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n");
printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n");
printf(" **********************************************************************************\n\n");
printf(" ****************************** Tab Completion *************************************\n");
printf(" * The TDengine CLI supports tab completion for a variety of items, *\n");
printf(" * including database names, table names, function names and keywords. *\n");
printf(" * The full list of shortcut keys is as follows: *\n");
printf(" * [ TAB ] ...... complete the current word *\n");
printf(" * ...... if used on a blank line, display all supported commands *\n");
printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n");
printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n");
printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n");
printf(" * [ Ctrl + L ] ...... clear the entire screen *\n");
printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n");
printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n");
printf(" *************************************************************************************\n\n");
}
void showHelp() {
......@@ -348,23 +358,24 @@ void showHelp() {
"\n\
----- A ----- \n\
alter database <db_name> <db_options> \n\
alter dnode <dnode_id> balance \n\
alter dnode <dnode_id> resetlog;\n\
alter all dnodes monitor \n\
alter alldnodes balance \n\
alter alldnodes resetlog;\n\
alter alldnodes debugFlag \n\
alter alldnodes monitor \n\
alter dnode <dnode_id> 'resetlog';\n\
alter dnode <dnode_id> 'monitor' '0';\n\
alter dnode <dnode_id> 'monitor' \"1\";\n\
alter dnode <dnode_id> \"debugflag\" \"143\";\n\
alter all dnodes \"monitor\" \"0\";\n\
alter all dnodes \"monitor\" \"1\";\n\
alter all dnodes \"resetlog\";\n\
alter all dnodes \"debugFlag\" \n\
alter table <tb_name> <tb_actions> ;\n\
alter table modify column\n\
alter local resetlog;\n\
alter local DebugFlag 143;\n\
alter local \"resetlog\";\n\
alter local \"DebugFlag\" \"143\";\n\
alter topic\n\
alter user <user_name> <user_actions> ...\n\
----- C ----- \n\
create table <tb_name> using <stb_name> tags ...\n\
create database <db_name> <db_options> ...\n\
create dnode ...\n\
create dnode \"fqdn:port\"n\
create index ...\n\
create mnode on dnode <dnode_id> ;\n\
create qnode on dnode <dnode_id> ;\n\
......@@ -387,6 +398,8 @@ void showHelp() {
drop stream <stream_name> ;\n\
----- E ----- \n\
explain select clause ...\n\
----- F ----- \n\
flush database <db_name>;\n\
----- H ----- \n\
help;\n\
----- I ----- \n\
......@@ -409,7 +422,6 @@ void showHelp() {
revoke write on <priv_level> from <user_name> ;\n\
----- S ----- \n\
select * from <all_table> where ... \n\
select _block_dist() from <all_table>;\n\
select client_version();\n\
select current_user();\n\
select database();\n\
......@@ -619,12 +631,17 @@ bool shellAutoInit() {
GenerateVarType(WT_VAR_TBOPTION, tb_options, sizeof(tb_options) / sizeof(char*));
GenerateVarType(WT_VAR_USERACTION, user_actions, sizeof(user_actions) / sizeof(char*));
GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*));
GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*));
return true;
}
// set conn
void shellSetConn(TAOS* conn) { varCon = conn; }
void shellSetConn(TAOS* conn) {
varCon = conn;
// init database and stable
updateTireValue(WT_VAR_DBNAME, false);
}
// exit shell auto funciton, shell exit call once
void shellAutoExit() {
......@@ -800,9 +817,42 @@ void* varObtainThread(void* param) {
return NULL;
}
// return true is need update value by async
bool updateTireValue(int type, bool autoFill) {
// TYPE CONTEXT GET FROM DB
taosThreadMutexLock(&tiresMutex);
// check need obtain from server
if (tires[type] == NULL) {
waitAutoFill = autoFill;
// need async obtain var names from db sever
if (threads[type] != NULL) {
if (taosThreadRunning(threads[type])) {
// thread running , need not obtain again, return
taosThreadMutexUnlock(&tiresMutex);
return NULL;
}
// destroy previous thread handle for new create thread handle
taosDestroyThread(threads[type]);
threads[type] = NULL;
}
// create new
void* param = taosMemoryMalloc(sizeof(int));
*((int*)param) = type;
threads[type] = taosCreateThread(varObtainThread, param);
taosThreadMutexUnlock(&tiresMutex);
return true;
}
taosThreadMutexUnlock(&tiresMutex);
return false;
}
// only match next one word from all match words, return valuue must free by caller
char* matchNextPrefix(STire* tire, char* pre) {
SMatch* match = NULL;
if(tire == NULL) return NULL;
// re-use last result
if (lastMatch) {
......@@ -888,32 +938,9 @@ char* tireSearchWord(int type, char* pre) {
return matchNextPrefix(tire, pre);
}
// TYPE CONTEXT GET FROM DB
taosThreadMutexLock(&tiresMutex);
// check need obtain from server
if (tires[type] == NULL) {
waitAutoFill = true;
// need async obtain var names from db sever
if (threads[type] != NULL) {
if (taosThreadRunning(threads[type])) {
// thread running , need not obtain again, return
taosThreadMutexUnlock(&tiresMutex);
return NULL;
}
// destroy previous thread handle for new create thread handle
taosDestroyThread(threads[type]);
threads[type] = NULL;
}
// create new
void* param = taosMemoryMalloc(sizeof(int));
*((int*)param) = type;
threads[type] = taosCreateThread(varObtainThread, param);
taosThreadMutexUnlock(&tiresMutex);
if(updateTireValue(type, true)) {
return NULL;
}
taosThreadMutexUnlock(&tiresMutex);
// can obtain var names from local
STire* tire = getAutoPtr(type);
......@@ -1116,6 +1143,7 @@ void printScreen(TAOS* con, SShellCmd* cmd, SWords* match) {
// main key press tab , matched return true else false
bool firstMatchCommand(TAOS* con, SShellCmd* cmd) {
if(con == NULL || cmd == NULL) return false;
// parse command
SWords* input = (SWords*)taosMemoryMalloc(sizeof(SWords));
memset(input, 0, sizeof(SWords));
......@@ -1660,6 +1688,41 @@ bool matchOther(TAOS* con, SShellCmd* cmd) {
return false;
}
// last match if nothing matched
bool matchEnd(TAOS* con, SShellCmd* cmd) {
// str dump
bool ret = false;
char* ps = strndup(cmd->command, cmd->commandSize);
char* last = lastWord(ps);
char* elast = strrchr(last, '.'); // find end last
if(elast) {
last = elast + 1;
}
// less one char can match
if(strlen(last) == 0 ) {
goto _return;
}
// match database
if(elast == NULL) {
// dot need not completed with dbname
if (fillWithType(con, cmd, last, WT_VAR_DBNAME)) {
ret = true;
goto _return;
}
}
if (fillWithType(con, cmd, last, WT_VAR_SYSTABLE)) {
ret = true;
goto _return;
}
_return:
taosMemoryFree(ps);
return ret;
}
// main key press tab
void pressTabKey(SShellCmd* cmd) {
// check
......@@ -1695,6 +1758,9 @@ void pressTabKey(SShellCmd* cmd) {
matched = matchSelectQuery(varCon, cmd);
if (matched) return;
// match end
matched = matchEnd(varCon, cmd);
return;
}
......@@ -1911,6 +1977,7 @@ void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb) {
if (dealUseDB(sql)) {
// change to new db
updateTireValue(WT_VAR_STABLE, false);
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册