提交 15127f3a 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into vnode_refact_merge

......@@ -48,7 +48,6 @@ int32_t init_env() {
return -1;
}
taos_free_result(pRes);
taosSsleep(1);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
......
......@@ -27,10 +27,7 @@ typedef void TAOS;
typedef void TAOS_STMT;
typedef void TAOS_RES;
typedef void **TAOS_ROW;
#if 0
typedef void TAOS_STREAM;
#endif
typedef void TAOS_SUB;
typedef void TAOS_SUB;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
......@@ -196,12 +193,6 @@ DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
#endif
#if 0
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
#endif
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
......@@ -241,12 +232,8 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
......@@ -273,7 +260,7 @@ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO
#if 0
DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
#endif
#if 0
......
......@@ -99,6 +99,15 @@ typedef struct SColumnInfoData {
};
} SColumnInfoData;
typedef struct SQueryTableDataCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} SQueryTableDataCond;
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
......@@ -229,7 +238,6 @@ typedef struct SResSchame {
char name[TSDB_COL_NAME_LEN];
} SResSchema;
// TODO move away to executor.h
typedef struct SExprBasicInfo {
SResSchema resSchema;
int16_t numOfParams; // argument value of each function
......
......@@ -221,7 +221,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw);
* @param pRaw The raw data.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw);
int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw);
/**
* @brief Acquire a row from sdb
......
......@@ -202,7 +202,7 @@ typedef struct SqlFunctionCtx {
SPoint1 end;
SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp;
SExprInfo *pExpr;
struct SExprInfo *pExpr;
struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock;
int32_t curBufPage;
......
......@@ -158,6 +158,8 @@ typedef enum {
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
extern int32_t sDebugFlag;
//-----------------------------------------
......
......@@ -139,16 +139,9 @@ int32_t* taosGetErrno();
// mnode-common
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x030A)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030B)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0304)
// mnode-show
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310)
......
......@@ -56,7 +56,7 @@ struct tmq_conf_t {
int8_t autoCommit;
int8_t resetOffset;
uint16_t port;
uint16_t autoCommitInterval;
int32_t autoCommitInterval;
char* ip;
char* user;
char* pass;
......@@ -76,17 +76,25 @@ struct tmq_t {
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t autoCommit;
int64_t consumerId;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
int64_t consumerId;
tmq_commit_cb* commit_cb;
// status
int8_t status;
int8_t epStatus;
int32_t epoch;
#if 0
int8_t epStatus;
int32_t epSkipCnt;
#endif
int64_t pollCnt;
// timer
tmr_h hbTimer;
tmr_h reportTimer;
tmr_h commitTimer;
// connection
STscObj* pTscObj;
......@@ -111,6 +119,12 @@ enum {
TMQ_CONSUMER_STATUS__READY,
};
enum {
TMQ_DELAYED_TASK__HB = 1,
TMQ_DELAYED_TASK__REPORT,
TMQ_DELAYED_TASK__COMMIT,
};
typedef struct {
// statistics
int64_t pollCnt;
......@@ -280,6 +294,50 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__HB;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
void tmqAssignDelayedReportTask(void* param, void* tmrId) {
tmq_t* tmq = (tmq_t*)param;
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t));
*pTaskType = TMQ_DELAYED_TASK__REPORT;
taosWriteQitem(tmq->delayedTask, pTaskType);
}
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(tmq->delayedTask, qall);
while (1) {
int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType);
if (pTaskType == NULL) break;
if (*pTaskType == TMQ_DELAYED_TASK__HB) {
tmqAskEp(tmq, false);
taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit(tmq, NULL, true);
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else {
ASSERT(0);
}
}
taosFreeQall(qall);
return 0;
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
SMqRspWrapper* msg = NULL;
while (1) {
......@@ -408,13 +466,15 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
pTmq->epStatus = 0;
pTmq->epSkipCnt = 0;
/*pTmq->epStatus = 0;*/
/*pTmq->epSkipCnt = 0;*/
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->autoCommit;
/*pTmq->autoCommit = conf->autoCommit;*/
pTmq->autoCommit = 0;
pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset;
......@@ -607,6 +667,14 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
taosMsleep(500);
}
// init hb timer
tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
// init auto commit timer
if (tmq->autoCommit) {
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
}
code = 0;
FAIL:
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
......@@ -909,7 +977,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
END:
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
if (pParam->sync) {
tsem_post(&pParam->rspSem);
}
......@@ -918,6 +986,7 @@ END:
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int32_t code = 0;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
......@@ -925,11 +994,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
if (epSkipCnt < 5000) return 0;
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
if (req == NULL) {
tscError("failed to malloc get subscribe ep buf");
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
req->consumerId = htobe64(tmq->consumerId);
......@@ -940,7 +1010,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
if (pParam == NULL) {
tscError("failed to malloc subscribe param");
taosMemoryFree(req);
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
pParam->tmq = tmq;
......@@ -952,7 +1022,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(req);
atomic_store_8(&tmq->epStatus, 0);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
......@@ -1216,7 +1286,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
}
while (1) {
tmqAskEp(tmq, false);
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, blocking_time);
/*tsem_wait(&tmq->rspSem);*/
......
......@@ -79,7 +79,11 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length;
} else {
return pColumnInfoData->info.bytes * numOfRows;
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
return 0;
} else {
return pColumnInfoData->info.bytes * numOfRows;
}
}
}
......
......@@ -68,6 +68,7 @@ static void dmSetSignalHandle() {
static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
int32_t cmdEnvIndex = 0;
if (argc < 2) return 0;
global.envCmd = taosMemoryMalloc(argc-1);
memset(global.envCmd, 0, argc-1);
for (int32_t i = 1; i < argc; ++i) {
......
......@@ -126,6 +126,8 @@ typedef enum {
DND_REASON_OTHERS
} EDndReason;
typedef void (*TransCbFp)(SMnode* pMnode, void* param);
typedef struct {
int32_t id;
ETrnStage stage;
......@@ -148,6 +150,8 @@ typedef struct {
int64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];
TransCbFp transCbFp;
void* transCbParam;
} STrans;
typedef struct {
......
......@@ -44,6 +44,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
......
......@@ -17,8 +17,8 @@
#include "mndAcct.h"
#include "mndShow.h"
#define TSDB_ACCT_VER_NUMBER 1
#define TSDB_ACCT_RESERVE_SIZE 128
#define ACCT_VER_NUMBER 1
#define ACCT_RESERVE_SIZE 128
static int32_t mndCreateDefaultAcct(SMnode *pMnode);
static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct);
......@@ -55,6 +55,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
acctObj.createdTime = taosGetTimestampMs();
acctObj.updateTime = acctObj.createdTime;
acctObj.acctId = 1;
acctObj.status = 0;
acctObj.cfg = (SAcctCfg){.maxUsers = INT32_MAX,
.maxDbs = INT32_MAX,
.maxStbs = INT32_MAX,
......@@ -79,7 +80,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, TSDB_ACCT_VER_NUMBER, sizeof(SAcctObj) + TSDB_ACCT_RESERVE_SIZE);
SSdbRaw *pRaw = sdbAllocRaw(SDB_ACCT, ACCT_VER_NUMBER, sizeof(SAcctObj) + ACCT_RESERVE_SIZE);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -100,7 +101,7 @@ static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) {
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.maxTopics, _OVER)
SDB_SET_INT64(pRaw, dataPos, pAcct->cfg.maxStorage, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAcct->cfg.accessState, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, ACCT_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0;
......@@ -122,7 +123,7 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_ACCT_VER_NUMBER) {
if (sver != ACCT_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
......@@ -151,7 +152,7 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.maxTopics, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pAcct->cfg.maxStorage, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pAcct->cfg.accessState, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_ACCT_RESERVE_SIZE, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, ACCT_RESERVE_SIZE, _OVER)
terrno = 0;
......@@ -178,7 +179,6 @@ static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) {
mTrace("acct:%s, perform update action, old row:%p new row:%p", pOld->acct, pOld, pNew);
pOld->updateTime = pNew->updateTime;
pOld->status = pNew->status;
memcpy(&pOld->cfg, &pNew->cfg, sizeof(SAcctCfg));
......@@ -186,19 +186,19 @@ static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) {
}
static int32_t mndProcessCreateAcctReq(SNodeMsg *pReq) {
terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("failed to process create acct request since %s", terrstr());
return -1;
}
static int32_t mndProcessAlterAcctReq(SNodeMsg *pReq) {
terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("failed to process create acct request since %s", terrstr());
return -1;
}
static int32_t mndProcessDropAcctReq(SNodeMsg *pReq) {
terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED;
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("failed to process create acct request since %s", terrstr());
return -1;
}
\ No newline at end of file
......@@ -620,7 +620,7 @@ static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOl
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
......
......@@ -363,7 +363,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
}
mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion);
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
goto PROCESS_STATUS_MSG_OVER;
}
......
......@@ -72,7 +72,7 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
}
mTrace("wal:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
if (sdbWriteNotFree(pSdb, (void *)pHead->head.body) < 0) {
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
mError("failed to read wal from sdb since %s, ver:%" PRId64, terrstr(), ver);
goto WAL_RESTORE_OVER;
}
......
......@@ -193,9 +193,9 @@ TRANS_ENCODE_OVER:
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow * pRow = NULL;
STrans * pTrans = NULL;
char * pData = NULL;
SSdbRow *pRow = NULL;
STrans *pTrans = NULL;
char *pData = NULL;
int32_t dataLen = 0;
int8_t sver = 0;
int32_t redoLogNum = 0;
......@@ -456,7 +456,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
}
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
if (pTrans == NULL) {
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
......@@ -574,6 +574,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
pTrans->rpcRspLen = contLen;
}
void mndTransSetCb(STrans *pTrans, TransCbFp fp, void *param) {
pTrans->transCbFp = fp;
pTrans->transCbParam = param;
}
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
pTrans->dbUid = pDb->uid;
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
......@@ -626,7 +631,7 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT
if (mndIsBasicTrans(pNewTrans)) return 0;
STrans *pTrans = NULL;
void * pIter = NULL;
void *pIter = NULL;
int32_t code = 0;
while (1) {
......@@ -707,6 +712,8 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew->rpcRefId = pTrans->rpcRefId;
pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen;
pNew->transCbFp = pTrans->transCbFp;
pNew->transCbParam = pTrans->transCbParam;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
......@@ -830,14 +837,14 @@ HANDLE_ACTION_RSP_OVER:
}
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
int32_t arraySize = taosArrayGetSize(pArray);
if (arraySize == 0) return 0;
for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
int32_t code = sdbWriteNotFree(pSdb, pRaw);
int32_t code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) {
return code;
}
......@@ -1117,6 +1124,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
}
mDebug("trans:%d, finished, code:0x%04x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
if (pTrans->transCbFp != NULL) {
(*pTrans->transCbFp)(pMnode, pTrans->transCbParam);
}
return continueExec;
}
......@@ -1205,11 +1217,11 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndProcessKillTransReq(SNodeMsg *pReq) {
SMnode * pMnode = pReq->pNode;
SMnode *pMnode = pReq->pNode;
SKillTransReq killReq = {0};
int32_t code = -1;
SUserObj * pUser = NULL;
STrans * pTrans = NULL;
SUserObj *pUser = NULL;
STrans *pTrans = NULL;
if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -1249,7 +1261,7 @@ KILL_OVER:
void mndTransPullup(SMnode *pMnode) {
STrans *pTrans = NULL;
void * pIter = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
......@@ -1264,11 +1276,11 @@ void mndTransPullup(SMnode *pMnode) {
static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb * pSdb = pMnode->pSdb;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
STrans *pTrans = NULL;
int32_t cols = 0;
char * pWrite;
char *pWrite;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans);
......
......@@ -368,7 +368,7 @@ int32_t mndProcessMsg(SNodeMsg *pMsg) {
}
if (isReq && (pRpc->contLen == 0 || pRpc->pCont == NULL)) {
terrno = TSDB_CODE_MND_INVALID_MSG_LEN;
terrno = TSDB_CODE_INVALID_MSG_LEN;
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
return -1;
}
......
......@@ -32,7 +32,7 @@ TEST_F(MndTestAcct, 01_Create_Acct) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_ACCT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED);
}
TEST_F(MndTestAcct, 02_Alter_Acct) {
......@@ -42,7 +42,7 @@ TEST_F(MndTestAcct, 02_Alter_Acct) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_ACCT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED);
}
TEST_F(MndTestAcct, 03_Drop_Acct) {
......@@ -52,5 +52,5 @@ TEST_F(MndTestAcct, 03_Drop_Acct) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_ACCT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
ASSERT_EQ(pRsp->code, TSDB_CODE_MSG_NOT_PROCESSED);
}
......@@ -202,7 +202,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
break;
}
code = sdbWriteNotFree(pSdb, pRaw);
code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) {
mError("failed to read file:%s since %s", file, terrstr());
goto PARSE_SDB_DATA_ERROR;
......@@ -263,7 +263,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
continue;
}
sdbPrintOper(pSdb, pRow, "writeFile");
sdbPrintOper(pSdb, pRow, "write");
SSdbRaw *pRaw = (*encodeFp)(pRow->pObj);
if (pRaw != NULL) {
......
......@@ -51,7 +51,9 @@ const char *sdbTableName(ESdbType type) {
case SDB_TOPIC:
return "topic";
case SDB_VGROUP:
return "vgId";
return "vgroup";
case SDB_SMA:
return "sma";
case SDB_STB:
return "stb";
case SDB_DB:
......@@ -86,13 +88,13 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
EKeyType keyType = pSdb->keyTypes[pRow->type];
if (keyType == SDB_KEY_BINARY) {
mTrace("%s:%s, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount,
oper, pRow->pObj, sdbStatusStr(pRow->status));
mTrace("%s:%s, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), (char *)pRow->pObj, pRow->refCount, oper,
pRow->pObj, sdbStatusStr(pRow->status));
} else if (keyType == SDB_KEY_INT32) {
mTrace("%s:%d, refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj,
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status));
mTrace("%s:%d, ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int32_t *)pRow->pObj, pRow->refCount,
oper, pRow->pObj, sdbStatusStr(pRow->status));
} else if (keyType == SDB_KEY_INT64) {
mTrace("%s:%" PRId64 ", refCount:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
mTrace("%s:%" PRId64 ", ref:%d oper:%s row:%p status:%s", sdbTableName(pRow->type), *(int64_t *)pRow->pObj,
pRow->refCount, oper, pRow->pObj, sdbStatusStr(pRow->status));
} else {
}
......@@ -142,7 +144,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
pRow->refCount = 0;
pRow->status = pRaw->status;
sdbPrintOper(pSdb, pRow, "insertRow");
sdbPrintOper(pSdb, pRow, "insert");
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
taosWUnLockLatch(pLock);
......@@ -191,7 +193,7 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "updateRow");
sdbPrintOper(pSdb, pOldRow, "update");
taosRUnLockLatch(pLock);
int32_t code = 0;
......@@ -220,7 +222,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
SSdbRow *pOldRow = *ppOldRow;
pOldRow->status = pRaw->status;
sdbPrintOper(pSdb, pOldRow, "deleteRow");
sdbPrintOper(pSdb, pOldRow, "delete");
taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock);
......@@ -233,7 +235,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
return 0;
}
int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
SHashObj *hash = sdbGetHash(pSdb, pRaw->type);
if (hash == NULL) return terrno;
......@@ -266,7 +268,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
}
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
int32_t code = sdbWriteNotFree(pSdb, pRaw);
int32_t code = sdbWriteWithoutFree(pSdb, pRaw);
sdbFreeRaw(pRaw);
return code;
}
......@@ -296,7 +298,7 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
case SDB_STATUS_UPDATING:
atomic_add_fetch_32(&pRow->refCount, 1);
pRet = pRow->pObj;
sdbPrintOper(pSdb, pRow, "acquireRow");
sdbPrintOper(pSdb, pRow, "acquire");
break;
case SDB_STATUS_CREATING:
terrno = TSDB_CODE_SDB_OBJ_CREATING;
......@@ -318,7 +320,7 @@ static void sdbCheck(SSdb *pSdb, SSdbRow *pRow) {
taosRLockLatch(pLock);
int32_t ref = atomic_load_32(&pRow->refCount);
sdbPrintOper(pSdb, pRow, "checkRow");
sdbPrintOper(pSdb, pRow, "check");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pRow);
}
......@@ -330,13 +332,13 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
if (pObj == NULL) return;
SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow));
if (pRow->type >= SDB_MAX ) return;
if (pRow->type >= SDB_MAX) return;
SRWLatch *pLock = &pSdb->locks[pRow->type];
taosRLockLatch(pLock);
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "releaseRow");
sdbPrintOper(pSdb, pRow, "release");
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pSdb, pRow);
}
......@@ -372,7 +374,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
}
atomic_add_fetch_32(&pRow->refCount, 1);
sdbPrintOper(pSdb, pRow, "fetchRow");
sdbPrintOper(pSdb, pRow, "fetch");
*ppObj = pRow->pObj;
break;
}
......
......@@ -27,7 +27,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
pRaw->sver = sver;
pRaw->dataLen = dataLen;
mTrace("raw:%p, is created, len:%d", pRaw, dataLen);
mTrace("raw:%p, is created, len:%d table:%s", pRaw, dataLen, sdbTableName(type));
return pRaw;
}
......
......@@ -43,7 +43,7 @@ void sdbFreeRow(SSdb *pSdb, SSdbRow *pRow) {
(*deleteFp)(pSdb, pRow->pObj);
}
sdbPrintOper(pSdb, pRow, "freeRow");
sdbPrintOper(pSdb, pRow, "free");
mTrace("row:%p, is freed", pRow->pObj);
taosMemoryFreeClear(pRow);
......
......@@ -91,16 +91,15 @@ int metaTbCursorNext(SMTbCursor *pTbCur);
// tsdb
typedef struct STsdb STsdb;
typedef struct STsdbQueryCond STsdbQueryCond;
typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId,
void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
......@@ -112,6 +111,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
......@@ -172,15 +172,6 @@ struct SVnodeCfg {
int8_t hashMethod;
};
struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
};
typedef struct {
TSKEY lastKey;
uint64_t uid;
......
......@@ -254,7 +254,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
assert(info.lastKey >= pTsdbReadHandle->window.skey && info.lastKey <= pTsdbReadHandle->window.ekey);
} else {
assert(info.lastKey >= pTsdbReadHandle->window.ekey && info.lastKey <= pTsdbReadHandle->window.skey);
info.lastKey = pTsdbReadHandle->window.skey;
}
taosArrayPush(pTableCheckInfo, &info);
......@@ -317,7 +317,7 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
}
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) {
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) {
pTsdbReadHandle->window = pCond->twindow;
bool updateTs = false;
......@@ -343,7 +343,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond*
}
}
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) {
static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
if (pReadHandle == NULL) {
goto _end;
......@@ -422,7 +422,7 @@ _end:
return NULL;
}
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
tsdbReaderT* tsdbQueryTables(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId);
if (pTsdbReadHandle == NULL) {
......@@ -448,7 +448,7 @@ tsdbReaderT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo
return (tsdbReaderT)pTsdbReadHandle;
}
void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
STsdbReadHandle* pTsdbReadHandle = queryHandle;
if (emptyQueryTimewindow(pTsdbReadHandle)) {
......@@ -485,7 +485,7 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
resetCheckInfo(pTsdbReadHandle);
}
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pCond, STableGroupInfo* groupList) {
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableGroupInfo* groupList) {
STsdbReadHandle* pTsdbReadHandle = queryHandle;
pTsdbReadHandle->order = pCond->order;
......@@ -526,7 +526,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pC
// pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
}
tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
pCond->twindow = updateLastrowForEachGroup(groupList);
......@@ -555,7 +555,7 @@ tsdbReaderT tsdbQueryLastRow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo
}
#if 0
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, SQueryTableDataCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) {
STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pTsdbReadHandle == NULL) {
return NULL;
......@@ -618,8 +618,8 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr
return pNew;
}
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId) {
tsdbReaderT tsdbQueryRowsInExternalWindow(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList,
uint64_t qId, uint64_t taskId) {
STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList);
if (pNew->numOfTables == 0) {
......@@ -1185,10 +1185,12 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
}
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
......@@ -1225,8 +1227,8 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
if ((cur->pos == 0 && endPos == binfo.rows - 1 && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
(cur->pos == (binfo.rows - 1) && endPos == 0 && (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)))) {
if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) ||
(cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) {
pTsdbReadHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
......@@ -1234,7 +1236,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
cur->mixBlock = false;
cur->blockCompleted = true;
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
if (ascScan) {
cur->lastKey = binfo.window.ekey + 1;
cur->pos = binfo.rows;
} else {
......@@ -1382,8 +1384,6 @@ static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
int32_t start, int32_t end) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
......@@ -1394,6 +1394,11 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
return numOfRows;
}
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t trueStart = ascScan ? start : end;
int32_t trueEnd = ascScan ? end : start;
int32_t step = ascScan ? 1 : -1;
int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
// data in buffer has greater timestamp, copy data in file block
......@@ -1411,7 +1416,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
// memmove(pData, (char*)src->pData + bytes * start, bytes * num);
int32_t rowIndex = numOfRows;
for (int32_t k = start; k <= end; ++k, ++rowIndex) {
for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
TASSERT(0);
......@@ -1427,7 +1432,7 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
int32_t rowIndex = numOfRows;
// todo refactor, only copy one-by-one
for (int32_t k = start; k < num + start; ++k, ++rowIndex) {
for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex) {
SCellVal sVal = {0};
if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
TASSERT(0);
......@@ -1444,27 +1449,19 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
j++;
i++;
} else { // pColInfo->info.colId < src->colId, it is a NULL data
int32_t rowIndex = numOfRows;
for (int32_t k = start; k < num + start; ++k, ++rowIndex) { // TODO opt performance
colDataAppend(pColInfo, rowIndex, NULL, true);
}
colDataAppendNNULL(pColInfo, numOfRows, num);
i++;
}
}
while (i < requiredNumOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
int32_t rowIndex = numOfRows;
for (int32_t k = start; k < num + start; ++k, ++rowIndex) {
colDataAppend(pColInfo, rowIndex, NULL,
true); // TODO add a fast version to set a number of consecutive NULL value.
}
colDataAppendNNULL(pColInfo, numOfRows, num);
i++;
}
pTsdbReadHandle->cur.win.ekey = tsArray[end];
pTsdbReadHandle->cur.lastKey = tsArray[end] + step;
pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
return numOfRows + num;
}
......@@ -2968,7 +2965,7 @@ bool tsdbNextDataBlock(tsdbReaderT pHandle) {
// }
//
// // load the previous row
// STsdbQueryCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
// SQueryTableDataCond cond = {.numOfCols = numOfCols, .loadExternalRows = false, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER};
// if (type == TSDB_PREV_ROW) {
// cond.order = TSDB_ORDER_DESC;
// cond.twindow = (STimeWindow){pTsdbReadHandle->window.skey, INT64_MIN};
......@@ -3225,7 +3222,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid, cur->rows,
cur->win.skey, cur->win.ekey, pHandle->idStr);
pDataBlockInfo->uid = uid;
pDataBlockInfo->uid = uid;
#if 0
// for multi-group data query processing test purpose
......@@ -3332,21 +3329,7 @@ SArray* tsdbRetrieveDataBlock(tsdbReaderT* pTsdbReadHandle, SArray* pIdList) {
return NULL;
}
// todo refactor
int32_t numOfRows = doCopyRowsFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
int32_t emptySize = pHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = (int32_t)taosArrayGetSize(pHandle->pColumns);
for (int32_t i = 0; i < reqNumOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
}
}
return pHandle->pColumns;
}
}
......
......@@ -201,84 +201,89 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// sync integration
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pVnode->sync);
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
if (syncEnvIsStart()) {
SMsgHead *pHead = pMsg->pCont;
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
ESyncState state = syncGetMyRole(pVnode->sync);
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
SRpcMsg *pRpcMsg = pMsg;
SMsgHead *pHead = pMsg->pCont;
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
SRpcMsg *pRpcMsg = pMsg;
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
syncNodeRelease(pSyncNode);
syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
}
syncNodeRelease(pSyncNode);
} else {
vError("==vnodeProcessSyncReq== error syncEnv stop");
}
return 0;
}
......
......@@ -322,27 +322,32 @@ typedef struct SColMatchInfo {
bool output;
} SColMatchInfo;
typedef struct SScanInfo {
int32_t numOfAsc;
int32_t numOfDesc;
} SScanInfo;
typedef struct STableScanInfo {
void* dataReader;
int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped;
int32_t numOfBlockStatis;
int64_t numOfRows;
int32_t order; // scan order
int32_t times; // repeat counts
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
SScanInfo scanInfo;
int32_t current;
int32_t reverseTimes; // 0 by default
SNode* pFilterNode; // filter operator info
SqlFunctionCtx* pCtx; // next operator query context
SNode* pFilterNode; // filter operator info
SqlFunctionCtx* pCtx; // next operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
double sampleRatio; // data block sample ratio, 1 by default
......@@ -623,6 +628,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
int32_t getTableScanOrder(SOperatorInfo* pOperator);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
......@@ -630,9 +636,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pReaderHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition,
SInterval* pInterval, double ratio, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
......
......@@ -191,7 +191,7 @@ static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutpu
static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo* pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
// static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
// static SQueryTableDataCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win);
static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo);
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr);
......@@ -207,7 +207,6 @@ static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput);
static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput);
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput);
......@@ -3588,8 +3587,8 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer
#endif
}
// STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
// STsdbQueryCond cond = {
// SQueryTableDataCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) {
// SQueryTableDataCond cond = {
// .colList = pQueryAttr->tableCols,
// .order = pQueryAttr->order.order,
// .numOfCols = pQueryAttr->numOfCols,
......@@ -4681,7 +4680,18 @@ _error:
return NULL;
}
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; }
int32_t getTableScanOrder(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
return TSDB_ORDER_ASC;
} else {
return getTableScanOrder(pOperator->pDownstream[0]);
}
}
STableScanInfo* pTableScanInfo = pOperator->info;
return pTableScanInfo->cond.order;
}
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
......@@ -4885,6 +4895,76 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
return true;
}
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes;
if (pProjectInfo->curSOffset > 0) {
if (pProjectInfo->groupId == 0) { // it is the first group
pProjectInfo->groupId = pBlock->info.groupId;
blockDataCleanup(pInfo->pRes);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curSOffset -= 1;
// ignore data block in current group
if (pProjectInfo->curSOffset > 0) {
blockDataCleanup(pInfo->pRes);
return PROJECT_RETRIEVE_CONTINUE;
}
}
// set current group id of the project operator
pProjectInfo->groupId = pBlock->info.groupId;
}
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curGroupOutput += 1;
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
pOperator->status = OP_EXEC_DONE;
blockDataCleanup(pRes);
return PROJECT_RETRIEVE_DONE;
}
// reset the value for a new group data
pProjectInfo->curOffset = 0;
pProjectInfo->curOutput = 0;
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pProjectInfo->groupId = pBlock->info.groupId;
if (pProjectInfo->curOffset >= pRes->info.rows) {
pProjectInfo->curOffset -= pRes->info.rows;
blockDataCleanup(pRes);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pProjectInfo->curOffset < pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pRes, pProjectInfo->curOffset);
pProjectInfo->curOffset = 0;
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
// check for the limitation in each group
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
}
return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE;
}
}
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
......@@ -4953,63 +5033,22 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
// }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC, false);
int32_t order = getTableScanOrder(pOperator->pDownstream[0]);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput,
pProjectInfo->pPseudoColInfo);
if (pProjectInfo->curSOffset > 0) {
if (pProjectInfo->groupId == 0) { // it is the first group
pProjectInfo->groupId = pBlock->info.groupId;
blockDataCleanup(pInfo->pRes);
continue;
} else if (pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curSOffset -= 1;
// ignore data block in current group
if (pProjectInfo->curSOffset > 0) {
blockDataCleanup(pInfo->pRes);
continue;
}
}
pProjectInfo->groupId = pBlock->info.groupId;
}
if (pProjectInfo->groupId != 0 && pProjectInfo->groupId != pBlock->info.groupId) {
pProjectInfo->curGroupOutput += 1;
if ((pProjectInfo->slimit.limit > 0) && (pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput)) {
pOperator->status = OP_EXEC_DONE;
return NULL;
}
// reset the value for a new group data
pProjectInfo->curOffset = 0;
pProjectInfo->curOutput = 0;
}
pProjectInfo->groupId = pBlock->info.groupId;
// todo extract method
if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset);
pProjectInfo->curOffset = 0;
} else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) {
pProjectInfo->curOffset -= pInfo->pRes->info.rows;
blockDataCleanup(pInfo->pRes);
int32_t status = handleLimitOffset(pOperator, pBlock);
if (status == PROJECT_RETRIEVE_CONTINUE) {
continue;
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
} else if (status == PROJECT_RETRIEVE_DONE) {
break;
}
}
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pInfo->pRes->info.rows >= pProjectInfo->limit.limit) {
pInfo->pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
}
pProjectInfo->curOutput += pInfo->pRes->info.rows;
// copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
......@@ -5661,8 +5700,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo) {
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -6319,6 +6357,19 @@ static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOu
static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
static SArray* createIndexMap(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
SInterval interval = {
.interval = pTableScanNode->interval,
.sliding = pTableScanNode->sliding,
.intervalUnit = pTableScanNode->intervalUnit,
.slidingUnit = pTableScanNode->slidingUnit,
.offset = pTableScanNode->offset,
};
return interval;
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
......@@ -6329,7 +6380,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL && terrno != 0) {
return NULL;
......@@ -6339,16 +6390,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
SInterval interval = {
.interval = pTableScanNode->interval,
.sliding = pTableScanNode->sliding,
.intervalUnit = pTableScanNode->intervalUnit,
.slidingUnit = pTableScanNode->slidingUnit,
.offset = pTableScanNode->offset,
};
SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
return createTableScanOperatorInfo(pDataReader, pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC,
numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList,
SInterval interval = extractIntervalInfo(pTableScanNode);
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList,
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
......@@ -6369,10 +6418,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
pHandle->meta, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
......@@ -6493,38 +6542,47 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOptr;
}
static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo,
void* readHandle, uint64_t queryId, uint64_t taskId) {
STsdbQueryCond cond = {.loadExternalRows = false};
cond.order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
pCond->loadExternalRows = false;
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
return terrno;
}
cond.twindow = pTableScanNode->scanRange;
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
// cond.type = pTableScanNode->scanFlag;
pCond->twindow = pTableScanNode->scanRange;
#if 1
//todo work around a problem, remove it later
if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) ||
(pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) {
TSWAP(pCond->twindow.skey, pCond->twindow.ekey, int64_t);
}
#endif
pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
// pCond->type = pTableScanNode->scanFlag;
int32_t j = 0;
for (int32_t i = 0; i < cond.numOfCols; ++i) {
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pTableScanNode->scan.pScanCols, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
if (pColNode->colType == COLUMN_TYPE_TAG) {
continue;
}
cond.colList[j].type = pColNode->node.resType.type;
cond.colList[j].bytes = pColNode->node.resType.bytes;
cond.colList[j].colId = pColNode->colId;
pCond->colList[j].type = pColNode->node.resType.type;
pCond->colList[j].bytes = pColNode->node.resType.bytes;
pCond->colList[j].colId = pColNode->colId;
j += 1;
}
cond.numOfCols = j;
return tsdbQueryTables(readHandle, &cond, pGroupInfo, queryId, taskId);
pCond->numOfCols = j;
return TSDB_CODE_SUCCESS;
}
SArray* extractScanColumnId(SNodeList* pNodeList) {
......@@ -6742,7 +6800,13 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
goto _error;
}
return createDataReaderImpl(pTableScanNode, pTableGroupInfo, pHandle->reader, queryId, taskId);
SQueryTableDataCond cond = {0};
code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
_error:
terrno = code;
......
......@@ -245,6 +245,10 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
// reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream
// operator.
pBlock->info.blockId = 0;
doFilter(pTableScanInfo->pFilterNode, pBlock);
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
......@@ -255,17 +259,15 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
return TSDB_CODE_SUCCESS;
}
static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
// reverse order time range
static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
SET_REVERSE_SCAN_FLAG(pTableScanInfo);
switchCtxOrder(pCtx, numOfOutput);
SWITCH_ORDER(pTableScanInfo->order);
setupQueryRangeForReverseScan(pTableScanInfo);
// setupQueryRangeForReverseScan(pTableScanInfo);
pTableScanInfo->times = 1;
pTableScanInfo->current = 0;
pTableScanInfo->reverseTimes = 0;
STimeWindow* pTWindow = &pTableScanInfo->cond.twindow;
TSWAP(pTWindow->skey, pTWindow->ekey, int64_t);
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
......@@ -294,9 +296,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
continue;
}
// reset the block to be 0 by default, this blockId is assigned by physical plan and is used by direct upstream
// operator.
pBlock->info.blockId = 0;
return pBlock;
}
......@@ -312,64 +311,71 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return NULL;
}
SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo;
*newgroup = false;
while (pTableScanInfo->current < pTableScanInfo->times) {
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
if (p != NULL) {
return p;
}
if (++pTableScanInfo->current >= pTableScanInfo->times) {
if (pTableScanInfo->reverseTimes <= 0 /* || isTsdbCacheLastRow(pTableScanInfo->pTsdbReadHandle)*/) {
return NULL;
} else {
break;
}
pTableScanInfo->current += 1;
if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN;
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
"-%" PRId64,
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
}
}
// do prepare for the next round table scan operation
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
if (pTableScanInfo->current < total) {
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
}
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN;
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
// if (pResultRowInfo->size > 0) {
// pResultRowInfo->curPos = 0;
// }
while (pTableScanInfo->current < total) {
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
if (p != NULL) {
return p;
}
qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
}
pTableScanInfo->current += 1;
SSDataBlock* p = NULL;
// todo refactor
if (pTableScanInfo->reverseTimes > 0) {
setupEnvForReverseScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
// STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
// tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond);
if (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
"-%" PRId64,
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
if (pResultRowInfo->size > 0) {
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
}
}
p = doTableScanImpl(pOperator, newgroup);
}
return p;
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
}
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo,
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput,
int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo,
SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval,
double sampleRatio, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -380,18 +386,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int
return NULL;
}
pInfo->cond = *pCond;
pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
pInfo->interval = *pInterval;
pInfo->sampleRatio = sampleRatio;
pInfo->dataBlockLoadFlag = dataLoadFlag;
pInfo->pResBlock = pResBlock;
pInfo->pFilterNode = pCondition;
pInfo->dataReader = pDataReader;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false;
......@@ -413,8 +420,6 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
pInfo->dataReader = pTsdbReadHandle;
pInfo->times = 1;
pInfo->reverseTimes = 0;
pInfo->current = 0;
pInfo->prevGroupId = -1;
......
......@@ -86,8 +86,10 @@ static FORCE_INLINE _getDoubleValue_fn_t getVectorDoubleValueFn(int32_t srcType)
p = getVectorDoubleValue_JSON;
} else if (srcType == TSDB_DATA_TYPE_BOOL) {
p = getVectorDoubleValue_BOOL;
} else if (srcType == TSDB_DATA_TYPE_NULL) {
p = NULL;
} else {
assert(0);
ASSERT(0);
}
return p;
}
......
......@@ -591,21 +591,25 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == res) {
sclError("make value node failed");
sclFreeParam(&output);
sclError("make value node failed");
sclFreeParam(&output);
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
res->node.resType = node->node.resType;
res->translate = true;
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) { // todo refactor
res->datum.p = output.columnData->pData;
output.columnData->pData = NULL;
if (colDataIsNull_s(output.columnData, 0)) {
res->node.resType.type = TSDB_DATA_TYPE_NULL;
} else {
memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
res->node.resType = node->node.resType;
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) { // todo refactor
res->datum.p = output.columnData->pData;
output.columnData->pData = NULL;
} else {
memcpy(nodesGetValueFromNode(res), output.columnData->pData, tDataTypes[type].bytes);
}
}
nodesDestroyNode(*pNode);
......@@ -628,7 +632,7 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
if (QUERY_NODE_OPERATOR == nodeType(*pNode)) {
return sclRewriteOperator(pNode, ctx);
}
}
return DEAL_RES_CONTINUE;
}
......@@ -636,7 +640,7 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
SFunctionNode *node = (SFunctionNode *)pNode;
SScalarParam output = {0};
ctx->code = sclExecFunction(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
......@@ -653,7 +657,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) {
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
SScalarParam output = {0};
ctx->code = sclExecLogic(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
......
......@@ -150,34 +150,36 @@ int64_t getVectorBigintValue_JSON(void *src, int32_t index){
_getBigintValue_fn_t getVectorBigintValueFn(int32_t srcType) {
_getBigintValue_fn_t p = NULL;
if(srcType==TSDB_DATA_TYPE_TINYINT) {
p = getVectorBigintValue_TINYINT;
}else if(srcType==TSDB_DATA_TYPE_UTINYINT) {
p = getVectorBigintValue_UTINYINT;
}else if(srcType==TSDB_DATA_TYPE_SMALLINT) {
p = getVectorBigintValue_SMALLINT;
}else if(srcType==TSDB_DATA_TYPE_USMALLINT) {
p = getVectorBigintValue_USMALLINT;
}else if(srcType==TSDB_DATA_TYPE_INT) {
p = getVectorBigintValue_INT;
}else if(srcType==TSDB_DATA_TYPE_UINT) {
p = getVectorBigintValue_UINT;
}else if(srcType==TSDB_DATA_TYPE_BIGINT) {
p = getVectorBigintValue_BIGINT;
}else if(srcType==TSDB_DATA_TYPE_UBIGINT) {
p = getVectorBigintValue_UBIGINT;
}else if(srcType==TSDB_DATA_TYPE_FLOAT) {
p = getVectorBigintValue_FLOAT;
}else if(srcType==TSDB_DATA_TYPE_DOUBLE) {
p = getVectorBigintValue_DOUBLE;
}else if(srcType==TSDB_DATA_TYPE_TIMESTAMP) {
p = getVectorBigintValue_BIGINT;
}else if(srcType==TSDB_DATA_TYPE_BOOL) {
p = getVectorBigintValue_BOOL;
}else if(srcType==TSDB_DATA_TYPE_JSON) {
p = getVectorBigintValue_JSON;
}else {
assert(0);
if (srcType==TSDB_DATA_TYPE_TINYINT) {
p = getVectorBigintValue_TINYINT;
} else if (srcType==TSDB_DATA_TYPE_UTINYINT) {
p = getVectorBigintValue_UTINYINT;
} else if (srcType==TSDB_DATA_TYPE_SMALLINT) {
p = getVectorBigintValue_SMALLINT;
} else if (srcType==TSDB_DATA_TYPE_USMALLINT) {
p = getVectorBigintValue_USMALLINT;
} else if (srcType==TSDB_DATA_TYPE_INT) {
p = getVectorBigintValue_INT;
} else if (srcType==TSDB_DATA_TYPE_UINT) {
p = getVectorBigintValue_UINT;
} else if (srcType==TSDB_DATA_TYPE_BIGINT) {
p = getVectorBigintValue_BIGINT;
} else if (srcType==TSDB_DATA_TYPE_UBIGINT) {
p = getVectorBigintValue_UBIGINT;
} else if (srcType==TSDB_DATA_TYPE_FLOAT) {
p = getVectorBigintValue_FLOAT;
} else if (srcType==TSDB_DATA_TYPE_DOUBLE) {
p = getVectorBigintValue_DOUBLE;
} else if (srcType==TSDB_DATA_TYPE_TIMESTAMP) {
p = getVectorBigintValue_BIGINT;
} else if (srcType==TSDB_DATA_TYPE_BOOL) {
p = getVectorBigintValue_BOOL;
} else if (srcType==TSDB_DATA_TYPE_JSON) {
p = getVectorBigintValue_JSON;
} else if (srcType==TSDB_DATA_TYPE_NULL){
p = NULL;
} else {
ASSERT(0);
}
return p;
}
......@@ -1594,7 +1596,7 @@ _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) {
case OP_TYPE_JSON_CONTAINS:
return vectorJsonContains;
default:
assert(0);
ASSERT(0);
return NULL;
}
}
......
......@@ -39,6 +39,8 @@ extern "C" {
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef struct SSyncEnv {
uint8_t isStart;
// tick timer
tmr_h pEnvTickTimer;
int32_t envTickTimerMS;
......
......@@ -26,6 +26,14 @@ static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv);
static void syncEnvTick(void *param, void *tmrId);
// --------------------------------
bool syncEnvIsStart() {
if (gSyncEnv == NULL) {
return false;
}
return atomic_load_8(&(gSyncEnv->isStart));
}
int32_t syncEnvStart() {
int32_t ret = 0;
taosSeedRand(taosGetTimestampSec());
......@@ -88,12 +96,15 @@ static SSyncEnv *doSyncEnvStart() {
// start tmr thread
pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
atomic_store_8(&(pSyncEnv->isStart), 1);
return pSyncEnv;
}
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
assert(pSyncEnv == gSyncEnv);
if (pSyncEnv != NULL) {
atomic_store_8(&(pSyncEnv->isStart), 0);
taosTmrCleanUp(pSyncEnv->pTimerManager);
taosMemoryFree(pSyncEnv);
}
......
......@@ -61,7 +61,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ);
// TODO:change to deserialize function
if (pIdxTFile == NULL) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
......@@ -73,7 +72,6 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return -1;
}
// read idx file and get log file pos
// TODO:change to deserialize function
SWalIdxEntry entry;
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
taosThreadMutexUnlock(&pWal->mutex);
......@@ -167,7 +165,7 @@ int32_t walEndSnapshot(SWal *pWal) {
char fnameStr[WAL_FILE_LEN];
// remove file
for (int i = 0; i < deleteCnt; i++) {
SWalFileInfo *pInfo = taosArrayGet(pWal->fileInfoSet, i);
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
......
......@@ -39,11 +39,11 @@ void* taosLoadDll(const char* filename) {
#else
void* handle = dlopen(filename, RTLD_LAZY);
if (!handle) {
//printf("load dll:%s failed, error:%s", filename, dlerror());
// printf("load dll:%s failed, error:%s", filename, dlerror());
return NULL;
}
//printf("dll %s loaded", filename);
// printf("dll %s loaded", filename);
return handle;
#endif
......@@ -59,17 +59,17 @@ void* taosLoadSym(void* handle, char* name) {
char* error = NULL;
if ((error = dlerror()) != NULL) {
//printf("load sym:%s failed, error:%s", name, dlerror());
// printf("load sym:%s failed, error:%s", name, dlerror());
return NULL;
}
//printf("sym %s loaded", name);
// printf("sym %s loaded", name);
return sym;
#endif
}
void taosCloseDll(void* handle) {
void taosCloseDll(void* handle) {
#if defined(WINDOWS)
return;
#elif defined(_TD_DARWIN_64)
......@@ -100,7 +100,7 @@ int taosSetConsoleEcho(bool on) {
struct termios term;
if (tcgetattr(STDIN_FILENO, &term) == -1) {
perror("Cannot get the attribution of the terminal");
/*perror("Cannot get the attribution of the terminal");*/
return -1;
}
......@@ -111,7 +111,7 @@ int taosSetConsoleEcho(bool on) {
err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term);
if (err == -1 || err == EINTR) {
printf("Cannot set the attribution of the terminal");
/*printf("Cannot set the attribution of the terminal");*/
return -1;
}
......@@ -154,7 +154,7 @@ void taosSetTerminalMode() {
int32_t taosGetOldTerminalMode() {
#if defined(WINDOWS)
#else
/* Make sure stdin is a terminal. */
if (!isatty(STDIN_FILENO)) {
......@@ -181,7 +181,7 @@ void taosResetTerminalMode() {
#endif
}
TdCmdPtr taosOpenCmd(const char *cmd) {
TdCmdPtr taosOpenCmd(const char* cmd) {
if (cmd == NULL) return NULL;
#ifdef WINDOWS
return (TdCmdPtr)_popen(cmd, "r");
......@@ -190,8 +190,8 @@ TdCmdPtr taosOpenCmd(const char *cmd) {
#endif
}
int64_t taosGetLineCmd(TdCmdPtr pCmd, char ** __restrict ptrBuf) {
if (pCmd == NULL || ptrBuf == NULL ) {
int64_t taosGetLineCmd(TdCmdPtr pCmd, char** __restrict ptrBuf) {
if (pCmd == NULL || ptrBuf == NULL) {
return -1;
}
if (*ptrBuf != NULL) {
......@@ -219,7 +219,7 @@ int32_t taosEOFCmd(TdCmdPtr pCmd) {
return feof((FILE*)pCmd);
}
int64_t taosCloseCmd(TdCmdPtr *ppCmd) {
int64_t taosCloseCmd(TdCmdPtr* ppCmd) {
if (ppCmd == NULL || *ppCmd == NULL) {
return 0;
}
......
......@@ -145,12 +145,6 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) {
return -1;
}
if (taosRealPath(fullDir, NULL, PATH_MAX) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get realpath of dir:%s since %s", inputDir, terrstr());
return -1;
}
taosMemoryFreeClear(pItem->str);
pItem->str = strdup(fullDir);
if (pItem->str == NULL) {
......
......@@ -144,17 +144,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt cl
// mnode-common
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Cluster not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Mnode not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Message need to be reprocessed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_OPTIONS, "Invalid mnode options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol version")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_TYPE, "Invalid message type")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_SHELL_CONNS, "Too many connections")
// mnode-show
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, "Data expired")
......
......@@ -135,7 +135,7 @@ echo "qDebugFlag 143" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "uDebugFlag 143" >> $TAOS_CFG
echo "sDebugFlag 143" >> $TAOS_CFG
echo "sDebugFlag 135" >> $TAOS_CFG
echo "wDebugFlag 143" >> $TAOS_CFG
echo "numOfLogLines 20000000" >> $TAOS_CFG
echo "statusInterval 1" >> $TAOS_CFG
......
......@@ -75,46 +75,46 @@ if $data02 != LEADER then
return -1
endi
print =============== create drop qnode 1
sql create qnode on dnode 1
sql show qnodes
print =============== create drop snode 1
sql create snode on dnode 1
sql show snodes
if $rows != 1 then
return -1
endi
if $data00 != 1 then
return -1
endi
sql_error create qnode on dnode 1
sql_error create snode on dnode 1
sql drop qnode on dnode 1
sql show qnodes
sql drop snode on dnode 1
sql show snodes
if $rows != 0 then
return -1
endi
sql_error drop qnode on dnode 1
sql_error drop snode on dnode 1
print =============== create drop qnode 2
sql create qnode on dnode 2
sql show qnodes
print =============== create drop snode 2
sql create snode on dnode 2
sql show snodes
if $rows != 1 then
return -1
endi
if $data00 != 2 then
return -1
endi
sql_error create qnode on dnode 2
sql_error create snode on dnode 2
sql drop qnode on dnode 2
sql show qnodes
sql drop snode on dnode 2
sql show snodes
if $rows != 0 then
return -1
endi
sql_error drop qnode on dnode 2
sql_error drop snode on dnode 2
print =============== create drop qnodes
sql create qnode on dnode 1
sql create qnode on dnode 2
sql show qnodes
print =============== create drop snodes
sql create snode on dnode 1
sql create snode on dnode 2
sql show snodes
if $rows != 2 then
return -1
endi
......@@ -126,7 +126,7 @@ system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sleep 2000
sql show qnodes
sql show snodes
if $rows != 2 then
return -1
endi
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
......@@ -25,7 +25,7 @@ $rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$pullDelay = 3
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册