提交 c237f7f2 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into fix/TD-17040

......@@ -108,6 +108,7 @@ typedef struct SDataBlockInfo {
// TODO: optimize and remove following
int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize
} SDataBlockInfo;
typedef struct SSDataBlock {
......
......@@ -42,25 +42,28 @@ typedef struct SReadHandle {
bool initTqReader;
} SReadHandle;
// in queue mode, data streams are seperated by msg
typedef enum {
OPTR_EXEC_MODEL_BATCH = 0x1,
OPTR_EXEC_MODEL_STREAM = 0x2,
OPTR_EXEC_MODEL_QUEUE = 0x3,
} EOPTR_EXEC_MODEL;
/**
* Create the exec task for streaming mode
* Create the exec task for stream mode
* @param pMsg
* @param streamReadHandle
* @param SReadHandle
* @return
*/
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
/**
* Switch the stream scan to snapshot mode
* @param tinfo
* Create the exec task for queue mode
* @param pMsg
* @param SReadHandle
* @return
*/
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers);
/**
* Set the input data block for the stream scan.
......@@ -111,7 +114,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @return
*/
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion);
int32_t* tversion);
/**
* The main task execution function, including query on both table and multiple tables,
......
......@@ -14,6 +14,7 @@
*/
#include "os.h"
#include "query.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
......@@ -119,6 +120,7 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) {
ASSERT(0);
ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue);
} else {
......@@ -305,6 +307,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
qInfo("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
......
......@@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); }
// clang-format on
#define BUF_PAGE_DEBUG
//#define BUF_PAGE_DEBUG
#ifdef __cplusplus
}
#endif
......
......@@ -169,6 +169,7 @@ typedef struct SReqResultInfo {
uint32_t numOfRows;
uint64_t totalRows;
uint32_t current;
bool localResultFetched;
bool completed;
int32_t precision;
bool convertUcs4;
......
......@@ -1905,6 +1905,10 @@ int32_t appendTbToReq(SArray* pList, int32_t pos1, int32_t len1, int32_t pos2, i
tbLen = len1;
}
if (dbLen <= 0 || tbLen <= 0) {
return -1;
}
if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
return -1;
}
......
......@@ -852,23 +852,33 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}
// all data has returned to App already, no need to try again
if ((pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) && pResultInfo->completed) {
if (pResultInfo->completed && (pRequest->body.queryJob != 0)) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// it is a local executed query, no need to do async fetch
if (pResultInfo->current < pResultInfo->numOfRows && pRequest->body.queryJob == 0) {
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
if (pRequest->body.queryJob == 0) {
ASSERT(pResultInfo->completed && pResultInfo->numOfRows >= 0);
if (pResultInfo->localResultFetched) {
pResultInfo->numOfRows = 0;
pResultInfo->current = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
} else {
pResultInfo->localResultFetched = true;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
}
return;
}
SSchedulerReq req = {
.syncReq = false,
.fetchFp = fetchCallback,
.cbParam = pRequest,
};
schedulerFetchRows(pRequest->body.queryJob, &req);
}
......@@ -880,10 +890,10 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed
pResultInfo->current = pResultInfo->numOfRows;
pResultInfo->convertUcs4 = false;
taos_fetch_rows_a(res, fp, param);
// it is a local executed query, no need to do async fetch
taos_fetch_rows_a(pRequest, fp, param);
}
const void *taos_get_raw_block(TAOS_RES *res) {
......
......@@ -1149,11 +1149,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
/*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/
} else {
ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
}
taosMemoryFree(pMsg->pData);
......@@ -2427,15 +2426,15 @@ static void destroyCreateTbReqBatch(void* data) {
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
SVCreateTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
SVCreateTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL;
code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
......@@ -2455,8 +2454,8 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
STscObj* pTscObj = pRequest->pTscObj;
SVCreateTbReq *pCreateReq = NULL;
SCatalog* pCatalog = NULL;
SVCreateTbReq* pCreateReq = NULL;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
......@@ -2540,13 +2539,13 @@ static void destroyDropTbReqBatch(void* data) {
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
SVDropTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
SVDropTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SHashObj* pVgroupHashmap = NULL;
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2568,8 +2567,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
STscObj* pTscObj = pRequest->pTscObj;
SVDropTbReq *pDropReq = NULL;
SCatalog *pCatalog = NULL;
SVDropTbReq* pDropReq = NULL;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
......@@ -2640,17 +2639,16 @@ end:
return code;
}
static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
SVAlterTbReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SArray *pArray = NULL;
SVgDataBlocks *pVgData = NULL;
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
SVAlterTbReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SArray* pArray = NULL;
SVgDataBlocks* pVgData = NULL;
code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
......
......@@ -320,7 +320,9 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length);
} else {
memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
if (pSource->pData) {
memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
}
}
pColumnInfoData->hasNull = pSource->hasNull;
......@@ -1736,56 +1738,57 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "|\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id:%" PRIu64 "| uid:%ld\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
pDataBlock->info.uid);
if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL) || !pColInfoData->pData) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
continue;
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_INT:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_UINT:
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_UBIGINT:
len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
break;
}
}
len += snprintf(dumpBuf + len, size - len, "\n");
if (len >= size -1) return dumpBuf;
if (len >= size - 1) return dumpBuf;
}
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
return dumpBuf;
......
......@@ -114,7 +114,7 @@ int32_t tsMinSlidingTime = 10;
// the maxinum number of distict query result
int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 us for interval time range, changed accordingly
// 1 database precision unit for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
......
......@@ -4962,7 +4962,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else {
ASSERT(0);
}
......@@ -5526,6 +5526,11 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
ASSERT(0);
// TODO
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
} else {
ASSERT(0);
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
/*return true;*/
}
}
return false;
......
......@@ -199,6 +199,7 @@ int32_t mndInitSync(SMnode *pMnode) {
}
// decrease election timer
setPingTimerMS(pMgmt->sync, 5000);
setElectTimerMS(pMgmt->sync, 600);
setHeartbeatTimerMS(pMgmt->sync, 300);
......
......@@ -89,8 +89,6 @@ typedef struct {
STqExecTb execTb;
STqExecDb execDb;
};
// TODO remove it
int64_t tsdbEndVer;
} STqExecHandle;
......@@ -101,6 +99,8 @@ typedef struct {
int32_t epoch;
int8_t fetchMeta;
int64_t snapshotVer;
// TODO remove
SWalReader* pWalReader;
......@@ -131,7 +131,7 @@ typedef struct {
static STqMgmt tqMgmt = {0};
// tqRead
int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec
......
......@@ -284,7 +284,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffsetNew = pOffset->val;
char formatBuf[80];
tFormatOffset(formatBuf, 80, &fetchOffsetNew);
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, offset reset to %s", consumerId, pHandle->subKey, formatBuf);
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode), formatBuf);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......@@ -299,8 +300,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer %ld, subkey %s, offset reset to %ld", consumerId, pHandle->subKey,
dataRsp.rspOffset.version);
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, offset reset to %ld", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
......@@ -318,10 +319,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// 3.query
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
fetchOffsetNew.version++;
}
if (tqScan(pTq, &pHandle->execHandle, &dataRsp, &fetchOffsetNew) < 0) {
/*if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {*/
/*fetchOffsetNew.version++;*/
/*}*/
if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
code = -1;
goto OVER;
......@@ -480,30 +481,28 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
/*for (int32_t i = 0; i < 5; i++) {*/
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
// TODO version should be assigned in preprocess
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
pHandle->execHandle.execCol.qmsg = req.qmsg;
pHandle->snapshotVer = ver;
req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = {
.tqReader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = ver,
};
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
ASSERT(scanner);
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader[i]);
pHandle->execHandle.tsdbEndVer = ver;
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
for (int32_t i = 0; i < 5; i++) {
......
......@@ -59,13 +59,13 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return 0;
}
int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
qTaskInfo_t task = pExec->execCol.task[0];
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->execCol.task[0];
if (qStreamPrepareScan(task, pOffset) < 0) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
pRsp->rspOffset = *pOffset;
pRsp->rspOffset.version--;
return 0;
}
......@@ -73,9 +73,11 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("task start to execute");
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
tqDebug("task execute end, get %p", pDataBlock);
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp);
......@@ -97,7 +99,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
}
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
qStreamPrepareScan(task, pOffset);
continue;
}
......@@ -116,7 +118,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
}
tqDebug("task exec exited");
break;
}
......
......@@ -19,6 +19,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......@@ -32,6 +33,7 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......@@ -78,19 +80,25 @@ int32_t tqMetaOpen(STQ* pTq) {
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) {
handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);
}
/*for (int32_t i = 0; i < 5; i++) {*/
/*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = {
.tqReader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
ASSERT(handle.execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner);
ASSERT(scanner);
handle.execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader[i]);
}
} else {
handle.execHandle.execDb.pFilterOutTbUid =
......
......@@ -63,15 +63,15 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo;
typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list
SArray* pFileList; // data file list
int32_t order;
int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list
SArray* pFileList; // data file list
int32_t order;
} SFilesetIter;
typedef struct SFileDataBlockInfo {
int32_t
tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
uint64_t uid;
} SFileDataBlockInfo;
......@@ -119,10 +119,10 @@ struct STsdbReader {
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
SIOCostSummary cost;
STSchema* pSchema;
SDataFReader* pFileReader;
SVersionRange verRange;
SIOCostSummary cost;
STSchema* pSchema;
SDataFReader* pFileReader;
SVersionRange verRange;
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
......@@ -287,9 +287,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFSt
return TSDB_CODE_SUCCESS;
}
static void cleanupFilesetIterator(SFilesetIter* pIter) {
taosArrayDestroy(pIter->pFileList);
}
static void cleanupFilesetIterator(SFilesetIter* pIter) { taosArrayDestroy(pIter->pFileList); }
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pIter->order);
......@@ -304,6 +302,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
STimeWindow win = {0};
while (1) {
/*if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);*/
pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
......@@ -349,9 +348,7 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
}
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
taosArrayDestroy(pIter->blockList);
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->pTableIter = NULL;
......@@ -392,8 +389,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
initReaderStatus(&pReader->status);
pReader->pTsdb =
getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = 4096;
......@@ -833,7 +829,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
pBlockData, &pb, &pb1);
pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -1459,18 +1455,18 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
}
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
// ts is not overlap
if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
return false;
}
int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
// version is not overlap
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for(int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) {
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
if (p->version >= pBlock->minVersion) {
......@@ -1502,8 +1498,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
}
// has duplicated ts of different version in this block
bool hasDup = (pBlock->nSubBlock == 1)? pBlock->hasDup:true;
bool overlapWithDel= overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
......@@ -2220,17 +2216,18 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
}
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
int64_t startVer = (pCond->startVersion == -1)? 0:pCond->startVersion;
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
if (VND_IS_RSMA(pVnode)) {
return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
}
int64_t endVer = 0;
if (pCond->endVersion == -1) { // user not specified end version, set current maximum version of vnode as the endVersion
if (pCond->endVersion ==
-1) { // user not specified end version, set current maximum version of vnode as the endVersion
endVer = pVnode->state.applied;
} else {
endVer = (pCond->endVersion > pVnode->state.applied)? pVnode->state.applied:pCond->endVersion;
endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion;
}
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
......@@ -2274,9 +2271,9 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
if (pDelList == NULL) {
return false;
}
size_t num = taosArrayGetSize(pDelList);
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc? 1:-1;
size_t num = taosArrayGetSize(pDelList);
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1;
if (asc) {
if (*index >= num - 1) {
......@@ -2823,7 +2820,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pSupInfo->colIds);
taosArrayDestroy(pSupInfo->pColAgg);
for(int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
if (pSupInfo->buildBuf[i] != NULL) {
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
}
......@@ -2835,7 +2832,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
destroyBlockScanInfo(pReader->status.pTableMap);
blockDataDestroy(pReader->pResBlock);
if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
......@@ -3011,8 +3008,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return TSDB_CODE_SUCCESS;
}
pReader->order = pCond->order;
pReader->type = BLOCK_LOAD_OFFSET_ORDER;
pReader->order = pCond->order;
pReader->type = BLOCK_LOAD_OFFSET_ORDER;
pReader->status.loadFromFile = true;
pReader->status.pTableIter = NULL;
......@@ -3028,6 +3025,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = 1;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
tsdbDataFReaderClose(&pReader->pFileReader);
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
......@@ -3114,13 +3113,12 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
}
/*
hasNext = blockIteratorNext(&pStatus->blockIter);
*/
/*
hasNext = blockIteratorNext(&pStatus->blockIter);
*/
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
}
return code;
......@@ -3158,7 +3156,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
return rows;
}
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t *suid) {
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
int32_t sversion = 1;
SMetaReader mr = {0};
......@@ -3171,7 +3169,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
}
*suid = 0;
if (mr.me.type == TSDB_CHILD_TABLE) {
*suid = mr.me.ctbEntry.suid;
code = metaGetTableEntryByUid(&mr, *suid);
......@@ -3188,8 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
metaReaderClear(&mr);
*pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
return TSDB_CODE_SUCCESS;
}
......@@ -569,7 +569,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return -1;
}
setPingTimerMS(pVnode->sync, 3000);
setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 500);
setHeartbeatTimerMS(pVnode->sync, 100);
return 0;
......
......@@ -389,6 +389,7 @@ typedef struct SStreamScanInfo {
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
STimeWindow updateWin;
// status for tmq
// SSchemaWrapper schema;
......
......@@ -191,6 +191,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
pBlock->info.blockId = pNode->dataBlockId;
pBlock->info.type = STREAM_INVALID;
pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
for (int32_t i = 0; i < numOfCols; ++i) {
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
......
......@@ -106,6 +106,30 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code;
}
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers) {
if (msg == NULL) {
// TODO create raw scan
return NULL;
}
struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(msg, &plan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
return NULL;
}
return pTaskInfo;
}
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
if (msg == NULL) {
return NULL;
......@@ -186,7 +210,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion) {
int32_t* tversion) {
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
......
......@@ -269,13 +269,13 @@ const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
return pTaskInfo->streamInfo.metaBlk;
}
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
return 0;
}
......@@ -283,35 +283,41 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
pTaskInfo->streamInfo.prepareStatus = *pOffset;
// TODO: optimize
/*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/
/*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/
while (1) {
uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) {
if (tqSeekVer(pInfo->tqReader, pOffset->version) < 0) {
return -1;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
while (1) {
uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) {
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0);
}
}
if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||
pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {
#endif
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
return -1;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
}
}
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
bool found = false;
......@@ -320,6 +326,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
if (pTableInfo->uid == uid) {
found = true;
pTableScanInfo->currentTable = i;
break;
}
}
......@@ -335,18 +342,18 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
pTableScanInfo->currentTable, tableSz);
}
/*}*/
} else {
ASSERT(0);
}
return 0;
} else {
ASSERT(0);
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
return 0;
} else {
ASSERT(pOperator->numOfDownstream == 1);
pOperator = pOperator->pDownstream[0];
}
}
/*}*/
return 0;
}
......
......@@ -538,7 +538,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
return code;
}
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check
......@@ -2969,25 +2969,10 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, 0, pSup->pCtx);
code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) {
longjmp(pTaskInfo->env, code);
}
#if 0 // test for encode/decode result info
if(pOperator->fpSet.encodeResultRow){
char *result = NULL;
int32_t length = 0;
pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
SAggSupporter* pSup = &pAggInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pInfo->resultRowInfo.size = 0;
pOperator->fpSet.decodeResultRow(pOperator, result);
if(result){
taosMemoryFree(result);
}
}
#endif
}
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
......@@ -3250,6 +3235,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pOperator->status = OP_OPENED;
return NULL;
}
return NULL;
}
......@@ -3283,11 +3272,15 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
while (1) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
qDebug("projection call next");
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
// TODO optimize
/*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
qDebug("projection get null");
/*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/
doSetOperatorCompleted(pOperator);
/*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/
/*pOperator->status = OP_RES_TO_RETURN;*/
/*}*/
break;
}
......
......@@ -884,6 +884,28 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
return true;
}
static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[*pRowIndex], pInterval,
TSDB_ORDER_ASC);
STimeWindow endWin = win;
STimeWindow preWin = win;
while (1) {
(*pRowIndex) += getNumOfRowsInTimeWindow(pDataBlockInfo, tsCol, *pRowIndex, endWin.ekey,
binarySearchForKey, NULL, TSDB_ORDER_ASC);
do {
preWin = endWin;
getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
} while (tsCol[(*pRowIndex) - 1] >= endWin.skey);
endWin = preWin;
if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows ) {
win.ekey = endWin.ekey;
return win;
}
win.ekey = endWin.ekey;
}
}
static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = {
.skey = INT64_MIN,
......@@ -905,10 +927,13 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) +=
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
pInfo->updateWin.skey = tsCols[*pRowIndex];
win = getSlidingWindow(tsCols, &pInfo->interval, &pSDB->info, pRowIndex);
pInfo->updateWin.ekey = tsCols[*pRowIndex - 1];
// win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
// (*pRowIndex) +=
// getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
}
needRead = true;
} else if (isStateWindow(pInfo)) {
......@@ -974,10 +999,12 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_
}
}
if (!pResult) {
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
return NULL;
}
if (pResult->info.groupId == pInfo->groupId) {
pResult->info.calWin = pInfo->updateWin;
return pResult;
}
}
......@@ -1209,6 +1236,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*return NULL;*/
/*}*/
qDebug("stream scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
SFetchRet ret = {0};
......@@ -1220,6 +1248,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
// TODO clean data block
if (pInfo->pRes->info.rows > 0) {
qDebug("stream scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
} else if (ret.fetchType == FETCH_TYPE__META) {
......@@ -1230,6 +1259,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} else if (ret.fetchType == FETCH_TYPE__NONE) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
qDebug("stream scan log return null");
return NULL;
} else {
ASSERT(0);
......@@ -1237,7 +1267,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL;
if (pResult && pResult->info.rows > 0) {
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
return pResult;
}
qDebug("stream scan tsdb return null");
return NULL;
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
// TODO scan meta
ASSERT(0);
......@@ -1256,8 +1291,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
// TODO move into scan
pBlock->info.calWin.skey = INT64_MIN;
pBlock->info.calWin.ekey = INT64_MAX;
blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_GET_ALL:
return pBlock;
case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
......@@ -1287,6 +1327,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("scan mode %d", pInfo->scanMode);
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
......@@ -1381,7 +1422,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
}
qDebug("scan rows: %d", pBlockInfo->rows);
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
#if 0
......@@ -1533,6 +1574,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock();
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......@@ -2860,101 +2902,3 @@ _error:
return NULL;
}
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SLastrowScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t size = taosArrayGetSize(pInfo->pTableList);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
// check if it is a group by tbname
if (size == taosArrayGetSize(pInfo->pTableList)) {
blockDataCleanup(pInfo->pRes);
tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds);
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} else {
// todo fetch the result for each group
}
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
static void destroyLastrowScanOperator(void* param, int32_t numOfOutput) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
blockDataDestroy(pInfo->pRes);
tsdbLastrowReaderClose(pInfo->pLastrowReader);
taosMemoryFreeClear(param);
}
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList,
SExecTaskInfo* pTaskInfo) {
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pTableList = pTableList;
pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols,
COL_MATCH_FROM_COL_ID);
int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t));
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
pCols[i] = pColMatch->colId;
}
pInfo->pSlotIds = taosMemoryMalloc(numOfCols * sizeof(pInfo->pSlotIds[0]));
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i);
for (int32_t j = 0; j < pTaskInfo->schemaVer.sw->nCols; ++j) {
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId &&
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pInfo->pSlotIds[pColMatch->targetSlotId] = -1;
break;
}
if (pColMatch->colId == pTaskInfo->schemaVer.sw->pSchema[j].colId) {
pInfo->pSlotIds[pColMatch->targetSlotId] = j;
break;
}
}
}
tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, pCols, numOfCols,
&pInfo->pLastrowReader);
taosMemoryFree(pCols);
pOperator->name = "LastrowScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
initResultSizeInfo(pOperator, 1024);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
pOperator->cost.openCost = 0;
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
......@@ -419,6 +419,14 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true;
}
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo) {
if (pInterval->interval != pInterval->sliding && (pWin->ekey < pBlockInfo->calWin.skey ||
pWin->skey > pBlockInfo->calWin.ekey) ) {
return false;
}
return true;
}
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
bool ascQuery = (order == TSDB_ORDER_ASC);
......@@ -432,6 +440,10 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
return -1;
}
if (!inSlidingWindow(pInterval, pNext, pDataBlockInfo) && order == TSDB_ORDER_ASC) {
return -1;
}
TSKEY skey = ascQuery ? pNext->skey : pNext->ekey;
int32_t startPos = 0;
......@@ -801,7 +813,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
int32_t ret = TSDB_CODE_SUCCESS;
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
......@@ -834,7 +846,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
}
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, pInfo->order);
......@@ -916,7 +928,7 @@ int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo
tsCols = (int64_t*)pColDataInfo->pData;
if (tsCols != NULL) {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
}
}
......@@ -1279,17 +1291,23 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
pGpDatas = (uint64_t*)pGpCol->pData;
}
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, TSDB_ORDER_ASC);
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId;
int32_t startPos = 0;
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[0], pInterval, TSDB_ORDER_ASC);
while (1) {
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
if (pUpWins && res) {
SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
taosArrayPush(pUpWins, &winRes);
}
int32_t prevEndPos = step - 1 + startPos;
startPos = getNextQualifiedWindow(pInterval, &win, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
}
}
}
......@@ -1332,13 +1350,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds;
int32_t size = taosArrayGetSize(chAy);
qInfo("window %" PRId64 " wait child size:%d", win.skey, size);
qDebug("window %" PRId64 " wait child size:%d", win.skey, size);
for (int32_t i = 0; i < size; i++) {
qInfo("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
qDebug("window %" PRId64 " wait chid id:%d", win.skey, *(int32_t*)taosArrayGet(chAy, i));
}
continue;
} else if (pPullDataMap) {
qInfo("close window %" PRId64, win.skey);
qDebug("close window %" PRId64, win.skey);
}
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......@@ -2434,7 +2452,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
}
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreExpiredData && isClosed) {
if ((pInfo->ignoreExpiredData && isClosed) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
......@@ -2491,8 +2509,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
if (IS_FINAL_OP(pInfo)) {
forwardRows = 1;
} else {
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
NULL, TSDB_ORDER_ASC);
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated);
......@@ -2609,6 +2627,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) {
......@@ -2659,7 +2679,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock(pInfo->pUpdateRes);
removeDeleteResults(pUpdated, pInfo->pDelWins);
pOperator->status = OP_RES_TO_RETURN;
qInfo("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
qDebug("%s return data", IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi");
break;
}
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval Final recv" : "interval Semi recv");
......@@ -3101,12 +3121,7 @@ int64_t getSessionWindowEndkey(void* data, int32_t index) {
}
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap) {
int64_t sGap = ts - pWin->skey + gap;
int64_t eGap = pWin->ekey - ts + gap;
// if ((sGap < 0 && sGap >= -gap) || (eGap < 0 && eGap >= -gap) || (sGap >= 0 && eGap >= 0)) {
// return true;
// }
if (sGap >= 0 && eGap >= 0) {
if (ts + gap >= pWin->skey && ts - gap <= pWin->ekey) {
return true;
}
return false;
......
......@@ -2774,7 +2774,6 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
}
}
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
break;
}
......@@ -2871,7 +2870,6 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
}
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
break;
}
......@@ -6014,6 +6012,15 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
}
}
......
......@@ -2481,7 +2481,6 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
int32_t code = tjsonToObject(pJson, jkSubplanId, jsonToSubplanId, &pNode->id);
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkSubplanType, pNode->subplanType, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanMsgType, &pNode->msgType);
......
......@@ -956,7 +956,8 @@ void nodesDestroyNode(SNode* pNode) {
}
case QUERY_NODE_PHYSICAL_SUBPLAN: {
SSubplan* pSubplan = (SSubplan*)pNode;
nodesDestroyList(pSubplan->pChildren);
// nodesDestroyList(pSubplan->pChildren);
nodesClearList(pSubplan->pChildren);
nodesDestroyNode((SNode*)pSubplan->pNode);
nodesDestroyNode((SNode*)pSubplan->pDataSink);
nodesDestroyNode((SNode*)pSubplan->pTagCond);
......@@ -972,7 +973,7 @@ void nodesDestroyNode(SNode* pNode) {
SNode* pElement = NULL;
FOREACH(pElement, pPlan->pSubplans) {
if (first) {
first = false;
// first = false;
nodesDestroyNode(pElement);
} else {
nodesClearList(((SNodeListNode*)pElement)->pNodeList);
......
......@@ -556,6 +556,7 @@ signed_literal(A) ::= TIMESTAMP NK_STRING(B).
signed_literal(A) ::= duration_literal(B). { A = releaseRawExprNode(pCxt, B); }
signed_literal(A) ::= NULL(B). { A = createValueNode(pCxt, TSDB_DATA_TYPE_NULL, &B); }
signed_literal(A) ::= literal_func(B). { A = releaseRawExprNode(pCxt, B); }
signed_literal(A) ::= NK_QUESTION(B). { A = createPlaceholderValueNode(pCxt, &B); }
%type literal_list { SNodeList* }
%destructor literal_list { nodesDestroyList($$); }
......
......@@ -133,7 +133,10 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
assert(*p == TS_PATH_DELIMITER[0]);
int32_t dbLen = p - pTableName->z;
char name[TSDB_DB_FNAME_LEN] = {0};
if (dbLen <= 0) {
return buildInvalidOperationMsg(pMsgBuf, msg2);
}
char name[TSDB_DB_FNAME_LEN] = {0};
strncpy(name, pTableName->z, dbLen);
dbLen = strdequote(name);
......
......@@ -2173,14 +2173,28 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni
return -1;
}
static const char* getPrecisionStr(uint8_t precision) {
switch (precision) {
case TSDB_TIME_PRECISION_MILLI:
return TSDB_TIME_PRECISION_MILLI_STR;
case TSDB_TIME_PRECISION_MICRO:
return TSDB_TIME_PRECISION_MICRO_STR;
case TSDB_TIME_PRECISION_NANO:
return TSDB_TIME_PRECISION_NANO_STR;
default:
break;
}
return "unknown";
}
static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision;
SValueNode* pInter = (SValueNode*)pInterval->pInterval;
bool valInter = TIME_IS_VAR_DURATION(pInter->unit);
if (pInter->datum.i <= 0 ||
(!valInter && convertTimePrecision(pInter->datum.i, precision, TSDB_TIME_PRECISION_MICRO) < tsMinIntervalTime)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, tsMinIntervalTime);
if (pInter->datum.i <= 0 || (!valInter && pInter->datum.i < tsMinIntervalTime)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, tsMinIntervalTime,
getPrecisionStr(precision));
}
if (NULL != pInterval->pOffset) {
......@@ -2754,6 +2768,11 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
}
}
if (NULL == pPrimaryKeyExpr) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM,
"Primary timestamp column can not be null");
}
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
}
......@@ -2998,8 +3017,7 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
int32_t code =
checkRangeOption(pCxt, "buffer", pOptions->buffer, TSDB_MIN_BUFFER_PER_VNODE, TSDB_MAX_BUFFER_PER_VNODE);
if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST,
TSDB_MAX_DB_CACHE_LAST);
code = checkRangeOption(pCxt, "cacheLast", pOptions->cacheLast, TSDB_MIN_DB_CACHE_LAST, TSDB_MAX_DB_CACHE_LAST);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "cacheLastSize", pOptions->cacheLastSize, TSDB_MIN_DB_CACHE_LAST_SIZE,
......
......@@ -60,7 +60,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_EXPRIE_STATEMENT:
return "This statement is no longer supported";
case TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL:
return "Interval cannot be less than %d us";
return "Interval cannot be less than %d %s";
case TSDB_CODE_PAR_DB_NOT_SPECIFIED:
return "Database not specified";
case TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME:
......
此差异已折叠。
......@@ -729,6 +729,7 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) {
if (colDataIsNull_s(output.columnData, 0)) {
res->node.resType.type = TSDB_DATA_TYPE_NULL;
res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
} else {
res->node.resType.type = output.columnData->info.type;
res->node.resType.bytes = output.columnData->info.bytes;
......@@ -819,6 +820,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
if (colDataIsNull_s(output.columnData, 0)) {
if(node->node.resType.type != TSDB_DATA_TYPE_JSON){
res->node.resType.type = TSDB_DATA_TYPE_NULL;
res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
}else{
res->node.resType = node->node.resType;
res->isNull = true;
......
......@@ -173,7 +173,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
}
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
qInfo("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, pReq->upstreamTaskId);
qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId,
pReq->upstreamTaskId);
// 1. handle input
streamTaskEnqueue(pTask, pReq, pRsp);
......
......@@ -26,10 +26,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) {
// TODO exec drop
......
......@@ -30,7 +30,7 @@ extern "C" {
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define PING_TIMER_MS 5000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
......
......@@ -273,16 +273,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
}
ASSERT(rid == pSyncNode->rid);
if (pSyncNode->peersNum == 0) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
int32_t ret = syncNodeLeaderTransfer(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
int32_t ret = syncLeaderTransferTo(rid, newLeader);
return ret;
}
......@@ -293,25 +285,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
int32_t ret = 0;
if (pSyncNode->replicaNum == 1) {
sError("only one replica, cannot drop leader");
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_ONE_REPLICA;
return -1;
}
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
pMsg->newLeaderId.vgId = pSyncNode->vgId;
pMsg->newNodeInfo = newLeader;
ASSERT(pMsg != NULL);
SRpcMsg rpcMsg = {0};
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
syncLeaderTransferDestroy(pMsg);
ret = syncNodePropose(pSyncNode, &rpcMsg, false);
int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
......@@ -337,6 +312,12 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
return -1;
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
pMsg->newLeaderId.vgId = pSyncNode->vgId;
......@@ -1118,19 +1099,13 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode);
return;
} else {
syncNodeBecomeFollower(pSyncNode, "first start");
}
syncNodeBecomeFollower(pSyncNode, "first start");
// int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
// ASSERT(ret == 0);
if (gRaftDetailLog) {
syncNodeLog2("==state change become leader immediately==", pSyncNode);
}
int32_t ret = 0;
ret = syncNodeStartPingTimer(pSyncNode);
ASSERT(ret == 0);
}
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
......@@ -1147,8 +1122,6 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
void syncNodeClose(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "sync close");
// leader transfer
int32_t ret;
ASSERT(pSyncNode != NULL);
......@@ -1183,14 +1156,6 @@ void syncNodeClose(SSyncNode* pSyncNode) {
pSyncNode->pNewNodeReceiver = NULL;
}
/*
if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot);
}
*/
// tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
}
......@@ -1255,7 +1220,7 @@ int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
&pSyncNode->pPingTimer);
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else {
sError("sync env is stop, syncNodeStartPingTimer");
sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
}
return ret;
}
......@@ -1276,7 +1241,7 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
&pSyncNode->pElectTimer);
atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser);
} else {
sError("sync env is stop, syncNodeStartElectTimer");
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
}
return ret;
}
......@@ -1316,7 +1281,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
&pSyncNode->pHeartbeatTimer);
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else {
sError("sync env is stop, syncNodeStartHeartbeatTimer");
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
}
return ret;
}
......@@ -2643,7 +2608,7 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
syncNodeEventLog(ths, "begin leader transfer");
syncNodeEventLog(ths, "do leader transfer");
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
......
......@@ -17,6 +17,11 @@
#include "syncElection.h"
#include "syncReplication.h"
int32_t syncNodeTimerRoutine(SSyncNode* ths) {
syncNodeEventLog(ths, "timer routines ... ");
return 0;
}
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
syncTimeoutLog2("==syncNodeOnTimeoutCb==", pMsg);
......@@ -24,8 +29,11 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter);
// syncNodePingAll(ths);
syncNodePingPeers(ths);
// syncNodePingPeers(ths);
syncNodeTimerRoutine(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
......@@ -40,7 +48,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
syncNodeReplicate(ths);
}
} else {
sTrace("unknown timeoutType:%d", pMsg->timeoutType);
sError("vgId:%d, unknown timeout-type:%d", ths->vgId, pMsg->timeoutType);
}
return ret;
......
......@@ -66,6 +66,7 @@ void walCloseReader(SWalReader *pRead) {
}
int32_t walNextValidMsg(SWalReader *pRead) {
wDebug("vgId:%d wal start to fetch", pRead->pWal->cfg.vgId);
int64_t fetchVer = pRead->curVersion;
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
while (fetchVer <= endVer) {
......@@ -176,7 +177,7 @@ int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
return -1;
}
wDebug("wal version reset from %ld to %ld", pRead->curVersion, ver);
wDebug("wal version reset from %ld(invalid: %d) to %ld", pRead->curVersion, pRead->curInvalid, ver);
pRead->curVersion = ver;
return 0;
......@@ -242,6 +243,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
return -1;
}
}
pRead->curInvalid = 0;
return 0;
}
......@@ -301,6 +303,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t code;
ASSERT(pRead->curVersion == pRead->pHead->head.version);
ASSERT(pRead->curInvalid == 0);
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
......@@ -404,6 +407,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
}
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver);
int64_t contLen;
bool seeked = false;
......
......@@ -96,11 +96,11 @@
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/triggerInterval0.sim
# ./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/partitionby.sim
......
......@@ -89,5 +89,10 @@ endi
#TODO: MOVE IT TO NORMAL CASE
sql_error select * from tb1 where not (null);
sql select sum(1/0) from tb1;
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -76,7 +76,7 @@ if $data01 != 5 then
goto loop1
endi
if $data02 != 14 then
if $data02 != 38 then
print =====data02=$data02
goto loop1
endi
......@@ -134,7 +134,7 @@ if $data01 != 6 then
goto loop2
endi
if $data02 != 18 then
if $data02 != 42 then
print =====data02=$data02
goto loop2
endi
......@@ -192,7 +192,7 @@ if $data01 != 7 then
goto loop3
endi
if $data02 != 22 then
if $data02 != 46 then
print =====data02=$data02
goto loop3
endi
......@@ -232,60 +232,4 @@ endi
print loop3 over
$loop_count = 0
loop4:
sleep 1000
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 7 then
print =====data01=$data01
goto loop4
endi
if $data02 != 22 then
print =====data02=$data02
goto loop4
endi
# row 1
if $data11 != 3 then
print =====data11=$data11
goto loop4
endi
if $data12 != 10 then
print =====data12=$data12
goto loop4
endi
#row2
if $data21 != 3 then
print =====data21=$data21
goto loop4
endi
if $data22 != 11 then
print =====data22=$data22
goto loop4
endi
#row 3
if $data31 != 5 then
print =====data31=$data31
goto loop4
endi
if $data32 != 60 then
print =====data32=$data32
goto loop4
endi
print loop4 over
system sh/stop_dnodes.sh
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][0] != 1 then
return -1
endi
if $data[0][4] != ready then
goto check_dnode_ready
endi
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$loop_cnt = 0
check_dnode_ready_1:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnodes not ready!
return -1
endi
sql show dnodes
print ===> $rows $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ===> $rows $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
print ===> $rows $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6]
print ===> $rows $data[3][0] $data[3][1] $data[3][2] $data[3][3] $data[3][4] $data[3][5] $data[3][6]
if $data[0][4] != ready then
goto check_dnode_ready_1
endi
if $data[1][4] != ready then
goto check_dnode_ready_1
endi
if $data[2][4] != ready then
goto check_dnode_ready_1
endi
if $data[3][4] != ready then
goto check_dnode_ready_1
endi
$replica = 3
$vgroups = 1
print ============= create database
sql create database db replica $replica vgroups $vgroups
$loop_cnt = 0
check_db_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 100 then
print ====> db not ready!
return -1
endi
sql show databases
print ===> rows: $rows
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[2][4] $data[2][5] $data[2][6] $data[2][7] $data[2][8] $data[2][9] $data[2][6] $data[2][11] $data[2][12] $data[2][13] $data[2][14] $data[2][15] $data[2][16] $data[2][17] $data[2][18] $data[2][19]
if $rows != 3 then
return -1
endi
if $data[2][19] != ready then
goto check_db_ready
endi
sql use db
$loop_cnt = 0
check_vg_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 300 then
print ====> vgroups not ready!
return -1
endi
sql show vgroups
print ===> rows: $rows
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] $data[0][7] $data[0][8] $data[0][9] $data[0][10] $data[0][11]
if $rows != $vgroups then
return -1
endi
if $data[0][4] == leader then
if $data[0][6] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][3]
endi
endi
elif $data[0][6] == leader then
if $data[0][4] == follower then
if $data[0][8] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][5]
endi
endi
elif $data[0][8] == leader then
if $data[0][4] == follower then
if $data[0][6] == follower then
print ---- vgroup $data[0][0] leader locate on dnode $data[0][7]
endi
endi
else
goto check_vg_ready
endi
vg_ready:
print ====> create stable/child table
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
sql create table ct1 using stb tags(1000)
print ===> write 100 records
$N = 100
$count = 0
while $count < $N
$ms = 1591200000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
#sql flush database db;
sleep 3000
print ===> stop dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
########################################################
print ===> start dnode1 dnode2 dnode3 dnode4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sleep 3000
print =============== query data
sql connect
sql use db
sql select * from ct1
print rows: $rows
print $data00 $data01 $data02
if $rows != 100 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
#########################################################
......@@ -210,10 +210,11 @@ class TDTestCase:
self.tag_check(i,k,tag_unint)
for error in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1]:
tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
elif v.lower() == 'bigint unsigned':
self.tag_check(i,k,tag_unbigint)
for error in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1]:
tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
#! bug TD-17106
# elif v.lower() == 'bigint unsigned':
# self.tag_check(i,k,tag_unbigint)
# for error in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1]:
# tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
elif v.lower() == 'bool':
self.tag_check(i,k,tag_bool)
elif v.lower() == 'float':
......@@ -223,7 +224,8 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult[0][0],tdSql.queryResult[0][0])
else:
tdLog.exit(f'select {k} from {self.stbname}_{i},data check failure')
# for error in [constant.FLOAT_MIN*10,constant.FLOAT_MAX*10]:
#! bug TD-17106
# for error in [constant.FLOAT_MIN*1.1,constant.FLOAT_MAX*1.1]:
# tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
elif v.lower() == 'double':
tdSql.execute(f'alter table {self.stbname}_{i} set tag {k} = {tag_double}')
......@@ -232,7 +234,7 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult[0][0],tdSql.queryResult[0][0])
else:
tdLog.exit(f'select {k} from {self.stbname}_{i},data check failure')
for error in [constant.DOUBLE_MIN-1,constant.DOUBLE_MAX+1]:
for error in [constant.DOUBLE_MIN*1.1,constant.DOUBLE_MAX*1.1]:
tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
elif 'binary' in v.lower():
tag_binary_error = tdCom.getLongName(self.binary_length+1)
......@@ -242,7 +244,8 @@ class TDTestCase:
tdSql.checkData(0,0,tag_binary)
elif 'nchar' in v.lower():
tag_nchar_error = tdCom.getLongName(self.nchar_length+1)
tdSql.execute(f'alter table {self.stbname}_{i} set tag {k} = "{tag_nchar_error}"')
tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = "{tag_nchar_error}"')
tdSql.execute(f'alter table {self.stbname}_{i} set tag {k} = "{tag_nchar}"')
tdSql.query(f'select {k} from {self.stbname}_{i}')
tdSql.checkData(0,0,tag_nchar)
......
......@@ -16,18 +16,18 @@ class TDTestCase:
self.rowNum = 10
self.ts = 1640966400000 # 2022-1-1 00:00:00.000
def check_customize_param_ms(self):
time_zone = time.strftime('%z')
tdSql.execute('create database db1 precision "ms"')
tdSql.execute('use db1')
tdSql.execute('create table if not exists ntb(ts timestamp, c1 int, c2 timestamp)')
for i in range(self.rowNum):
tdSql.execute("insert into ntb values(%d, %d, %d)"
tdSql.execute("insert into ntb values(%d, %d, %d)"
% (self.ts + i, i + 1, self.ts + i))
tdSql.query('select to_iso8601(ts) from ntb')
for i in range(self.rowNum):
tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{time_zone}')
timezone_list = ['+0000','+0100','+0200','+0300','+0330','+0400','+0500','+0530','+0600','+0700','+0800','+0900','+1000','+1100','+1200',\
'+00','+01','+02','+03','+04','+05','+06','+07','+08','+09','+10','+11','+12',\
'+00:00','+01:00','+02:00','+03:00','+03:30','+04:00','+05:00','+05:30','+06:00','+07:00','+08:00','+09:00','+10:00','+11:00','+12:00',\
......@@ -39,7 +39,7 @@ class TDTestCase:
tdSql.query(f'select to_iso8601(ts,"{j}") from ntb')
for i in range(self.rowNum):
tdSql.checkEqual(tdSql.queryResult[i][0],f'2022-01-01T00:00:00.00{i}{j}')
error_param_list = [0,100.5,'a','!']
for i in error_param_list:
tdSql.error(f'select to_iso8601(ts,"{i}") from ntb')
......@@ -47,7 +47,7 @@ class TDTestCase:
error_timezone_param = ['+13','-13','+1300','-1300','+0001','-0001','-0330','-0530']
for i in error_timezone_param:
tdSql.error(f'select to_iso8601(ts,"{i}") from ntb')
def check_base_function(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create tables==========")
......@@ -75,12 +75,12 @@ class TDTestCase:
tdSql.query("select to_iso8601(ts) from ntb")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(ts) from db.ntb")
tdSql.query("select to_iso8601(today()) from ntb")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(now()) from ntb")
tdSql.checkRows(3)
tdSql.error("select to_iso8601(timezone()) from ntb")
tdSql.error("select to_iso8601('abc') from ntb")
......@@ -104,7 +104,7 @@ class TDTestCase:
for i in err_param:
tdSql.error(f"select to_iso8601({i}) from ntb")
tdSql.error(f"select to_iso8601({i}) from db.ntb")
tdSql.query("select to_iso8601(now) from stb")
tdSql.checkRows(3)
tdSql.query("select to_iso8601(now()) from stb")
......@@ -126,7 +126,7 @@ class TDTestCase:
tdSql.query(f"select to_iso8601(today()) {i}null from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
def run(self): # sourcery skip: extract-duplicate-method
self.check_base_function()
self.check_customize_param_ms()
......
......@@ -58,7 +58,7 @@ class TDTestCase:
tag_sql += f"{k} {v},"
create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})'
return create_stb_sql
def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb',precision = 'ms'):
for k,v in column_dict.items():
num_up = 0
......@@ -175,7 +175,7 @@ class TDTestCase:
tdSql.execute('drop database db')
def run(self): # sourcery skip: extract-duplicate-method
self.today_check_ntb()
self.today_check_stb_tb()
......
......@@ -47,7 +47,7 @@ class TDTestCase:
c9 = "'nchar_val'"
c10 = ts
tdSql.execute(f" insert into {tbname} values ({ts},{c1},{c2},{c3},{c4},{c5},{c6},{c7},{c8},{c9},{c10})")
tdSql.execute("use test")
tbnames = ["stb", "sub_tb_1"]
support_types = ["BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE", "INT"]
......@@ -60,7 +60,7 @@ class TDTestCase:
origin_sql = "select {} from {} order by tbname".format(colname, tbname)
if coltype[1] in support_types:
self.check_result_auto(origin_sql , abs_sql)
def prepare_datas(self):
tdSql.execute(
......
......@@ -47,7 +47,7 @@ class TDTestCase:
c9 = "'nchar_val'"
c10 = ts
tdSql.execute(f" insert into {tbname} values ({ts},{c1},{c2},{c3},{c4},{c5},{c6},{c7},{c8},{c9},{c10})")
tdSql.execute("use test")
tbnames = ["stb", "sub_tb_1"]
support_types = ["BIGINT", "SMALLINT", "TINYINT", "FLOAT", "DOUBLE", "INT"]
......@@ -62,7 +62,7 @@ class TDTestCase:
cols = random.sample(colnames,3)
self.check_function("&",False,tbname,cols[0],cols[1],cols[2])
self.check_function("|",False,tbname,cols[0],cols[1],cols[2])
def prepare_datas(self):
tdSql.execute(
......@@ -215,14 +215,14 @@ class TDTestCase:
"abs value check pass , it work as expected ,sql is \"%s\" " % abs_query)
def check_function(self, opera ,agg, tbname , *args):
if opera =="&":
pass
elif opera =="|":
pass
else:
pass
work_sql = " select "
work_sql = " select "
for ind , arg in enumerate(args):
if ind ==len(args)-1:
work_sql += f"cast({arg} as bigint) "
......@@ -235,7 +235,7 @@ class TDTestCase:
work_sql+= f" from {tbname} "
tdSql.query(work_sql)
work_result = tdSql.queryResult
origin_sql = " select "
for ind , arg in enumerate(args):
if ind ==len(args)-1:
......@@ -323,7 +323,7 @@ class TDTestCase:
tdSql.checkData(0,0,None)
tdSql.checkData(1,0,640)
tdSql.checkData(10,0,0)
# used for regular table
tdSql.query("select abs(c1)&c3&c3 from t1")
tdSql.checkData(0, 0, None)
......@@ -349,7 +349,7 @@ class TDTestCase:
self.check_function("&",False,"stb1","c1","floor(t1)","abs(c1+c2)","t1+1")
self.check_function("&",True,"stb1","max(c1)","min(floor(t1))","sum(abs(c1+c2))","last(t1)+1")
self.check_function("&",False,"stb1","abs(abs(abs(abs(abs(abs(abs(abs(abs(abs(c1))))))))))","floor(t1)","abs(c1+c2)","t1+1")
# mix with common col
tdSql.query("select c1&abs(c1)&c2&c3 ,c1,c2, t1 from ct1")
tdSql.checkData(0, 0, 8)
......@@ -388,7 +388,7 @@ class TDTestCase:
# agg functions mix with agg functions
tdSql.query("select sum(c1&abs(c1)&c2&c3) ,max(c5), count(c5) from stb1")
tdSql.query("select max(c1)&max(c2)|first(ts), count(c5) from ct1")
# bug fix for compute
......@@ -409,7 +409,7 @@ class TDTestCase:
tdSql.checkData(1, 2, 894.900000000)
def check_boundary_values(self):
......@@ -490,7 +490,7 @@ class TDTestCase:
self.check_function("&", False ,"ct4","123","abs(c1)","t1","abs(t2)","abs(t3)","abs(t4)","t5")
self.check_function("&", False ,"ct4","c1+2","abs(t2+2)","t3","abs(t4)","abs(t5)","abs(c1)","t5")
tdSql.query(" select sum(c1) from stb1 where t1+10 >1; ")
tdSql.query(" select sum(c1) from stb1 where t1+10 >1; ")
tdSql.query("select c1 ,t1 from stb1 where t1 =0 ")
tdSql.checkRows(13)
self.check_function("&", False ,"t1","c1+2","abs(c2)")
......@@ -534,7 +534,7 @@ class TDTestCase:
self.support_super_table_test()
self.insert_datas_and_check_abs(self.tb_nums,self.row_nums,self.time_step)
def stop(self):
tdSql.close()
......
......@@ -44,7 +44,7 @@ class TDTestCase:
'col12': f'binary({self.binary_length})',
'col13': f'nchar({self.nchar_length})'
}
self.tag_dict = {
'ts_tag' : 'timestamp',
't1': 'tinyint',
......@@ -79,9 +79,9 @@ class TDTestCase:
self.tag_values = [
f'{self.tag_ts},{self.tag_tinyint},{self.tag_smallint},{self.tag_int},{self.tag_bigint},\
{self.tag_utint},{self.tag_usint},{self.tag_uint},{self.tag_ubint},{self.tag_float},{self.tag_double},{self.tag_bool},"{self.binary_str}","{self.nchar_str}"'
]
self.percent = [1,50,100]
self.param_list = ['default','t-digest']
def insert_data(self,column_dict,tbname,row_num):
......@@ -90,7 +90,7 @@ class TDTestCase:
insert_list = []
self.setsql.insert_values(column_dict,i,insert_sql,insert_list,self.ts)
def function_check_ntb(self):
tdSql.prepare()
tdSql.execute(self.setsql.set_create_normaltable_sql(self.ntbname,self.column_dict))
......@@ -126,7 +126,7 @@ class TDTestCase:
def run(self):
self.function_check_ntb()
self.function_check_stb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
......@@ -48,7 +48,7 @@ class TDTestCase:
'col12': 'binary(20)',
'col13': 'nchar(20)'
}
self.param_list = [1,100]
def insert_data(self,column_dict,tbname,row_num):
insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
......@@ -125,11 +125,11 @@ class TDTestCase:
self.bottom_check_data(f'{stbname}_{i}','child_table')
self.bottom_check_data(f'{stbname}','stable')
tdSql.execute(f'drop database {self.dbname}')
def run(self):
self.bottom_check_ntb()
self.bottom_check_stb()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -419,7 +419,7 @@ class TDTestCase:
tdSql.checkData(3,0,4)
tdSql.query("select csum(abs(c1))+2 from t1 ")
tdSql.checkRows(4)
def csum_support_stable(self):
tdSql.query(" select csum(1) from stb1 ")
tdSql.checkRows(70)
......@@ -434,17 +434,17 @@ class TDTestCase:
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
# # bug need fix
# # bug need fix
# tdSql.query("select csum(st1+c1) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(4)
# tdSql.error("select csum(st1+c1) from stb1 partition by tbname limit 1 ")
# bug need fix
# bug need fix
tdSql.query("select csum(st1+c1) from stb1 partition by tbname")
tdSql.checkRows(40)
tdSql.checkRows(40)
# bug need fix
# bug need fix
# tdSql.query("select tbname , csum(c1) from stb1 partition by tbname")
# tdSql.checkRows(40)
# tdSql.query("select tbname , csum(st1) from stb1 partition by tbname")
......@@ -452,7 +452,7 @@ class TDTestCase:
# tdSql.query("select tbname , csum(st1) from stb1 partition by tbname slimit 1")
# tdSql.checkRows(7)
# partition by tags
# partition by tags
# tdSql.query("select st1 , csum(c1) from stb1 partition by st1")
# tdSql.checkRows(40)
# tdSql.query("select csum(c1) from stb1 partition by st1")
......@@ -491,4 +491,4 @@ class TDTestCase:
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册