提交 e68f14a0 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

......@@ -51,6 +51,13 @@ IF(${TD_WINDOWS})
"If build unit tests using googletest"
ON
)
ELSEIF (TD_DARWIN_64)
add_definitions(-DCOMPILER_SUPPORTS_CXX13)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ELSE ()
include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-std=c++13" COMPILER_SUPPORTS_CXX13)
......
......@@ -241,7 +241,7 @@ int32_t create_topic() {
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -302,7 +302,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
......
......@@ -32,6 +32,18 @@ enum {
TMQ_CONF__RESET_OFFSET__LATEST = -1,
};
// clang-format off
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on
enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP,
......
......@@ -2826,8 +2826,8 @@ typedef struct {
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
// tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
// tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
......@@ -2835,8 +2835,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
}
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
// buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
// buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);
......
......@@ -30,15 +30,15 @@ struct SRpcMsg;
struct SSubplan;
typedef struct SReadHandle {
void* reader;
void* streamReader;
void* meta;
void* config;
void* vnode;
void* mnd;
SMsgCb* pMsgCb;
// int8_t initTsdbReader;
bool tqReader;
bool initMetaReader;
bool initTableReader;
bool initStreamReader;
} SReadHandle;
typedef enum {
......
......@@ -223,7 +223,7 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;
struct SStreamTask {
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
int8_t isDataScan;
......@@ -235,6 +235,11 @@ struct SStreamTask {
int8_t taskStatus;
int8_t execStatus;
// exec info
int64_t enqueueVer;
int64_t processedVer;
int64_t checkpointVer;
// node info
int32_t selfChildId;
int32_t nodeId;
......@@ -277,7 +282,7 @@ struct SStreamTask {
// msg handle
SMsgCb* pMsgCb;
};
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
......@@ -288,6 +293,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
#if 0
while (1) {
int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
......@@ -296,6 +302,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
}
ASSERT(0);
}
#endif
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
......@@ -316,8 +323,10 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0;
}
......
......@@ -88,7 +88,7 @@ typedef struct {
EWalType level; // wal level
} SWalCfg;
typedef struct SWalVer {
typedef struct {
int64_t firstVer;
int64_t verInSnapshotting;
int64_t snapshotVer;
......@@ -149,17 +149,22 @@ typedef struct SWal {
SWalCkHead writeHead;
} SWal; // WAL HANDLE
typedef struct SWalReadHandle {
SWal *pWal;
TdFilePtr pReadLogTFile;
TdFilePtr pReadIdxTFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int64_t status; // if cursor valid
TdThreadMutex mutex;
SWalCkHead *pHead;
} SWalReadHandle;
typedef struct {
int8_t scanUncommited;
int8_t scanMeta;
} SWalFilterCond;
typedef struct {
SWal *pWal;
TdFilePtr pLogFile;
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
SWalCkHead *pHead;
} SWalReader;
// module initialization
int32_t walInit();
......@@ -178,7 +183,6 @@ void walFsync(SWal *, bool force);
// apis for lifecycle management
int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous logs can be pruned safely
int32_t walBeginSnapshot(SWal *, int64_t ver);
......@@ -187,15 +191,16 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);
// read
SWalReadHandle *walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
// only for tq usage
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead);
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
typedef struct {
int64_t refId;
......@@ -207,10 +212,11 @@ void walCloseRef(SWalRef *);
int32_t walRefVer(SWalRef *, int64_t ver);
int32_t walUnrefVer(SWal *);
// help function for raft
bool walLogExist(SWal *, int64_t ver);
bool walIsEmpty(SWal *);
// lifecycle check
bool walIsEmpty(SWal *);
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
......
......@@ -45,6 +45,7 @@ void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
char *strDupUnquo(const char *src);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;
......
......@@ -26,6 +26,8 @@ if(TD_WINDOWS)
/DEF:${CMAKE_CURRENT_SOURCE_DIR}/src/taos.def
)
INCLUDE_DIRECTORIES(jni/windows)
INCLUDE_DIRECTORIES(jni/windows/win32)
INCLUDE_DIRECTORIES(jni/windows/win32/bridge)
else()
INCLUDE_DIRECTORIES(jni/linux)
endif()
......
......@@ -50,19 +50,18 @@ struct tmq_list_t {
};
struct tmq_conf_t {
char clientId[256];
char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit;
int8_t resetOffset;
int8_t withTbName;
int8_t spEnable;
int32_t spBatchSize;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
char* user;
char* pass;
/*char* db;*/
char clientId[256];
char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit;
int8_t resetOffset;
int8_t withTbName;
int8_t spEnable;
int32_t spBatchSize;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
char* user;
char* pass;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
};
......@@ -338,7 +337,7 @@ tmq_list_t* tmq_list_new() {
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
SArray* container = &list->container;
char* topic = strdup(src);
char* topic = strDupUnquo(src);
if (taosArrayPush(container, &topic) == NULL) return -1;
return 0;
}
......
......@@ -1737,41 +1737,54 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id %lu|\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
if (len >= size -1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
if (len >= size -1) return dumpBuf;
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
if (colDataIsNull(pColInfoData, rows, j, NULL)) {
len += snprintf(dumpBuf + len, size - len, " %15s |", "NULL");
if (len >= size -1) return dumpBuf;
continue;
}
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
len += snprintf(dumpBuf + len, size - len, " %25s |", pBuf);
if (len >= size -1) return dumpBuf;
break;
case TSDB_DATA_TYPE_INT:
len += snprintf(dumpBuf + len, size - len, " %15d |", *(int32_t*)var);
if (len >= size -1) return dumpBuf;
break;
case TSDB_DATA_TYPE_UINT:
len += snprintf(dumpBuf + len, size - len, " %15u |", *(uint32_t*)var);
if (len >= size -1) return dumpBuf;
break;
case TSDB_DATA_TYPE_BIGINT:
len += snprintf(dumpBuf + len, size - len, " %15ld |", *(int64_t*)var);
if (len >= size -1) return dumpBuf;
break;
case TSDB_DATA_TYPE_UBIGINT:
len += snprintf(dumpBuf + len, size - len, " %15lu |", *(uint64_t*)var);
if (len >= size -1) return dumpBuf;
break;
case TSDB_DATA_TYPE_FLOAT:
len += snprintf(dumpBuf + len, size - len, " %15f |", *(float*)var);
if (len >= size -1) return dumpBuf;
break;
case TSDB_DATA_TYPE_DOUBLE:
len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
if (len >= size -1) return dumpBuf;
break;
}
}
len += snprintf(dumpBuf + len, size - len, "\n");
if (len >= size -1) return dumpBuf;
}
len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag);
return dumpBuf;
......
......@@ -43,6 +43,9 @@ if (taosInitLog("taosdlog", 1) != 0) {
}
void Testbase::Init(const char* path, int16_t port) {
#ifdef _TD_DARWIN_64
osDefaultInit();
#endif
tsServerPort = port;
strcpy(tsLocalFqdn, "localhost");
snprintf(tsLocalEp, TSDB_EP_LEN, "%s:%u", tsLocalFqdn, tsServerPort);
......
......@@ -40,15 +40,6 @@ extern "C" {
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define IS_META_MSG(x) ( \
x == TDMT_VND_CREATE_STB \
|| x == TDMT_VND_ALTER_STB \
|| x == TDMT_VND_DROP_STB \
|| x == TDMT_VND_CREATE_TABLE \
|| x == TDMT_VND_ALTER_TABLE \
|| x == TDMT_VND_DROP_TABLE \
|| x == TDMT_VND_DROP_TTL_TABLE \
)
// clang-format on
typedef struct STqOffsetStore STqOffsetStore;
......@@ -128,7 +119,7 @@ typedef struct {
int8_t fetchMeta;
// reader
SWalReadHandle* pWalReader;
SWalReader* pWalReader;
// push
STqPushHandle pushHandle;
......
......@@ -332,7 +332,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
}
SReadHandle handle = {
.reader = pReadHandle,
.streamReader = pReadHandle,
.meta = pMeta,
.pMsgCb = pMsgCb,
.vnode = pVnode,
......
......@@ -28,8 +28,12 @@ int32_t tqInit() {
atomic_store_8(&tqMgmt.inited, 0);
return -1;
}
if (streamInit() < 0) {
return -1;
}
atomic_store_8(&tqMgmt.inited, 1);
}
return 0;
}
......@@ -42,6 +46,7 @@ void tqCleanUp() {
if (old == 1) {
taosTmrCleanUp(tqMgmt.timer);
streamCleanUp();
atomic_store_8(&tqMgmt.inited, 0);
}
}
......@@ -144,7 +149,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSMqDataRsp(&encoder, pRsp);
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
SRpcMsg rsp = {
.info = pMsg->info,
......@@ -361,8 +365,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(IS_META_MSG(pHead->msgType));
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version;
metaRsp.rspOffset = fetchVer;
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
/*metaRsp.rspOffset = fetchVer;*/
/*metaRsp.rspOffsetNew.version = fetchVer;*/
tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
......@@ -439,7 +446,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.subType = req.subType;
pHandle->fetchMeta = req.withMeta;
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) {
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
......@@ -448,10 +455,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL;
for (int32_t i = 0; i < 5; i++) {
SReadHandle handle = {
.reader = pHandle->execHandle.pExecReader[i],
.streamReader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.tqReader = true,
.initTableReader = true,
};
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]);
......@@ -522,19 +529,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
if (pTask->isDataScan) {
SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initStreamReader = 1,
};
/*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
} else {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor);
}
ASSERT(pTask->exec.executor);
}
// sink
......
......@@ -77,14 +77,14 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
for (int32_t i = 0; i < 5; i++) {
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
for (int32_t i = 0; i < 5; i++) {
SReadHandle reader = {
.reader = handle.execHandle.pExecReader[i],
.streamReader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
......
......@@ -854,7 +854,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if (pCommitter->pReader) {
code = tsdbDataFReaderClose(&pCommitter->pReader);
goto _err;
if (code) goto _err;
}
_exit:
......
......@@ -520,7 +520,7 @@ static int32_t tsdbScanAndTryFixFS(STsdbFS *pFS, int8_t deepScan) {
return code;
_err:
tsdbError("vgId:%d tsdb can and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbError("vgId:%d tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
......
......@@ -49,7 +49,7 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
}
pDelFWriter->fDel.size = TSDB_FHDR_SIZE;
pDelFWriter->fDel.size = 0;
pDelFWriter->fDel.offset = 0;
*ppWriter = pDelFWriter;
return code;
......
......@@ -281,8 +281,9 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing");
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG)
&& !vnodeIsLeader(pVnode)) {
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
pMsg->msgType == TDMT_VND_TABLE_CFG) &&
!vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
......@@ -348,7 +349,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY;
int32_t t = ntohl(*(int32_t *)pReq);
vError("rec ttl time:%d", t);
vDebug("rec ttl time:%d", t);
int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids);
if (ret != 0) {
goto end;
......@@ -389,10 +390,14 @@ static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *p
goto _err;
}
taosMemoryFree(req.schemaRow.pSchema);
taosMemoryFree(req.schemaTag.pSchema);
tDecoderClear(&coder);
return 0;
_err:
taosMemoryFree(req.schemaRow.pSchema);
taosMemoryFree(req.schemaTag.pSchema);
tDecoderClear(&coder);
return -1;
}
......@@ -811,7 +816,8 @@ _exit:
taosArrayDestroy(submitRsp.pArray);
// TODO: the partial success scenario and the error case
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level 1/level 2.
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
// 1/level 2.
// TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
......@@ -915,4 +921,4 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
_err:
return code;
}
\ No newline at end of file
}
......@@ -348,7 +348,7 @@ typedef struct SessionWindowSupporter {
uint8_t parentType;
} SessionWindowSupporter;
typedef struct SStreamBlockScanInfo {
typedef struct SStreamScanInfo {
uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
......@@ -365,7 +365,7 @@ typedef struct SStreamBlockScanInfo {
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle
void* streamReader;// stream block reader handle
int32_t tsArrayIndex;
SArray* tsArray;
......@@ -374,7 +374,7 @@ typedef struct SStreamBlockScanInfo {
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pSnapshotReadOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
......@@ -383,7 +383,7 @@ typedef struct SStreamBlockScanInfo {
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
} SStreamBlockScanInfo;
} SStreamScanInfo;
typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp;
......@@ -527,6 +527,7 @@ typedef struct SFillOperatorInfo {
void** p;
SSDataBlock* existNewGroupBlock;
bool multigroupResult;
STimeWindow win;
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {
......@@ -587,6 +588,7 @@ typedef struct SSessionAggOperatorInfo {
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SNode *pCondition;
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo {
......@@ -648,6 +650,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
// bool reptScan;
const SNode *pCondition;
} SStateWindowOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
......@@ -805,7 +808,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo);
SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
......@@ -819,7 +822,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
......
......@@ -37,7 +37,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else {
pOperator->status = OP_NOT_OPENED;
SStreamBlockScanInfo* pInfo = pOperator->info;
SStreamScanInfo* pInfo = pOperator->info;
pInfo->assignBlockUid = assignUid;
// TODO: if a block was set but not consumed,
......@@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pInfo->blockType = type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
......@@ -130,7 +130,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo;
}
static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) {
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
// let's discard the tables those are not created according to the queried super table.
......@@ -168,17 +168,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo = pInfo->pDownstream[0];
}
int32_t code = 0;
SStreamBlockScanInfo* pScanInfo = pInfo->info;
int32_t code = 0;
SStreamScanInfo* pScanInfo = pInfo->info;
if (isAdd) { // add new table id
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa);
taosArrayDestroy(qa);
} else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
code = tqReadHandleRemoveTbUidList(pScanInfo->streamBlockReader, tableIdList);
code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList);
}
return code;
......
......@@ -1340,6 +1340,8 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
blockDataUpdateTsWindow(pBlock, 0);
taosMemoryFree(rowRes);
}
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
......@@ -2847,12 +2849,12 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info;
SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
pScanInfo->pSnapshotReadOp->status = OP_OPENED;
pScanInfo->pTableScanOp->status = OP_OPENED;
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
if (uid == 0) {
......@@ -2912,8 +2914,8 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamBlockScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info;
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
*uid = pSnapShotScanInfo->lastStatus.uid;
*ts = pSnapShotScanInfo->lastStatus.ts;
} else {
......@@ -3334,7 +3336,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pTaskInfo->window.ekey
int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey
: pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
......@@ -3395,7 +3397,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
// Fill the previous group data block, before handle the data block of new group.
// Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
} else {
if (pBlock == NULL) {
if (pInfo->totalInputRows == 0) {
......@@ -3403,7 +3405,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
return NULL;
}
taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
} else {
pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
......@@ -3920,7 +3922,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
int32_t order = TSDB_ORDER_ASC;
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
pInfo->win = win;
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
taosMemoryFree(pInfo->pFillInfo);
taosMemoryFree(pInfo->p);
......@@ -4452,7 +4455,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr =
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo);
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
......@@ -4474,7 +4477,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
......@@ -4584,9 +4587,9 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI
}
return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
} else {
SStreamBlockScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pSnapshotReadOp->info;
SStreamScanInfo* pInfo = pOperator->info;
ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
*ppInfo = pInfo->pTableScanOp->info;
return 0;
}
}
......
......@@ -302,14 +302,21 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
size_t rows = pRes->info.rows;
if (rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
if (pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += rows;
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0)? NULL:pRes;
}
......
......@@ -1146,13 +1146,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
return pBInfo->pRes;
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
}
int32_t order = TSDB_ORDER_ASC;
......@@ -1178,15 +1187,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
size_t rows = pBInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
return (rows == 0) ? NULL : pBInfo->pRes;
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
}
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
......@@ -1880,12 +1896,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
return pBInfo->pRes->info.rows > 0 ? pBInfo->pRes : NULL;
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
}
int64_t st = taosGetTimestampUs();
......@@ -1914,15 +1940,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes);
size_t rows = pBInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
if (!hasRemain) {
doSetOperatorCompleted(pOperator);
break;
}
return (rows == 0) ? NULL : pBInfo->pRes;
if (pBInfo->pRes->info.rows > 0) {
break;
}
}
pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows;
return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes;
}
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
......@@ -2235,7 +2268,7 @@ _error:
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId,
SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo) {
SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -2246,6 +2279,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
pInfo->pCondition = pCondition;
if (pInfo->stateKey.pData == NULL) {
goto _error;
}
......@@ -2289,7 +2323,7 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
STimeWindowAggSupp* pTwAggSupp, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -2315,6 +2349,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pInfo->pCondition = pCondition;
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
pOperator->blocking = true;
......@@ -2988,7 +3023,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark,
uint8_t type) {
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
SStreamBlockScanInfo* pScanInfo = downstream->info;
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type};
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
}
......@@ -3545,8 +3580,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup,
SArray* pClosed, __get_win_info_ fn, bool delete) {
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn,
bool delete) {
// Todo(liuyao) save window to tdb
void** pIte = NULL;
size_t keyLen = 0;
......@@ -3708,8 +3743,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForSession, pInfo->ignoreExpiredData);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession,
pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated);
......@@ -4210,8 +4245,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForState, pInfo->ignoreExpiredData);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState,
pInfo->ignoreExpiredData);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated);
......
MESSAGE(STATUS "build parser unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
executorTest
PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode
)
ADD_EXECUTABLE(executorTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
executorTest
PRIVATE os util common transport gtest taos_static qcom executor function planner scalar nodes vnode
)
TARGET_INCLUDE_DIRECTORIES(
executorTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/executor/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/executor/inc"
)
TARGET_INCLUDE_DIRECTORIES(
executorTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/executor/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/executor/inc"
)
ENDIF ()
\ No newline at end of file
......@@ -75,6 +75,10 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
#ifdef WINDOWS
GetModuleFileName(NULL, path, PATH_MAX);
taosDirName(path);
#elif defined(_TD_DARWIN_64)
uint32_t pathSize = sizeof(path);
_NSGetExecutablePath(path, &pathSize);
taosDirName(path);
#endif
} else {
strncpy(path, tsProcPath, strlen(tsProcPath));
......
add_executable(idxTest "")
add_executable(idxFstTest "")
add_executable(idxFstUT "")
add_executable(idxUtilUT "")
add_executable(idxJsonUT "")
IF(NOT TD_DARWIN)
add_executable(idxTest "")
add_executable(idxFstTest "")
add_executable(idxFstUT "")
add_executable(idxUtilUT "")
add_executable(idxJsonUT "")
target_sources(idxTest
PRIVATE
"indexTests.cc"
)
target_sources(idxFstTest
PRIVATE
"fstTest.cc"
)
target_sources(idxTest
PRIVATE
"indexTests.cc"
)
target_sources(idxFstTest
PRIVATE
"fstTest.cc"
)
target_sources(idxFstUT
PRIVATE
"fstUT.cc"
)
target_sources(idxUtilUT
PRIVATE
"utilUT.cc"
)
target_sources(idxFstUT
PRIVATE
"fstUT.cc"
)
target_sources(idxUtilUT
PRIVATE
"utilUT.cc"
)
target_sources(idxJsonUT
PRIVATE
"jsonUT.cc"
)
target_include_directories (idxTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxFstTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_sources(idxJsonUT
PRIVATE
"jsonUT.cc"
)
target_include_directories (idxTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxFstTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxFstUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxFstUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxUtilUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxUtilUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories (idxJsonUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (idxTest
os
util
common
gtest_main
index
)
target_link_libraries (idxFstTest
os
util
common
gtest_main
index
)
target_link_libraries (idxFstUT
os
util
common
gtest_main
index
)
target_include_directories (idxJsonUT
PUBLIC
"${TD_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (idxTest
os
util
common
gtest_main
index
)
target_link_libraries (idxFstTest
os
util
common
gtest_main
index
)
target_link_libraries (idxFstUT
os
util
common
gtest_main
index
)
target_link_libraries (idxUtilUT
os
util
common
gtest_main
index
)
target_link_libraries (idxUtilUT
os
util
common
gtest_main
index
)
target_link_libraries (idxJsonUT
os
util
common
gtest_main
index
)
target_link_libraries (idxJsonUT
os
util
common
gtest_main
index
)
add_test(
NAME idxtest
COMMAND idxTest
)
add_test(
NAME idxJsonUT
COMMAND idxJsonUT
)
add_test(
NAME idxUtilUT
COMMAND idxUtilUT
)
add_test(
NAME idxFstUT
COMMAND idxFstUT
)
add_test(
NAME idxtest
COMMAND idxTest
)
add_test(
NAME idxJsonUT
COMMAND idxJsonUT
)
add_test(
NAME idxUtilUT
COMMAND idxUtilUT
)
add_test(
NAME idxFstUT
COMMAND idxFstUT
)
ENDIF ()
\ No newline at end of file
......@@ -890,7 +890,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN
return code;
}
static int32_t pushDownCondOptDealLogicNode(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
static int32_t pushDownCondOptTrivialPushDown(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
if (NULL == pLogicNode->pConditions ||
OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS;
......@@ -921,7 +921,7 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
break;
case QUERY_NODE_LOGIC_PLAN_SORT:
case QUERY_NODE_LOGIC_PLAN_PARTITION:
code = pushDownCondOptDealLogicNode(pCxt, pLogicNode);
code = pushDownCondOptTrivialPushDown(pCxt, pLogicNode);
break;
default:
break;
......
MESSAGE(STATUS "build planner unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(plannerTest
ADD_EXECUTABLE(plannerTest
${SOURCE_LIST}
"${SOURCE_LIST}/../../../parser/test/mockCatalog.cpp"
"${SOURCE_LIST}/../../../parser/test/mockCatalogService.cpp"
)
)
TARGET_LINK_LIBRARIES(
plannerTest
PUBLIC os util common nodes planner parser catalog transport gtest function qcom
)
TARGET_LINK_LIBRARIES(
plannerTest
PUBLIC os util common nodes planner parser catalog transport gtest function qcom
)
TARGET_INCLUDE_DIRECTORIES(
plannerTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/planner/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/planner/inc"
PRIVATE "${TD_SOURCE_DIR}/source/libs/parser/test"
)
TARGET_INCLUDE_DIRECTORIES(
plannerTest
PUBLIC "${TD_SOURCE_DIR}/include/libs/planner/"
PRIVATE "${TD_SOURCE_DIR}/source/libs/planner/inc"
PRIVATE "${TD_SOURCE_DIR}/source/libs/parser/test"
)
if(${BUILD_WINGETOPT})
target_include_directories(
plannerTest
PUBLIC "${TD_SOURCE_DIR}/contrib/wingetopt/src"
)
target_link_libraries(plannerTest PUBLIC wingetopt)
endif()
if(${BUILD_WINGETOPT})
target_include_directories(
plannerTest
PUBLIC "${TD_SOURCE_DIR}/contrib/wingetopt/src"
)
target_link_libraries(plannerTest PUBLIC wingetopt)
endif()
add_test(
NAME plannerTest
COMMAND plannerTest
)
add_test(
NAME plannerTest
COMMAND plannerTest
)
ENDIF ()
\ No newline at end of file
......@@ -17,6 +17,7 @@
#define _STREAM_INC_H_
#include "executor.h"
#include "tref.h"
#include "tstream.h"
#ifdef __cplusplus
......@@ -24,8 +25,9 @@ extern "C" {
#endif
typedef struct {
int8_t inited;
void* timer;
int8_t inited;
int32_t refPool;
void* timer;
} SStreamGlobalEnv;
static SStreamGlobalEnv streamEnv;
......
......@@ -76,9 +76,6 @@ void streamTriggerByTimer(void* param, void* tmrId) {
int32_t streamSetupTrigger(SStreamTask* pTask) {
if (pTask->triggerParam != 0) {
if (streamInit() < 0) {
return -1;
}
pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
}
......
......@@ -32,8 +32,8 @@ typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode;
SWal* pWal;
TdThreadMutex mutex;
SWalReadHandle* pWalHandle;
TdThreadMutex mutex;
SWalReader* pWalHandle;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData;
......
......@@ -62,7 +62,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal);
pData->pWalHandle = walOpenReader(pData->pWal, NULL);
ASSERT(pData->pWalHandle != NULL);
pLogStore->appendEntry = logStoreAppendEntry;
......@@ -95,7 +95,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
taosThreadMutexLock(&(pData->mutex));
if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle);
walCloseReader(pData->pWalHandle);
pData->pWalHandle = NULL;
}
taosThreadMutexUnlock(&(pData->mutex));
......@@ -255,7 +255,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
*ppEntry = NULL;
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
SWalReader* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
......@@ -263,7 +263,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
taosThreadMutexLock(&(pData->mutex));
code = walReadWithHandle(pWalHandle, index);
code = walReadVer(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
......@@ -398,10 +398,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
taosThreadMutexLock(&(pData->mutex));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle;
SWalReader* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL);
int32_t code = walReadWithHandle(pWalHandle, index);
int32_t code = walReadVer(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
......
......@@ -16,7 +16,11 @@
class TfsTest : public ::testing::Test {
protected:
#ifdef _TD_DARWIN_64
static void SetUpTestSuite() { root = "/private" TD_TMP_DIR_PATH "tfsTest"; }
#else
static void SetUpTestSuite() { root = TD_TMP_DIR_PATH "tfsTest"; }
#endif
static void TearDownTestSuite() {}
public:
......@@ -299,6 +303,17 @@ TEST_F(TfsTest, 04_File) {
TEST_F(TfsTest, 05_MultiDisk) {
int32_t code = 0;
#ifdef _TD_DARWIN_64
const char *root00 = "/private" TD_TMP_DIR_PATH "tfsTest00";
const char *root01 = "/private" TD_TMP_DIR_PATH "tfsTest01";
const char *root10 = "/private" TD_TMP_DIR_PATH "tfsTest10";
const char *root11 = "/private" TD_TMP_DIR_PATH "tfsTest11";
const char *root12 = "/private" TD_TMP_DIR_PATH "tfsTest12";
const char *root20 = "/private" TD_TMP_DIR_PATH "tfsTest20";
const char *root21 = "/private" TD_TMP_DIR_PATH "tfsTest21";
const char *root22 = "/private" TD_TMP_DIR_PATH "tfsTest22";
const char *root23 = "/private" TD_TMP_DIR_PATH "tfsTest23";
#else
const char *root00 = TD_TMP_DIR_PATH "tfsTest00";
const char *root01 = TD_TMP_DIR_PATH "tfsTest01";
const char *root10 = TD_TMP_DIR_PATH "tfsTest10";
......@@ -308,6 +323,7 @@ TEST_F(TfsTest, 05_MultiDisk) {
const char *root21 = TD_TMP_DIR_PATH "tfsTest21";
const char *root22 = TD_TMP_DIR_PATH "tfsTest22";
const char *root23 = TD_TMP_DIR_PATH "tfsTest23";
#endif
SDiskCfg dCfg[9] = {0};
tstrncpy(dCfg[0].dir, root01, TSDB_FILENAME_LEN);
......
......@@ -19,6 +19,7 @@
#include "taoserror.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"
#include "wal.h"
......
......@@ -16,20 +16,29 @@
#include "taoserror.h"
#include "walInt.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle));
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
static int32_t walFetchBodyNew(SWalReader *pRead);
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
if (pRead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRead->pWal = pWal;
pRead->pReadIdxTFile = NULL;
pRead->pReadLogTFile = NULL;
pRead->pIdxFile = NULL;
pRead->pLogFile = NULL;
pRead->curVersion = -1;
pRead->curFileFirstVer = -1;
pRead->capacity = 0;
pRead->status = 0;
if (cond)
pRead->cond = *cond;
else {
pRead->cond.scanMeta = 0;
pRead->cond.scanUncommited = 0;
}
taosThreadMutexInit(&pRead->mutex, NULL);
......@@ -39,26 +48,46 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
taosMemoryFree(pRead);
return NULL;
}
return pRead;
}
void walCloseReadHandle(SWalReadHandle *pRead) {
taosCloseFile(&pRead->pReadIdxTFile);
taosCloseFile(&pRead->pReadLogTFile);
void walCloseReader(SWalReader *pRead) {
taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pLogFile);
taosMemoryFreeClear(pRead->pHead);
taosMemoryFree(pRead);
}
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
// TODO
return 0;
int32_t walNextValidMsg(SWalReader *pRead) {
int64_t fetchVer = pRead->curVersion;
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
while (fetchVer <= endVer) {
if (walFetchHeadNew(pRead, fetchVer) < 0) {
return -1;
}
if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
(IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
if (walFetchBodyNew(pRead) < 0) {
return -1;
}
return 0;
} else {
if (walSkipFetchBodyNew(pRead) < 0) {
return -1;
}
fetchVer++;
ASSERT(fetchVer == pRead->curVersion);
}
}
return -1;
}
static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0;
TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
TdFilePtr pLogTFile = pRead->pReadLogTFile;
TdFilePtr pIdxTFile = pRead->pIdxFile;
TdFilePtr pLogTFile = pRead->pLogFile;
// seek position
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
......@@ -90,11 +119,11 @@ static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
return ret;
}
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN];
taosCloseFile(&pRead->pReadIdxTFile);
taosCloseFile(&pRead->pReadLogTFile);
taosCloseFile(&pRead->pIdxFile);
taosCloseFile(&pRead->pLogFile);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
......@@ -104,7 +133,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1;
}
pRead->pReadLogTFile = pLogTFile;
pRead->pLogFile = pLogTFile;
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
......@@ -114,11 +143,11 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
return -1;
}
pRead->pReadIdxTFile = pIdxTFile;
pRead->pIdxFile = pIdxTFile;
return 0;
}
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
SWal *pWal = pRead->pWal;
if (ver == pRead->curVersion) {
return 0;
......@@ -153,9 +182,94 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
int64_t contLen;
if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
}
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen != sizeof(SWalCkHead)) {
if (contLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curVersion = -1;
return -1;
}
return 0;
}
static int32_t walFetchBodyNew(SWalReader *pRead) {
SWalCont *pReadHead = &pRead->pHead->head;
int64_t ver = pReadHead->version;
if (pRead->capacity < pReadHead->bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pRead->pHead = ptr;
pReadHead = &pRead->pHead->head;
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since %s",
pRead->pHead->head.version, ver, tstrerror(terrno));
} else {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since file corrupted",
pRead->pHead->head.version, ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
pRead->curVersion = -1;
ASSERT(0);
return -1;
}
if (pReadHead->version != ver) {
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
if (walValidBodyCksum(pRead->pHead) != 0) {
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1;
}
pRead->curVersion = ver + 1;
return 0;
}
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t code;
ASSERT(pRead->curVersion == pRead->pHead->head.version);
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
return -1;
}
pRead->curVersion++;
return 0;
}
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int64_t code;
// TODO: valid ver
......@@ -168,9 +282,9 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
if (code < 0) return -1;
}
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead));
code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
return -1;
}
......@@ -186,12 +300,12 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
return 0;
}
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curVersion = -1;
......@@ -203,7 +317,7 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
return 0;
}
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
SWalCont *pReadHead = &((*ppHead)->head);
int64_t ver = pReadHead->version;
......@@ -218,7 +332,7 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
pRead->capacity = pReadHead->bodyLen;
}
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
ASSERT(0);
return -1;
}
......@@ -241,9 +355,9 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
return 0;
}
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) {
int32_t walReadWithHandle_s(SWalReader *pRead, int64_t ver, SWalCont **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) {
if (walReadVer(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
......@@ -257,7 +371,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHea
return 0;
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
int64_t code;
if (pRead->pWal->vers.firstVer == -1) {
......@@ -280,9 +394,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return -1;
}
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
ASSERT(taosValidFile(pRead->pLogFile) == true);
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead));
code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (code != sizeof(SWalCkHead)) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -310,7 +424,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
pRead->capacity = pRead->pHead->head.bodyLen;
}
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) {
if (code < 0)
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -340,46 +454,3 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
#if 0
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code;
code = walSeekVer(pWal, ver);
if (code != 0) {
return code;
}
if (*ppHead == NULL) {
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
if (ptr == NULL) {
return -1;
}
*ppHead = ptr;
}
if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidHeadCksum(*ppHead) != 0) {
return -1;
}
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
if (ptr == NULL) {
taosMemoryFree(*ppHead);
*ppHead = NULL;
return -1;
}
if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1;
}
// TODO: endian compatibility processing after read
if (walValidBodyCksum(*ppHead) != 0) {
return -1;
}
return 0;
}
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
return 0;
}
#endif
......@@ -79,19 +79,21 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
}
int32_t walRollback(SWal *pWal, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
int64_t code;
char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
taosThreadMutexLock(&pWal->mutex);
// find correct file
if (ver < walGetLastFileFirstVer(pWal)) {
// change current files
code = walChangeWrite(pWal, ver);
if (code < 0) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
......@@ -146,6 +148,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
ASSERT(taosValidFile(pLogTFile));
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
code = walValidHeadCksum(&head);
......@@ -154,11 +157,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if (code != 0) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
if (head.head.version != ver) {
ASSERT(0);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
......@@ -167,12 +172,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if (code < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
code = taosFtruncateFile(pIdxTFile, idxOff);
if (code < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
taosThreadMutexUnlock(&pWal->mutex);
return -1;
}
pWal->vers.lastVer = ver - 1;
......
......@@ -292,8 +292,8 @@ TEST_F(WalCleanDeleteEnv, roll) {
TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv();
int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal);
int code;
SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL);
int i;
......@@ -306,7 +306,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
}
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver);
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
......@@ -325,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
walCloseReadHandle(pRead);
walCloseReader(pRead);
}
TEST_F(WalRetentionEnv, repairMeta1) {
......@@ -354,12 +354,12 @@ TEST_F(WalRetentionEnv, repairMeta1) {
ASSERT_EQ(pWal->vers.lastVer, 99);
SWalReadHandle* pRead = walOpenReadHandle(pWal);
SWalReader* pRead = walOpenReader(pWal, NULL);
ASSERT(pRead != NULL);
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 100;
code = walReadWithHandle(pRead, ver);
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
......@@ -389,7 +389,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
for (int i = 0; i < 1000; i++) {
int ver = taosRand() % 200;
code = walReadWithHandle(pRead, ver);
code = walReadVer(pRead, ver);
ASSERT_EQ(code, 0);
// printf("rrbody: \n");
......
......@@ -857,19 +857,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return;
}
buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') {
if (tz) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
char *zi = strstr(buf, "zoneinfo");
if (!zi) {
printf("parsing /etc/localtime failed");
return;
}
tz = zi + strlen("zoneinfo") + 1;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv("TZ", tz, 1);
tzset();
......@@ -900,7 +908,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
int n = readlink("/etc/localtime", buf, sizeof(buf));
if (n < 0) {
printf("read /etc/localtime error, reason:%s", strerror(errno));
if (taosCheckExistFile("/etc/timezone")) {
/*
* NOTE: do not remove it.
......@@ -962,19 +970,27 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
return;
}
buf[n] = '\0';
for (int i = n - 1; i >= 0; --i) {
if (buf[i] == '/') {
if (tz) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
}
if (!tz || 0 == strchr(tz, '/')) {
char *zi = strstr(buf, "zoneinfo");
if (!zi) {
printf("parsing /etc/localtime failed");
return;
}
tz = zi + strlen("zoneinfo") + 1;
//for (int i = n - 1; i >= 0; --i) {
// if (buf[i] == '/') {
// if (tz) {
// tz = buf + i + 1;
// break;
// }
// tz = buf + i + 1;
// }
//}
//if (!tz || 0 == strchr(tz, '/')) {
// printf("parsing /etc/localtime failed");
// return;
//}
setenv("TZ", tz, 1);
tzset();
......
......@@ -64,6 +64,20 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing
}
char *strDupUnquo(const char *src) {
if (src == NULL) return NULL;
if (src[0] != '`') return strdup(src);
int32_t len = (int32_t)strlen(src);
if (src[len - 1] != '`') return NULL;
char *ret = taosMemoryMalloc(len);
if (ret == NULL) return NULL;
for (int32_t i = 0; i < len - 1; i++) {
ret[i] = src[i + 1];
}
ret[len - 1] = 0;
return ret;
}
size_t strtrim(char *z) {
int32_t i = 0;
int32_t j = 0;
......
......@@ -141,7 +141,7 @@ if $data01 != 7 then
goto loop1
endi
if $data02 != 9 then
if $data02 != 18 then
print =====data02=$data02
goto loop1
endi
......@@ -151,22 +151,22 @@ if $data03 != 4 then
goto loop1
endi
if $data04 != 1.100000000 then
if $data04 != 1.000000000 then
print ======$data04
return -1
endi
if $data05 != 0.816496581 then
if $data05 != 1.154700538 then
print ======$data05
return -1
endi
if $data06 != 3 then
if $data06 != 4 then
print ======$data06
return -1
endi
if $data07 != 1.100000000 then
if $data07 != 1.000000000 then
print ======$data07
return -1
endi
......@@ -235,7 +235,7 @@ sql create stream streams4 trigger at_once watermark 1d into streamt4 as select
# sql create stream streams6 trigger at_once watermark 1d into streamt6 as select _wstartts, bottom(b,3), a,c from t1 session(ts,10s);
# sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), elapsed(ts), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams7 trigger at_once watermark 1d into streamt7 as select _wstartts, spread(a), hyperloglog(a) from t1 session(ts,10s);
sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
# sql create stream streams8 trigger at_once watermark 1d into streamt8 as select _wstartts, histogram(a,"user_input", "[1,3,5,7]", 1), histogram(a,"user_input", "[1,3,5,7]", 0) from t1 session(ts,10s);
sql insert into t1 values(1648791213001,1,1,1,1.0);
sql insert into t1 values(1648791213002,2,3,2,3.4);
sql insert into t1 values(1648791213003,4,9,3,4.8);
......@@ -288,10 +288,10 @@ if $rows == 0 then
goto loop3
endi
sql select * from streamt8;
if $rows == 0 then
print ======$rows
goto loop3
endi
#sql select * from streamt8;
#if $rows == 0 then
# print ======$rows
# goto loop3
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
......@@ -33,9 +34,10 @@ sql drop dnode 2
sql alter dnode 1 'debugflag 143'
print =============== step4: create show database
sql create database d1 vgroups 1
sql create database d1 vgroups 1 buffer 3
sql show databases
sql show d1.vgroups
sql use d1
sql show vgroups
print =============== step5: create show stable
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
......@@ -44,8 +46,6 @@ if $rows != 1 then
return -1
endi
goto _OVER
print =============== step6: create show table
sql create table ct1 using stb tags(1000)
sql show tables
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
......@@ -19,7 +20,7 @@ if $rows != 1 then
endi
print =============== step2: create db
sql create database db vgroups 1
sql create database db vgroups 1 buffer 3
sql use db
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
......@@ -28,7 +29,10 @@ sql alter user u1 pass 'taosdata'
sql drop user u1
sql_error alter user u2 sysinfo 0
print =============== step3:
print =============== step3: create drop dnode
sql create dnode $hostname port 7200
sql drop dnode 2
sql alter dnode 1 'debugflag 143'
print =============== stop
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/exec.sh -n dnode1 -s start -v
sql connect
......@@ -19,7 +20,17 @@ if $rows != 1 then
endi
print =============== step2: create db
sql create database db vgroups 1
sql create database d1 vgroups 1 buffer 3
sql show databases
sql use d1
sql show vgroups
print =============== step3: create show stable
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int unsigned)
sql show stables
if $rows != 1 then
return -1
endi
_OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -29,7 +40,7 @@ print ----> start to check if there are ERRORS in vagrind log file for each dnod
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <= 8 then
if $system_content <= 0 then
return 0
endi
......
......@@ -25,7 +25,7 @@ class TDTestCase:
paraDict = {'dbName': 'db2',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
......@@ -44,7 +44,7 @@ class TDTestCase:
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
......
#python3 ./test.py -f 2-query/last.py -Q 3
......@@ -295,7 +295,7 @@ python3 ./test.py -f 2-query/Today.py -Q 3
python3 ./test.py -f 2-query/max.py -Q 3
python3 ./test.py -f 2-query/min.py -Q 3
python3 ./test.py -f 2-query/count.py -Q 3
python3 ./test.py -f 2-query/last.py -Q 3
#python3 ./test.py -f 2-query/last.py -Q 3
python3 ./test.py -f 2-query/first.py -Q 3
python3 ./test.py -f 2-query/To_iso8601.py -Q 3
python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 3
......
Subproject commit c885e967e490105999b84d009a15168728dfafaf
Subproject commit 389047db713a3dddfbce292c3260b0864b17d936
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册