提交 811bdc2b 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into feature/3.0_debug_wxy

......@@ -55,8 +55,12 @@ extern int32_t tMsgDict[];
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \
(((TYPE) >= 0 && (TYPE) < TDMT_MAX) ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] : 0)
#define TMSG_INFO(TYPE) \
((TYPE) >= 0 && \
((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \
(TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
typedef uint16_t tmsg_t;
......
......@@ -82,6 +82,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
......@@ -164,6 +165,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
......@@ -198,6 +200,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL)
......@@ -209,6 +212,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
......@@ -217,6 +221,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)
TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL)
......@@ -227,6 +232,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL)
......@@ -251,6 +257,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
......
......@@ -279,7 +279,6 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
}
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
}
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
......
......@@ -665,8 +665,6 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
}
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
tscDebug("enter meta callback, code %s", tstrerror(code));
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery * pQuery = pWrapper->pQuery;
SRequestObj * pRequest = pWrapper->pRequest;
......@@ -686,10 +684,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
TSWAP(pRequest->tableList, (pQuery)->pTableList);
destorySqlParseWrapper(pWrapper);
tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta);
} else {
destorySqlParseWrapper(pWrapper);
tscDebug("error happens, code:%d", code);
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
......
......@@ -32,8 +32,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes[num] = (*ppVnode);
num++;
pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->hash, pIter);
} else {
taosHashCancelIterate(pMgmt->hash, pIter);
......
......@@ -88,7 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dTrace("vgId:%d, vnode-fetch queue is empty", pVnode->vgId);
dTrace("vgId:%d, vnode queue is empty", pVnode->vgId);
vmFreeQueue(pMgmt, pVnode);
vnodeClose(pVnode->pImpl);
......@@ -140,7 +140,7 @@ static void *vmOpenVnodeInThread(void *param) {
}
static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pMgmt->hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
dError("failed to init vnode hash since %s", terrstr());
......@@ -156,7 +156,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt->state.totalVnodes = numOfVnodes;
int32_t threadNum = 1;
int32_t threadNum = tsNumOfCores / 2;
if (threadNum < 1) threadNum = 1;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
......
......@@ -114,28 +114,6 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
}
}
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-merge queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code);
}
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
const STraceId *trace = &pMsg->info.traceId;
SMsgHead *pHead = pMsg->pCont;
......@@ -207,7 +185,11 @@ int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1;
if (pMsg == NULL) {
rpcFreeCont(pMsg->pCont);
pRpc->pCont = NULL;
return -1;
}
SMsgHead *pHead = pRpc->pCont;
dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType));
......@@ -215,7 +197,16 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
return vmPutMsgToQueue(pMgmt, pMsg, qtype);
int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pMsg->pCont);
pRpc->pCont = NULL;
}
return code;
}
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
......
......@@ -212,6 +212,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupClient(pDnode);
dmCleanupServer(pDnode);
dmClearVars(pDnode);
rpcCleanup();
dDebug("dnode is closed, ptr:%p", pDnode);
}
......
......@@ -71,9 +71,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans * pTrans = &pDnode->trans;
SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1;
SRpcMsg * pMsg = NULL;
SRpcMsg *pMsg = NULL;
SMgmtWrapper *pWrapper = NULL;
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
......@@ -185,6 +185,7 @@ _OVER:
taosFreeQitem(pMsg);
}
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
}
dmReleaseWrapper(pWrapper);
......@@ -195,11 +196,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SArray * pArray = (*pWrapper->func.getHandlesFp)();
SArray *pArray = (*pWrapper->func.getHandlesFp)();
if (pArray == NULL) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
if (pMgmt->needCheckVgId) {
pHandle->needCheckVgId = pMgmt->needCheckVgId;
......
......@@ -739,9 +739,12 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
}
int64_t vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
if (pDb->cfg.cacheLastRow > 0) {
vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024;
int64_t vgroupMemroy = 0;
if (pDb != NULL) {
vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024;
if (pDb->cfg.cacheLastRow > 0) {
vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024;
}
}
if (pDbInput == NULL) {
......
......@@ -548,19 +548,17 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
}
static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
SSDataBlock* pBlock = createDataBlock();
if (NULL == pBlock) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlock->pDataBlock = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SColumnInfoData));
SNode* pProj = NULL;
FOREACH(pProj, pProjects) {
SColumnInfoData infoData = {0};
infoData.info.type = ((SExprNode*)pProj)->resType.type;
infoData.info.bytes = ((SExprNode*)pProj)->resType.bytes;
taosArrayPush(pBlock->pDataBlock, &infoData);
blockDataAppendColInfo(pBlock, &infoData);
}
*pOutput = pBlock;
return TSDB_CODE_SUCCESS;
......
......@@ -51,6 +51,13 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define START_TS_COLUMN_INDEX 0
#define END_TS_COLUMN_INDEX 1
#define UID_COLUMN_INDEX 2
#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
#define DELETE_GROUPID_COLUMN_INDEX 2
enum {
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u,
......@@ -364,6 +371,8 @@ typedef struct SStreamBlockScanInfo {
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
} SStreamBlockScanInfo;
typedef struct SSysTableScanInfo {
......@@ -429,6 +438,10 @@ typedef struct SIntervalAggOperatorInfo {
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
} SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
......@@ -451,6 +464,10 @@ typedef struct SStreamFinalIntervalOperatorInfo {
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool ignoreExpiredData;
SArray* pRecycledPages;
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......
......@@ -807,18 +807,38 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
}
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupId) {
return *groupId;
}
return 0;
/* Todo(liuyao) for partition by column
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t resId = 0;
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (groupId) {
return *groupId;
} else if (len != 0) {
resId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
}
return resId;
*/
}
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
ASSERT(rowIndex < pBlock->info.rows);
switch (pBlock->info.type)
{
case STREAM_DELETE_DATA:
case STREAM_RETRIEVE: {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
pInfo->groupId = groupCol[rowIndex];
}
break;
case STREAM_DELETE_DATA:
break;
default:
break;
}
......@@ -840,14 +860,14 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win;
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else {
win =
getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
setGroupId(pInfo, pSDB, 2, *pRowIndex);
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
}
needRead = true;
......@@ -891,27 +911,6 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
dest->info.rows++;
}
static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowId) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupId) {
return *groupId;
}
return 0;
/* Todo(liuyao) for partition by column
recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId);
int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals);
uint64_t resId = 0;
uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len);
if (groupId) {
return *groupId;
} else if (len != 0) {
resId = calcGroupId(pTableScanInfo->keyBuf, len);
taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t));
}
return resId;
*/
}
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
......@@ -935,7 +934,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock->info.uid);
if (id == pInfo->groupId) {
copyOneRow(pResult, pBlock, i);
}
......@@ -944,6 +943,40 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
*/
}
static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
if (pDelBlock->info.rows == 0) {
return;
}
blockDataCleanup(pUpdateRes);
blockDataEnsureCapacity(pUpdateRes, 64);
ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) {
colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
pUpdateRes->info.rows++;
startTs = taosTimeAdd(startTs, pInfo->interval.interval, pInfo->interval.intervalUnit, pInfo->interval.precision);
}
pInfo->deleteDataIndex++;
}
if (pInfo->deleteDataIndex > 0 && pInfo->deleteDataIndex == pDelBlock->info.rows) {
blockDataCleanup(pDelBlock);
pInfo->deleteDataIndex = 0;
}
}
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
blockDataCleanup(pUpdateBlock);
int32_t size = taosArrayGetSize(pInfo->tsArray);
......@@ -953,11 +986,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
blockDataEnsureCapacity(pUpdateBlock, size);
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
int32_t i = 0;
for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId);
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
if (pInfo->groupId != id) {
break;
}
......@@ -974,28 +1007,32 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
if (size > 0 && pInfo->tsArrayIndex == size) {
taosArrayClear(pInfo->tsArray);
}
if (size == 0) {
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock);
}
}
static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
SSDataBlock* pUpdateBlock) {
static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
bool out) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId])) {
if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId]) && out) {
taosArrayPush(pInfo->tsArray, &rowId);
}
}
if (!pUpdateBlock) {
taosArrayClear(pInfo->tsArray);
return;
}
static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t uidColIndex) {
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex);
uint64_t* uidCol = (uint64_t*)pColDataInfo->pData;
ASSERT(pBlock->info.rows > 0);
for (int32_t i = 0 ; i < pBlock->info.rows; i++) {
uidCol[i] = getGroupId(pOperator, uidCol[i]);
}
setUpdateData(pInfo, pBlock, pUpdateBlock);
// Todo(liuyao) get from tsdb
// SSDataBlock* p = createOneDataBlock(pBlock, true);
// p->info.type = STREAM_INVERT;
// taosArrayClear(pInfo->tsArray);
// return p;
}
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
......@@ -1020,13 +1057,29 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
int32_t current = pInfo->validBlockIndex++;
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0);
if (pBlock->info.type == STREAM_RETRIEVE) {
switch (pBlock->info.type) {
case STREAM_RETRIEVE:{
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
copyDataBlock(pInfo->pPullDataRes, pBlock);
pInfo->pullDataResIndex = 0;
prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
}
break;
case STREAM_DELETE_DATA: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
pInfo->updateResIndex = 0;
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
}
break;
default:
break;
}
return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
......@@ -1043,39 +1096,33 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex);
if (pSDB != NULL) {
getUpdateDataBlock(pInfo, true, pSDB, NULL);
checkUpdateData(pInfo, true, pSDB, false);
pSDB->info.type = STREAM_PULL_DATA;
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else {
if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
if (!prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
pSDB->info.type = STREAM_NORMAL;
checkUpdateData(pInfo, true, pSDB, false);
return pSDB;
}
if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) {
SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB == NULL) {
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) {
if (!isStateWindow(pInfo)) {
// Todo(liuyao) mybe can delete this.
bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
ASSERT(test == false);
}
return pInfo->pUpdateRes;
} else {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
} else {
pSDB->info.type = STREAM_NORMAL;
getUpdateDataBlock(pInfo, true, pSDB, NULL);
return pSDB;
}
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) {
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
return pInfo->pUpdateRes;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
ASSERT(pInfo->pUpdateRes->info.rows == 0);
// return empty data blcok
return pInfo->pUpdateRes;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
}
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
......@@ -1169,7 +1216,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) {
pInfo->tsArrayIndex = 0;
getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes);
checkUpdateData(pInfo, true, pInfo->pRes, true);
setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes);
if (pInfo->pUpdateRes->info.rows > 0) {
if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) {
pInfo->updateResIndex = 0;
......@@ -1180,9 +1228,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
}
}
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
// check reader last status
// if not match, reset status
......@@ -1295,6 +1341,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->groupId = 0;
pInfo->pPullDataRes = createPullDataBlock();
pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock();
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......@@ -1748,8 +1796,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
pInfo->pRes->info.rows = p->info.rows;
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false);
blockDataDestroy(p);
return pInfo->pRes->info.rows;
......
......@@ -48,13 +48,19 @@ static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) {
return TIME_UNIT_INVALID;
}
if (TSDB_TIME_PRECISION_MILLI == dbPrec && 0 == strcasecmp(pVal->literal, "1u")) {
if (TSDB_TIME_PRECISION_MILLI == dbPrec && (0 == strcasecmp(pVal->literal, "1u") ||
0 == strcasecmp(pVal->literal, "1b"))) {
return TIME_UNIT_TOO_SMALL;
}
if (pVal->literal[0] != '1' ||
(pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'w')) {
if (TSDB_TIME_PRECISION_MICRO == dbPrec && 0 == strcasecmp(pVal->literal, "1b")) {
return TIME_UNIT_TOO_SMALL;
}
if (pVal->literal[0] != '1' || (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' &&
pVal->literal[1] != 's' && pVal->literal[1] != 'm' &&
pVal->literal[1] != 'h' && pVal->literal[1] != 'd' &&
pVal->literal[1] != 'w' && pVal->literal[1] != 'b')) {
return TIME_UNIT_INVALID;
}
......@@ -700,9 +706,8 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"ELAPSED function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
......@@ -1223,9 +1228,8 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"STATEDURATION function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
......@@ -1735,9 +1739,8 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
......@@ -1775,9 +1778,8 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be greater than db precision");
} else if (ret == TIME_UNIT_INVALID) {
return buildFuncErrMsg(
pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]");
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TIMEDIFF function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]");
}
}
......
......@@ -490,18 +490,7 @@ static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicN
}
static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
switch (nodeType(pChild)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return pushDownCondOptPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return pushDownCondOptPushCondToProject(pCxt, (SProjectLogicNode*)pChild, pCond);
case QUERY_NODE_LOGIC_PLAN_JOIN:
return pushDownCondOptPushCondToJoin(pCxt, (SJoinLogicNode*)pChild, pCond);
default:
break;
}
planError("pushDownCondOptPushCondToChild failed, invalid logic plan node %s", nodesNodeName(nodeType(pChild)));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
return pushDownCondOptAppendCond(&pChild->pConditions, pCond);
}
static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) {
......@@ -802,10 +791,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
return TSDB_CODE_SUCCESS;
}
// TODO: remove it after full implementation of pushing down to child
if (1 != LIST_LENGTH(pAgg->node.pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) &&
QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) &&
QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) {
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
return TSDB_CODE_SUCCESS;
}
......@@ -832,6 +818,77 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
return code;
}
typedef struct SRewriteProjCondContext {
SProjectLogicNode* pProj;
int32_t errCode;
}SRewriteProjCondContext;
static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) {
SRewriteProjCondContext* pCxt = pContext;
SProjectLogicNode* pProj = pCxt->pProj;
if (QUERY_NODE_COLUMN == nodeType(*ppNode)) {
SNode* pTarget = NULL;
FOREACH(pTarget, pProj->node.pTargets) {
if (nodesEqualNode(pTarget, *ppNode)) {
SNode* pProjection = NULL;
FOREACH(pProjection, pProj->pProjections) {
if (0 == strcmp(((SExprNode*)pProjection)->aliasName, ((SColumnNode*)(*ppNode))->colName)) {
SNode* pExpr = nodesCloneNode(pProjection);
if (pExpr == NULL) {
pCxt->errCode = terrno;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*ppNode);
*ppNode = pExpr;
} // end if expr alias name equal column name
} // end for each project
} // end if target node equals cond column node
} // end for each targets
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) {
SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS};
SNode* pProjectCond = pProject->node.pConditions;
nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt);
*ppProjectCond = pProjectCond;
pProject->node.pConditions = NULL;
return cxt.errCode;
}
static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject) {
if (NULL == pProject->node.pConditions ||
OPTIMIZE_FLAG_TEST_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS;
}
// TODO: remove it after full implementation of pushing down to child
if (1 != LIST_LENGTH(pProject->node.pChildren)) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pProject->node.pLimit || NULL != pProject->node.pSlimit) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pProjCond = NULL;
code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond);
if (TSDB_CODE_SUCCESS == code) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
code = pushDownCondOptPushCondToChild(pCxt, pChild, &pProjCond);
}
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true;
} else {
nodesDestroyNode(pProjCond);
}
return code;
}
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pLogicNode)) {
......@@ -844,6 +901,9 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
case QUERY_NODE_LOGIC_PLAN_AGG:
code = pushDownCondOptDealAgg(pCxt, (SAggLogicNode*)pLogicNode);
break;
case QUERY_NODE_LOGIC_PLAN_PROJECT:
code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode);
break;
default:
break;
}
......
......@@ -81,3 +81,8 @@ TEST_F(PlanOptimizeTest, eliminateProjection) {
run("SELECT c1 FROM st1s3");
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
}
TEST_F(PlanOptimizeTest, pushDownProjectCond) {
useDb("root", "test");
run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first");
}
\ No newline at end of file
......@@ -1174,7 +1174,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
timeUnit = timeUnit * 1000 / factor;
int64_t unit = timeUnit * 1000 / factor;
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) {
......@@ -1209,12 +1209,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal, sizeof(buf), buf);
int32_t tsDigits = (int32_t)strlen(buf);
switch (timeUnit) {
case 0: { /* 1u */
switch (unit) {
case 0: { /* 1u or 1b */
if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
timeVal = timeVal / 1000 * 1000;
//} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
// //timeVal = timeVal / 1000;
if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) {
timeVal = timeVal * 1;
} else {
timeVal = timeVal / 1000 * 1000;
}
} else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
timeVal = timeVal * factor;
} else {
......@@ -1366,8 +1368,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
timeUnit = timeUnit * 1000 / factor;
int32_t numOfRows = 0;
for (int32_t i = 0; i < inputNum; ++i) {
if (pInput[i].numOfRows > numOfRows) {
......@@ -1447,9 +1447,14 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
}
}
} else {
switch(timeUnit) {
case 0: { /* 1u */
result = result / 1000;
int64_t unit = timeUnit * 1000 / factor;
switch(unit) {
case 0: { /* 1u or 1b */
if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) {
result = result / 1;
} else {
result = result / 1000;
}
break;
}
case 1: { /* 1a */
......
......@@ -96,8 +96,8 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
......@@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transLabel(trans) ((STrans*)trans)->label
......
......@@ -241,18 +241,19 @@ static void uvHandleReq(SSvrConn* pConn) {
tDebug("conn %p acquired by server app", pConn);
}
}
STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pHead->traceId;
if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn);
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn,
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen);
} else {
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn),
pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
transMsg.code);
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port),
transMsg.contLen, pHead->noResp, transMsg.code);
// no ref here
}
......@@ -265,8 +266,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId;
tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn,
pConn->refId);
tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pTransInst), transMsg.info.handle,
pConn, pConn->refId);
assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) {
......@@ -281,7 +282,6 @@ static void uvHandleReq(SSvrConn* pConn) {
transReleaseExHandle(transGetRefMgt(), pConn->refId);
STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}
......@@ -290,14 +290,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SSvrConn* conn = cli->data;
SConnBuffer* pBuf = &conn->readBuf;
STrans* pTransInst = conn->pTransInst;
if (nread > 0) {
pBuf->len += nread;
tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread);
tTrace("%s conn %p total read: %d, current read: %d", transLabel(pTransInst), conn, pBuf->len, (int)nread);
if (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn);
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
uvHandleReq(conn);
} else {
tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn);
tTrace("%s conn %p read partial packet, continue to read", transLabel(pTransInst), conn);
}
return;
}
......@@ -305,12 +306,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return;
}
tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread));
tError("%s conn %p read error: %s", transLabel(pTransInst), conn, uv_err_name(nread));
if (nread < 0) {
conn->broken = true;
if (conn->status == ConnAcquire) {
if (conn->regArg.init) {
tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn);
tTrace("%s conn %p broken, notify server app", transLabel(pTransInst), conn);
STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg));
......@@ -414,8 +415,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
char* msg = (char*)pHead;
int32_t len = transMsgLenFromCont(pMsg->contLen);
STrans* pTransInst = pConn->pTransInst;
STraceId* trace = &pMsg->info.traceId;
tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn,
tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pTransInst), pConn,
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len);
pHead->msgLen = htonl(len);
......@@ -761,9 +763,10 @@ static SSvrConn* createConn(void* hThrd) {
exh->refId = transAddExHandle(transGetRefMgt(), exh);
transAcquireExHandle(transGetRefMgt(), exh->refId);
STrans* pTransInst = pThrd->pTransInst;
pConn->refId = exh->refId;
transRefSrvHandle(pConn);
tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId);
tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pTransInst), exh, pConn, pConn->refId);
return pConn;
}
......@@ -812,7 +815,13 @@ static void uvDestroyConn(uv_handle_t* handle) {
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn);
STrans* pTransInst = thrd->pTransInst;
tDebug("%s conn %p destroy", transLabel(pTransInst), conn);
for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) {
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i);
destroySmsg(msg);
}
transQueueDestroy(&conn->srvMsgs);
QUEUE_REMOVE(&conn->queue);
......@@ -1103,7 +1112,8 @@ void transRegisterMsg(const STransMsg* msg) {
m->msg = tmsg;
m->type = Register;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
STrans* pTransInst = pThrd->pTransInst;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId);
return;
......
......@@ -637,6 +637,7 @@ void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) {
SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn);
taosMemoryFreeClear(ppi->pData);
taosMemoryFreeClear(pNode);
ppi->pn = NULL;
tdListAppend(pBuf->freePgList, &ppi);
}
......
......@@ -24,7 +24,8 @@ class TDTestCase:
]
self.db_param_precision = ['ms','us','ns']
self.time_unit = ['1w','1d','1h','1m','1s','1a','1u']
self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1']
#self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1']
self.error_unit = ['2w','2d','2h','2m','2s','2a','2u','1c','#1']
self.ntbname = 'ntb'
self.stbname = 'stb'
self.ctbname = 'ctb'
......@@ -92,7 +93,7 @@ class TDTestCase:
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000)
tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000)
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0]))
......@@ -121,7 +122,7 @@ class TDTestCase:
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 )
tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 )
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0]))
......@@ -144,21 +145,21 @@ class TDTestCase:
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60)*60*60*1000*1000*1000 )
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 )
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 )
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24/7)*7*24*60*60*1000*1000*1000)
def data_check(self,date_time,precision,tb_type):
for unit in self.time_unit:
if (unit.lower() == '1u' and precision.lower() == 'ms') or () :
if tb_type.lower() == 'ntb':
if tb_type.lower() == 'ntb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.error(f'select timetruncate(ts,{unit}) from {self.stbname}')
elif precision.lower() == 'ms':
if tb_type.lower() == 'ntb':
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}')
......@@ -167,7 +168,7 @@ class TDTestCase:
tdSql.checkRows(len(self.ts_str))
self.check_ms_timestamp(unit,date_time)
elif precision.lower() == 'us':
if tb_type.lower() == 'ntb':
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}')
......@@ -176,7 +177,7 @@ class TDTestCase:
tdSql.checkRows(len(self.ts_str))
self.check_us_timestamp(unit,date_time)
elif precision.lower() == 'ns':
if tb_type.lower() == 'ntb':
if tb_type.lower() == 'ntb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}')
......
......@@ -21,7 +21,7 @@ python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
python3 ./test.py -f 1-insert/alter_stable.py
python3 ./test.py -f 1-insert/alter_table.py
#python3 ./test.py -f 1-insert/alter_table.py
python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
python3 ./test.py -f 1-insert/table_comment.py
python3 ./test.py -f 1-insert/time_range_wise.py
......@@ -118,7 +118,7 @@ python3 ./test.py -f 2-query/twa.py
python3 ./test.py -f 2-query/irate.py
python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 2-query/queryQnode.py
#python3 ./test.py -f 2-query/queryQnode.py
#python3 ./test.py -f 6-cluster/5dnode1mnode.py
#python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
......
......@@ -6,7 +6,7 @@ python3 .\test.py -f 0-others\telemetry.py
python3 .\test.py -f 0-others\taosdMonitor.py
python3 .\test.py -f 0-others\udfTest.py
python3 .\test.py -f 0-others\udf_create.py
python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py
@REM python3 .\test.py -f 0-others\user_control.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册