diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h
index b08e0aff3de6be2677b296407d1551d587c5de60..298dffcc839e22226a89932b2571a90ffaa197d0 100644
--- a/include/libs/nodes/querynodes.h
+++ b/include/libs/nodes/querynodes.h
@@ -351,9 +351,6 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsRegularOp(const SOperatorNode* pOp);
-bool nodesIsTimeorderQuery(const SNode* pQuery);
-bool nodesIsTimelineQuery(const SNode* pQuery);
-
void* nodesGetValueFromNode(SValueNode* pNode);
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value);
char* nodesGetStrValueFromNode(SValueNode* pNode);
diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index 831063c606e3c65ac9caba11802a9b92332fe184..9b6593e4b5bb4c8aad5018b3b92f73c7e1d52794 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -20,17 +20,23 @@
extern "C" {
#endif
-#include "os.h"
-
#include "cJSON.h"
#include "tdef.h"
#include "tmsgcb.h"
+#define SYNC_INDEX_BEGIN 0
+#define SYNC_INDEX_INVALID -1
+
typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SyncTerm;
+typedef struct SSyncNode SSyncNode;
+typedef struct SSyncBuffer SSyncBuffer;
+typedef struct SWal SWal;
+typedef struct SSyncRaftEntry SSyncRaftEntry;
+
typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101,
@@ -38,6 +44,17 @@ typedef enum {
TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;
+typedef enum {
+ TAOS_SYNC_PROPOSE_SUCCESS = 0,
+ TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
+ TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
+} ESyncProposeCode;
+
+typedef enum {
+ TAOS_SYNC_FSM_CB_SUCCESS = 0,
+ TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
+} ESyncFsmCbCode;
+
typedef struct SNodeInfo {
uint16_t nodePort;
char nodeFqdn[TSDB_FQDN_LEN];
@@ -55,11 +72,6 @@ typedef struct SSnapshot {
SyncTerm lastApplyTerm;
} SSnapshot;
-typedef enum {
- TAOS_SYNC_FSM_CB_SUCCESS = 0,
- TAOS_SYNC_FSM_CB_OTHER_ERROR,
-} ESyncFsmCbCode;
-
typedef struct SFsmCbMeta {
SyncIndex index;
bool isWeak;
@@ -68,27 +80,15 @@ typedef struct SFsmCbMeta {
uint64_t seqNum;
} SFsmCbMeta;
-struct SRpcMsg;
-typedef struct SRpcMsg SRpcMsg;
-
typedef struct SSyncFSM {
void* data;
-
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
-
} SSyncFSM;
-struct SSyncRaftEntry;
-typedef struct SSyncRaftEntry SSyncRaftEntry;
-
-#define SYNC_INDEX_BEGIN 0
-#define SYNC_INDEX_INVALID -1
-
// abstract definition of log store in raft
// SWal implements it
typedef struct SSyncLogStore {
@@ -117,11 +117,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore;
-struct SWal;
-typedef struct SWal SWal;
-
-struct SEpSet;
-typedef struct SEpSet SEpSet;
typedef struct SSyncInfo {
SyncGroupId vgId;
@@ -130,10 +125,8 @@ typedef struct SSyncInfo {
SWal* pWal;
SSyncFSM* pFsm;
SMsgCb* msgcb;
-
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
-
} SSyncInfo;
int32_t syncInit();
@@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid);
-
-typedef enum {
- TAOS_SYNC_PROPOSE_SUCCESS = 0,
- TAOS_SYNC_PROPOSE_NOT_LEADER,
- TAOS_SYNC_PROPOSE_OTHER_ERROR,
-} ESyncProposeCode;
-
-int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
-
-bool syncEnvIsStart();
-
-extern int32_t sDebugFlag;
-
-//-----------------------------------------
-struct SSyncNode;
-typedef struct SSyncNode SSyncNode;
-
-struct SSyncBuffer;
-typedef struct SSyncBuffer SSyncBuffer;
-//-----------------------------------------
-
+int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
+bool syncEnvIsStart();
const char* syncStr(ESyncState state);
#ifdef __cplusplus
diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h
index 01c25b93cc7e3264f05280d8eb87fa0b732a7bd8..4b160c9e6163946edd6fee236ca99f4c665a0f15 100644
--- a/include/libs/sync/syncTools.h
+++ b/include/libs/sync/syncTools.h
@@ -20,9 +20,6 @@
extern "C" {
#endif
-#include "os.h"
-
-#include "cJSON.h"
#include "trpc.h"
// ------------------ ds -------------------
@@ -32,9 +29,6 @@ typedef struct SRaftId {
} SRaftId;
// ------------------ control -------------------
-struct SSyncNode;
-typedef struct SSyncNode SSyncNode;
-
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index 5c251e7a275f67b09e7ce56a4b999935ceb83022..66287099cdc81783c1a7b1ba6e42c8265945cf63 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -650,6 +650,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_FUNCTION_NAME TAOS_DEF_ERROR_CODE(0, 0x264D)
#define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E)
#define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F)
+#define TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY TAOS_DEF_ERROR_CODE(0, 0x2650)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index 685b0cfa0403831cddaad89d0f3b02f31e5da4a4..1dda4c3024381be4f8a762a2c13dc9d23f70633f 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -23,6 +23,8 @@
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
+#include "cJSON.h"
+#include "tdataformat.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
@@ -268,7 +270,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
- } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
+ } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
}
@@ -803,6 +805,101 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS;
}
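+// Convert a kvRow of JSON tag key/values into a JSON string via cJSON; the caller frees the returned string.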
+static char* parseTagDatatoJson(void *p){
+ char* string = NULL;
+ cJSON *json = cJSON_CreateObject();
+ if (json == NULL)
+ {
+ goto end;
+ }
+
+ int16_t nCols = kvRowNCols(p);
+ char tagJsonKey[256] = {0};
+ for (int j = 0; j < nCols; ++j) {
+ SColIdx * pColIdx = kvRowColIdxAt(p, j);
+ char* val = (char*)(kvRowColVal(p, pColIdx));
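+ // the first kvRow column is a marker: a NULL value here means the whole JSON tag is null, so return the literal "null" string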
+ if (j == 0){
+ if(*val == TSDB_DATA_TYPE_NULL){
+ string = taosMemoryCalloc(1, 8);
+ sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L);
+ varDataSetLen(string, strlen(varDataVal(string)));
+ goto end;
+ }
+ continue;
+ }
+
+ // json key encode by binary
+ memset(tagJsonKey, 0, sizeof(tagJsonKey));
+ memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
+ // json value
+ val += varDataTLen(val);
+ char* realData = POINTER_SHIFT(val, CHAR_BYTES);
+ char type = *val;
+ if(type == TSDB_DATA_TYPE_NULL) {
+ cJSON* value = cJSON_CreateNull();
+ if (value == NULL)
+ {
+ goto end;
+ }
+ cJSON_AddItemToObject(json, tagJsonKey, value);
+ }else if(type == TSDB_DATA_TYPE_NCHAR) {
+ cJSON* value = NULL;
+ if (varDataLen(realData) > 0){
+ char *tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
+ int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(realData), varDataLen(realData), tagJsonValue);
+ if (length < 0) {
+ tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
+ taosMemoryFree(tagJsonValue);
+ goto end;
+ }
+ value = cJSON_CreateString(tagJsonValue);
+ taosMemoryFree(tagJsonValue);
+ if (value == NULL)
+ {
+ goto end;
+ }
+ }else if(varDataLen(realData) == 0){
+ value = cJSON_CreateString("");
+ }else{
+ ASSERT(0);
+ }
+
+ cJSON_AddItemToObject(json, tagJsonKey, value);
+ }else if(type == TSDB_DATA_TYPE_DOUBLE){
+ double jsonVd = *(double*)(realData);
+ cJSON* value = cJSON_CreateNumber(jsonVd);
+ if (value == NULL)
+ {
+ goto end;
+ }
+ cJSON_AddItemToObject(json, tagJsonKey, value);
+// }else if(type == TSDB_DATA_TYPE_BIGINT){
+// int64_t jsonVd = *(int64_t*)(realData);
+// cJSON* value = cJSON_CreateNumber((double)jsonVd);
+// if (value == NULL)
+// {
+// goto end;
+// }
+// cJSON_AddItemToObject(json, tagJsonKey, value);
+ }else if (type == TSDB_DATA_TYPE_BOOL) {
+ char jsonVd = *(char*)(realData);
+ cJSON* value = cJSON_CreateBool(jsonVd);
+ if (value == NULL)
+ {
+ goto end;
+ }
+ cJSON_AddItemToObject(json, tagJsonKey, value);
+ }else{
+ ASSERT(0);
+ }
+
+ }
+ string = cJSON_PrintUnformatted(json);
+end:
+ cJSON_Delete(json);
+ return string;
+}
+
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
@@ -833,9 +930,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
pResultInfo->row[i] = pResultInfo->pCol[i].pData;
- }
-
- if (type == TSDB_DATA_TYPE_JSON) {
+ }else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@@ -848,6 +943,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData;
+
int32_t jsonInnerType = *pStart;
char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
@@ -855,15 +951,9 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
- int32_t length =
- taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
-
- if (length <= 0) {
- tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
- varDataVal(jsonInnerData));
- length = 0;
- }
- varDataSetLen(dst, length);
+ char *jsonString = parseTagDatatoJson(jsonInnerData);
+ STR_TO_VARSTR(dst, jsonString);
+ taosMemoryFree(jsonString);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index 4d77f4eb71e3a2e59b6d3fca3996507e12eac8c5..736cb9854958d3ae8cc0e34321347ef8cb8d0d9d 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -122,10 +122,14 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
- } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
- dataLen = LONG_BYTES;
+ } else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
+ dataLen = DOUBLE_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
+ } else if (*pData == TSDB_DATA_TYPE_JSON) {
+ dataLen = kvRowLen(pData + CHAR_BYTES);
+ } else {
+ ASSERT(0);
}
dataLen += CHAR_BYTES;
}
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 7b5663c0a92845c0e81bde044ea42cf2f7e18303..d74d5a4d4e731cdf1cec4d3e61b2d30a33b467c0 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -40,11 +40,11 @@ bool tsPrintAuth = false;
// multi process
int32_t tsMultiProcess = 0;
-int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 128;
-int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
-int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
-int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
-int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128;
+int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024;
+int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024;
+int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
+int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
+int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsNumOfShmThreads = 1;
// queue & threads
@@ -380,11 +380,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
- if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
- if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
- if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
- if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
- if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1;
+ if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
+ if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
+ if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
+ if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
+ if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index b48a8775ce7924976ab3863d82c1907a404b8354..68d4216bae23c5cbf766ee887d6ade0c2e24e1a1 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg;
extern const SVnodeCfg vnodeCfgDefault;
-int vnodeInit(int nthreads);
+int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup();
-int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
+int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode);
-int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
-int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
-int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
-int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
-int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
-int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
+int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
+int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
+int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
+int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
+int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
+int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
-int vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
+int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
@@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry;
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader);
-int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
-int metaReadNext(SMetaReader *pReader);
+int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
+int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
#if 1 // refact APIs below (TODO)
@@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
-int metaTbCursorNext(SMTbCursor *pTbCur);
+int32_t metaTbCursorNext(SMTbCursor *pTbCur);
#endif
// tsdb
@@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
-int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
-int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
+int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
+int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
@@ -207,15 +207,15 @@ struct SMetaReader {
SDecoder coder;
SMetaEntry me;
void *pBuf;
- int szBuf;
+ int32_t szBuf;
};
struct SMTbCursor {
TBC *pDbc;
void *pKey;
void *pVal;
- int kLen;
- int vLen;
+ int32_t kLen;
+ int32_t vLen;
SMetaReader mr;
};
diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h
index a034833a57a172d2aecc10d69a6e2e138a5422f0..eb3382ac4cd46a602a214b09b5a8debeaf15087f 100644
--- a/source/dnode/vnode/src/inc/vnd.h
+++ b/source/dnode/vnode/src/inc/vnd.h
@@ -24,7 +24,6 @@
extern "C" {
#endif
-// vnodeDebug ====================
// clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
@@ -34,17 +33,17 @@ extern "C" {
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
-// vnodeCfg ====================
+// vnodeCfg.c
extern const SVnodeCfg vnodeCfgDefault;
-int vnodeCheckCfg(const SVnodeCfg*);
-int vnodeEncodeConfig(const void* pObj, SJson* pJson);
-int vnodeDecodeConfig(const SJson* pJson, void* pObj);
+int32_t vnodeCheckCfg(const SVnodeCfg*);
+int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
+int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
-// vnodeModule ====================
-int vnodeScheduleTask(int (*execute)(void*), void* arg);
+// vnodeModule.c
+int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg);
-// vnodeBufPool ====================
+// vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode {
SVBufPoolNode* prev;
@@ -62,37 +61,29 @@ struct SVBufPool {
SVBufPoolNode node;
};
-int vnodeOpenBufPool(SVnode* pVnode, int64_t size);
-int vnodeCloseBufPool(SVnode* pVnode);
-void vnodeBufPoolReset(SVBufPool* pPool);
+int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
+int32_t vnodeCloseBufPool(SVnode* pVnode);
+void vnodeBufPoolReset(SVBufPool* pPool);
-// vnodeQuery ====================
-int vnodeQueryOpen(SVnode* pVnode);
-void vnodeQueryClose(SVnode* pVnode);
-int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
+// vnodeQuery.c
+int32_t vnodeQueryOpen(SVnode* pVnode);
+void vnodeQueryClose(SVnode* pVnode);
+int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
-// vnodeCommit ====================
-int vnodeBegin(SVnode* pVnode);
-int vnodeShouldCommit(SVnode* pVnode);
-int vnodeCommit(SVnode* pVnode);
-int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
-int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
-int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
-int vnodeSyncCommit(SVnode* pVnode);
-int vnodeAsyncCommit(SVnode* pVnode);
+// vnodeCommit.c
+int32_t vnodeBegin(SVnode* pVnode);
+int32_t vnodeShouldCommit(SVnode* pVnode);
+int32_t vnodeCommit(SVnode* pVnode);
+int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
+int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
+int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
+int32_t vnodeSyncCommit(SVnode* pVnode);
+int32_t vnodeAsyncCommit(SVnode* pVnode);
-// vnodeCommit ====================
+// vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
-int32_t vnodeSyncStart(SVnode* pVnode);
+void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
-void vnodeSyncSetMsgCb(SVnode* pVnode);
-int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg);
-int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg);
-void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
-SSyncFSM* syncVnodeMakeFsm();
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c
index ae915b26f96efc2c20950c8a5ef0e496f18cc26b..b91622619fbb2675b24c028ce7065ce3b7c6096d 100644
--- a/source/dnode/vnode/src/meta/metaEntry.c
+++ b/source/dnode/vnode/src/meta/metaEntry.c
@@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) {
- if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1;
- if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
+ if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1;
+ if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
} else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
@@ -67,10 +67,10 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
- if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1;
+ if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
- if(!pME->smaEntry.tsma) {
+ if (!pME->smaEntry.tsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 06c3b2913206722a5bb5f66b4c38ecc0f04387d2..652b38a86c97672804b9794ff7b5d8cb868b6e0b 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -425,6 +425,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
rowLen += pCond->colList[i].bytes;
}
+ // make sure the output SSDataBlock size is less than 2MB.
+ int32_t TWOMB = 2 * 1024 * 1024;
+ if (pReadHandle->outputCapacity * rowLen > TWOMB) {
+ pReadHandle->outputCapacity = TWOMB / rowLen;
+ }
+
// allocate buffer in order to load data blocks from file
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pReadHandle->suppInfo.pstatis == NULL) {
@@ -1302,20 +1308,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
- if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
- (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
+
+ bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
+ (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey));
+ if (cacheDataInFileBlockHole) {
// do not load file block into buffer
int32_t step = ascScan ? 1 : -1;
- TSKEY maxKey =
- ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
+ TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
cur->rows =
tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
pTsdbReadHandle->realNumOfRows = cur->rows;
// update the last key value
pCheckInfo->lastKey = cur->win.ekey + step;
- if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
+
+ if (!ascScan) {
TSWAP(cur->win.skey, cur->win.ekey);
}
@@ -1334,18 +1342,16 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
/*
* no data in cache, only load data from file
* during the query processing, data in cache will not be checked anymore.
- *
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
*/
- assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
- if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) ||
- (cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) {
+ bool wholeBlockReturned = ((abs(cur->pos - endPos) + 1) == binfo.rows);
+ if (wholeBlockReturned) {
pTsdbReadHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
- cur->win = binfo.window;
+ cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
@@ -1356,12 +1362,24 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
cur->lastKey = binfo.window.skey - 1;
cur->pos = -1;
}
- } else { // partially copy to dest buffer
+ } else { // partially copy to dest buffer
+ // make sure to only load once
+ bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows -1 && (!ascScan)));
+ if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
+ code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
+ if (code != TSDB_CODE_SUCCESS) {
+ return code;
+ }
+ }
+
copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
cur->mixBlock = true;
}
- assert(cur->blockCompleted);
+ if (pTsdbReadHandle->outputCapacity >= binfo.rows) {
+ ASSERT(cur->blockCompleted);
+ }
+
if (cur->rows == binfo.rows) {
tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
@@ -1858,15 +1876,14 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
TSKEY* tsArray = pCols->cols[0].pData;
- int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
- int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
+ bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
- int32_t pos = cur->pos;
+ int32_t step = ascScan? 1 : -1;
int32_t start = cur->pos;
int32_t end = endPos;
- if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
+ if (!ascScan) {
TSWAP(start, end);
}
@@ -1876,11 +1893,11 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
// the time window should always be ascending order: skey <= ekey
cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
cur->mixBlock = (numOfRows != pBlockInfo->rows);
- cur->lastKey = tsArray[endPos] + step;
- cur->blockCompleted = true;
+ cur->lastKey = tsArray[endPos] + step;
+ cur->blockCompleted = (ascScan? (endPos == pBlockInfo->rows - 1):(endPos == 0));
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
- pos = endPos + step;
+ int32_t pos = endPos + step;
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pTsdbReadHandle);
@@ -1892,20 +1909,44 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
- int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
+ bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
+ int32_t order = ascScan? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
- if (ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
- endPos = pBlockInfo->rows - 1;
- cur->mixBlock = (cur->pos != 0);
- } else if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
- endPos = 0;
- cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
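+ // if the whole block fits into the output buffer, keep the original end-position logic; otherwise clamp endPos so at most outputCapacity rows are returned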
+ if (pTsdbReadHandle->outputCapacity >= pBlockInfo->rows) {
+ if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
+ endPos = pBlockInfo->rows - 1;
+ cur->mixBlock = (cur->pos != 0);
+ } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
+ endPos = 0;
+ cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
+ } else {
+ assert(pCols->numOfRows > 0);
+ endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
+ cur->mixBlock = true;
+ }
} else {
- assert(pCols->numOfRows > 0);
- endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
+ if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
+ endPos = TMIN(cur->pos + pTsdbReadHandle->outputCapacity - 1, pBlockInfo->rows - 1);
+ } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
+ endPos = TMAX(cur->pos - pTsdbReadHandle->outputCapacity + 1, 0);
+ } else {
+ ASSERT(pCols->numOfRows > 0);
+ endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
+
+ // current data is more than the capacity
+ int32_t size = abs(cur->pos - endPos) + 1;
+ if (size > pTsdbReadHandle->outputCapacity) {
+ int32_t delta = size - pTsdbReadHandle->outputCapacity;
+ if (ascScan) {
+ endPos -= delta;
+ } else {
+ endPos += delta;
+ }
+ }
+ }
cur->mixBlock = true;
}
@@ -2369,7 +2410,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
-static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
+static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
SQueryFilePos* cur = &pTsdbReadHandle->cur;
@@ -2478,7 +2519,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
cur->fid = pTsdbReadHandle->pFileGroup->fid;
STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
- return getDataBlockRv(pTsdbReadHandle, pBlockInfo, exists);
+ return getDataBlock(pTsdbReadHandle, pBlockInfo, exists);
}
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
@@ -2643,7 +2684,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
} else {
moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
- return getDataBlockRv(pTsdbReadHandle, pNext, exists);
+ return getDataBlock(pTsdbReadHandle, pNext, exists);
}
}
}
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index ef86ac86e417a880f1aab8a26d3dadeade5c9162..f0af677641c3e08cd4d848ec4c98b28fce4662ee 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++ b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) {
- vnodeSyncSetMsgCb(pVnode);
vnodeSyncStart(pVnode);
return 0;
}
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index bcef95baff6417684a4a39063648814c35149221..8659c418070cdccf4dc9c3164d36f5548199f030 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -13,71 +13,62 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#define _DEFAULT_SOURCE
#include "vnd.h"
-int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
- SSyncInfo syncInfo;
- syncInfo.vgId = pVnode->config.vgId;
- SSyncCfg *pCfg = &(syncInfo.syncCfg);
- pCfg->replicaNum = pVnode->config.syncCfg.replicaNum;
- pCfg->myIndex = pVnode->config.syncCfg.myIndex;
- memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
-
- snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
- syncInfo.pWal = pVnode->pWal;
+static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
+static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
+static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
+static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
+static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
+static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
+static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
- syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
- syncInfo.msgcb = NULL;
- syncInfo.FpSendMsg = vnodeSyncSendMsg;
- syncInfo.FpEqMsg = vnodeSyncEqMsg;
+int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
+ SSyncInfo syncInfo = {
+ .vgId = pVnode->config.vgId,
+ .syncCfg = pVnode->config.syncCfg,
+ .pWal = pVnode->pWal,
+ .msgcb = NULL,
+ .FpSendMsg = vnodeSyncSendMsg,
+ .FpEqMsg = vnodeSyncEqMsg,
+ };
+
+ snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
+ syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
pVnode->sync = syncOpen(&syncInfo);
- assert(pVnode->sync > 0);
+ if (pVnode->sync <= 0) {
+ vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
+ return -1;
+ }
- // for test
setPingTimerMS(pVnode->sync, 3000);
setElectTimerMS(pVnode->sync, 500);
setHeartbeatTimerMS(pVnode->sync, 100);
-
return 0;
}
-int32_t vnodeSyncStart(SVnode *pVnode) {
+void vnodeSyncStart(SVnode *pVnode) {
+ syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync);
- return 0;
}
-void vnodeSyncClose(SVnode *pVnode) {
- // stop by ref id
- syncStop(pVnode->sync);
-}
-
-void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
+void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
-int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
- pMsg->info.noResp = 1;
- return tmsgSendReq(pEpSet, pMsg);
-}
-
-int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
- SVnode *pVnode = (SVnode *)(pFsm->data);
- vnodeGetSnapshot(pVnode, pSnapshot);
-
- /*
- pSnapshot->data = NULL;
- pSnapshot->lastApplyIndex = 0;
- pSnapshot->lastApplyTerm = 0;
- */
+int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
+int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
+ vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0;
}
-void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
+void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) {
- SSnapshot snapshot;
+ SSnapshot snapshot = {0};
pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex;
}
@@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
}
}
-void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
+void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
@@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
-void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
+void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
-SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) {
- SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
+SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
+ SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode;
- pFsm->FpCommitCb = vnodeSyncCommitCb;
- pFsm->FpPreCommitCb = vnodeSyncPreCommitCb;
- pFsm->FpRollBackCb = vnodeSyncRollBackCb;
- pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb;
+ pFsm->FpCommitCb = vnodeSyncCommitMsg;
+ pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
+ pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
+ pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
return pFsm;
-}
+}
\ No newline at end of file
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 168589148ec5b23d17bc1d1110ac93dbb440221b..37aeb367f521d389b53a52b2e45c0dda6bcbbb13 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -3546,11 +3546,12 @@ _error:
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
// todo add more information about exchange operation
- if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
+ int32_t type = pOperator->operatorType;
+ if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
*order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
- } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
+ } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order;
*scanFlag = pTableScanInfo->scanFlag;
@@ -3910,6 +3911,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
+ if (code != TSDB_CODE_SUCCESS) {
+ longjmp(pTaskInfo->env, code);
+ }
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
@@ -4203,7 +4207,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr;
if (pInfo->pScalarExprInfo != NULL) {
- pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfCols, &pInfo->rowCellInfoOffset);
+ pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
}
pOperator->name = "TableAggregate";
@@ -4311,23 +4315,29 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
+ // Make sure the size of the SSDataBlock never exceeds 2MB.
+ int32_t TWOMB = 2 * 1024 * 1024;
+ if (numOfRows * pResBlock->info.rowSize > TWOMB) {
+ numOfRows = TWOMB / pResBlock->info.rowSize;
+ }
initResultSizeInfo(pOperator, numOfRows);
+
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
- pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
- pOperator->name = "ProjectOperator";
+ pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
+ pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
- pOperator->info = pInfo;
- pOperator->pExpr = pExprInfo;
- pOperator->numOfExprs = num;
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
+ pOperator->info = pInfo;
+ pOperator->pExpr = pExprInfo;
+ pOperator->numOfExprs = num;
+ pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
destroyProjectOperatorInfo, NULL, NULL, NULL);
- pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index f5122a26efcd5103a483174ed3576466e44e536c..8b6ab96b6df3471f88f91d8d277eb1c127112a29 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -300,10 +300,26 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags
- const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
+ const char* p = NULL;
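+ // a JSON tag is stored as a whole kvRow, so copy it with a leading type byte instead of fetching a single tag value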
+ if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
+ const uint8_t *tmp = mr.me.ctbEntry.pTags;
+ char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
+ if(data == NULL){
+ qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
+ return;
+ }
+ *data = TSDB_DATA_TYPE_JSON;
+ memcpy(data+1, tmp, kvRowLen(tmp));
+ p = data;
+ }else{
+ p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
+ }
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL));
}
+ if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
+ taosMemoryFree((void*)p);
+ }
}
}
@@ -1587,8 +1603,21 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR(str, mr.me.name);
colDataAppend(pDst, count, str, false);
} else { // it is a tag value
- const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
- colDataAppend(pDst, count, p, (p == NULL));
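+ // same as in addTagPseudoColumnData: JSON tags are copied as a type byte followed by the full kvRow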
+ if(pDst->info.type == TSDB_DATA_TYPE_JSON){
+ const uint8_t *tmp = mr.me.ctbEntry.pTags;
+ char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
+ if(data == NULL){
+ qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
+ return NULL;
+ }
+ *data = TSDB_DATA_TYPE_JSON;
+ memcpy(data+1, tmp, kvRowLen(tmp));
+ colDataAppend(pDst, count, data, false);
+ taosMemoryFree(data);
+ }else{
+ const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
+ colDataAppend(pDst, count, p, (p == NULL));
+ }
}
}
diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h
index a20d0e471828c7dc19c508325bed30608c693a13..b75b52f5b362b76e924ff9e310b91b0075666734 100644
--- a/source/libs/function/inc/builtinsimpl.h
+++ b/source/libs/function/inc/builtinsimpl.h
@@ -76,6 +76,11 @@ int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx);
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
+bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
+bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
+int32_t uniqueFunction(SqlFunctionCtx *pCtx);
+int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
+
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx);
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 5358930df000b3d758667ae6879354eefbe41a5d..85c69d028b113f856be55308988b027de748c6cc 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -493,6 +493,21 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS;
}
+static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+ if (1 != LIST_LENGTH(pFunc->pParameterList)) {
+ return TSDB_CODE_SUCCESS;
+ }
+
+ SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
+ if (QUERY_NODE_COLUMN != nodeType(pPara)) {
+ return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
+ "The parameters of UNIQUE can only be columns");
+ }
+
+ pFunc->node.resType = ((SExprNode*)pPara)->resType;
+ return TSDB_CODE_SUCCESS;
+}
+
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t paraLen = LIST_LENGTH(pFunc->pParameterList);
if (paraLen == 0 || paraLen > 2) {
@@ -878,14 +893,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = lastFinalize
},
{
- .name = "diff",
- .type = FUNCTION_TYPE_DIFF,
- .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
- .translateFunc = translateDiff,
- .getEnvFunc = getDiffFuncEnv,
- .initFunc = diffFunctionSetup,
- .processFunc = diffFunction,
- .finalizeFunc = functionFinalize
+ .name = "unique",
+ .type = FUNCTION_TYPE_UNIQUE,
+ .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
+ .translateFunc = translateUnique,
+ .getEnvFunc = getUniqueFuncEnv,
+ .initFunc = uniqueFunctionSetup,
+ .processFunc = uniqueFunction,
+ .finalizeFunc = uniqueFinalize
},
{
.name = "histogram",
@@ -907,6 +922,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = hllFunction,
.finalizeFunc = hllFinalize
},
+ {
+ .name = "diff",
+ .type = FUNCTION_TYPE_DIFF,
+ .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
+ .translateFunc = translateDiff,
+ .getEnvFunc = getDiffFuncEnv,
+ .initFunc = diffFunctionSetup,
+ .processFunc = diffFunction,
+ .finalizeFunc = functionFinalize
+ },
{
.name = "state_count",
.type = FUNCTION_TYPE_STATE_COUNT,
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index d54cc96611787a245b4e0c5b6d9030873f14ff38..79d4e4f22548bee86a07c4306d98ba99c1b44266 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -28,12 +28,15 @@
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
+#define UNIQUE_MAX_RESULT_SIZE (1024*1024*10)
+
#define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64-HLL_BUCKET_BITS)
#define HLL_BUCKETS (1<<HLL_BUCKET_BITS)
+#define DO_UPDATE_SUBSID_RES(ctx, ts) \
+ do { \
+ for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
+ SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
+ if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
+ __ctx->tag.i = (ts); \
+ __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
+ } \
+ __ctx->fpSet.process(__ctx); \
+ } \
+ } while (0)
+
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
@@ -748,50 +777,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
-#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
-#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
-
-#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
- do { \
- for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
- SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
- __ctx->fpSet.process(__ctx); \
- } \
- } while (0);
-
-#define DO_UPDATE_SUBSID_RES(ctx, ts) \
- do { \
- for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
- SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
- if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
- __ctx->tag.i = (ts); \
- __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
- } \
- __ctx->fpSet.process(__ctx); \
- } \
- } while (0)
-
-#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
- do { \
- if (((left) < (right)) ^ (sign)) { \
- (left) = (right); \
- DO_UPDATE_SUBSID_RES(ctx, _ts); \
- (num) += 1; \
- } \
- } while (0)
-
-#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
- do { \
- _t* d = (_t*)((_col)->pData); \
- for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
- if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
- continue; \
- } \
- TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
- UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
- } \
- } while (0)
-
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
@@ -1994,6 +1979,99 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
+bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
+ pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE;
+ return true;
+}
+
+bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
+ if (!functionSetup(pCtx, pResInfo)) {
+ return false;
+ }
+
+ SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+ pInfo->numOfPoints = 0;
+ pInfo->colType = pCtx->resDataInfo.type;
+ pInfo->colBytes = pCtx->resDataInfo.bytes;
+ if (pInfo->pHash != NULL) {
+ taosHashClear(pInfo->pHash);
+ } else {
+ pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
+ }
+ return true;
+}
+
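+// Add one value to the hash of seen values: unseen values are appended to pItems, duplicates keep the smaller timestamp.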
+static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
+ int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
+
+ SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
+ if (pHashItem == NULL) {
+ int32_t size = sizeof(SUniqueItem) + pInfo->colBytes;
+ SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size);
+ pItem->timestamp = ts;
+ memcpy(pItem->data, data, pInfo->colBytes);
+
+ taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*));
+ pInfo->numOfPoints++;
+ } else if (pHashItem->timestamp > ts) {
+ pHashItem->timestamp = ts;
+ }
+
+}
+
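+// Process one input block: feed every row into the unique hash; the distinct values are emitted later in uniqueFinalize.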
+int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
+ SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
+ SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
+
+ SInputColumnInfoData* pInput = &pCtx->input;
+ TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
+
+ SColumnInfoData* pInputCol = pInput->pData[0];
+ SColumnInfoData* pTsOutput = pCtx->pTsOutput;
+ SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
+
+ int32_t startOffset = pCtx->offset;
+ for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
+ char* data = colDataGetData(pInputCol, i);
+ doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
+
+ if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) {
+ taosHashCleanup(pInfo->pHash);
+ return 0;
+ }
+ }
+
+ //taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
+
+ //for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
+ // int32_t pos = startOffset + i;
+ // STailItem *pItem = pInfo->pItems[i];
+ // if (pItem->isNull) {
+ // colDataAppendNULL(pOutput, pos);
+ // } else {
+ // colDataAppend(pOutput, pos, pItem->data, false);
+ // }
+ //}
+
+ pResInfo->numOfRes = pInfo->numOfPoints;
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
+ SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
+ SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
+ int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
+ SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
+
+ for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
+ SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
+ colDataAppend(pCol, i, pItem->data, false);
+ //TODO: handle ts output
+ }
+
+ return pResInfo->numOfRes;
+}
+
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo);
return true;
@@ -2106,7 +2184,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo
default:
ASSERT(0);
}
- }
+}
int32_t diffFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index bc49f36afe121db0c676662c972f088a660f897c..0e8f530b0eb8d209f73cf349a4ca8dd590a2e304 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -1771,6 +1771,7 @@ static const char* jkSubplanId = "Id";
static const char* jkSubplanType = "SubplanType";
static const char* jkSubplanMsgType = "MsgType";
static const char* jkSubplanLevel = "Level";
+static const char* jkSubplanDbFName = "DbFName";
static const char* jkSubplanNodeAddr = "NodeAddr";
static const char* jkSubplanRootNode = "RootNode";
static const char* jkSubplanDataSink = "DataSink";
@@ -1788,6 +1789,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanLevel, pNode->level);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddStringToObject(pJson, jkSubplanDbFName, pNode->dbFName);
+ }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode);
}
@@ -1815,6 +1819,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanLevel, &pNode->level);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonGetStringValue(pJson, jkSubplanDbFName, pNode->dbFName);
+ }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode);
}
diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
index 476b3b278678a906e0b1c71240ff809b4f4d394e..9fb9d8e5514da34620acfb0e385d5e550c041660 100644
--- a/source/libs/nodes/src/nodesUtilFuncs.c
+++ b/source/libs/nodes/src/nodesUtilFuncs.c
@@ -1137,10 +1137,6 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) {
return false;
}
-bool nodesIsTimeorderQuery(const SNode* pQuery) { return false; }
-
-bool nodesIsTimelineQuery(const SNode* pQuery) { return false; }
-
typedef struct SCollectColumnsCxt {
int32_t errCode;
const char* pTableAlias;
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index cd1c91f84cbef0a21993e6db4e27acb492d087f2..dbb29699fc33970febce8844d12442ac5e2d7d54 100644
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -382,6 +382,35 @@ static bool isInternalPrimaryKey(const SColumnNode* pCol) {
return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME);
}
+static bool isTimeOrderQuery(SNode* pStmt) {
+ if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
+ return ((SSelectStmt*)pStmt)->isTimeOrderQuery;
+ } else {
+ return false;
+ }
+}
+
+static bool isPrimaryKeyImpl(STempTableNode* pTable, SNode* pExpr) {
+ if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
+ return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId);
+ } else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
+ SFunctionNode* pFunc = (SFunctionNode*)pExpr;
+ if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) {
+ return isPrimaryKeyImpl(pTable, nodesListGetNode(pFunc->pParameterList, 0));
+ } else if (FUNCTION_TYPE_WSTARTTS == pFunc->funcType || FUNCTION_TYPE_WENDTS == pFunc->funcType) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static bool isPrimaryKey(STempTableNode* pTable, SNode* pExpr) {
+ if (!isTimeOrderQuery(pTable->pSubquery)) {
+ return false;
+ }
+ return isPrimaryKeyImpl(pTable, pExpr);
+}
+
static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
bool found = false;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
@@ -404,8 +433,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
FOREACH(pNode, pProjectList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp(pCol->colName, pExpr->aliasName) ||
- ((QUERY_NODE_COLUMN == nodeType(pExpr) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId) &&
- isInternalPrimaryKey(pCol))) {
+ (isPrimaryKey((STempTableNode*)pTable, pNode) && isInternalPrimaryKey(pCol))) {
setColumnInfoByExpr(pTable, pExpr, pCol);
found = true;
break;
@@ -454,6 +482,9 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod
}
if (!found) {
if (isInternalPk) {
+ if (NULL != pCxt->pCurrStmt->pWindow) {
+ return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY);
+ }
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_INTERNAL_PK);
} else {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName);
@@ -781,7 +812,6 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
}
pCxt->pCurrStmt->hasAggFuncs = true;
- pCxt->pCurrStmt->isTimeOrderQuery = false;
if (isCountStar(pFunc)) {
pCxt->errCode = rewriteCountStar(pCxt, pFunc);
}
diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c
index 11884bc10dcc62706938d5a0a6b115b9374b71b4..fe21915b1ae100948ab2b485d799456aafbda639 100644
--- a/source/libs/parser/src/parUtil.c
+++ b/source/libs/parser/src/parUtil.c
@@ -167,6 +167,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_NOT_ALLOWED_FUNC:
return "Some functions are allowed only in the SELECT list of a query. "
"And, cannot be mixed with other non scalar functions or columns.";
+ case TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY:
+ return "Window query not supported, since the result of subquery not include valid timestamp column";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
@@ -365,8 +367,8 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) {
continue;
}
- // key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: LONG_BYTES
- tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + LONG_BYTES, 1);
+ // key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: DOUBLE_BYTES
+ tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES, 1);
if (!tagKV) {
retCode = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end;
@@ -411,13 +413,9 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
}
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE);
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
- *valueType =
- (item->valuedouble - (int64_t)(item->valuedouble) == 0) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_DOUBLE;
- if (*valueType == TSDB_DATA_TYPE_DOUBLE)
- *((double*)valueData) = item->valuedouble;
- else if (*valueType == TSDB_DATA_TYPE_BIGINT)
- *((int64_t*)valueData) = item->valueint;
- tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + LONG_BYTES);
+ *valueType = TSDB_DATA_TYPE_DOUBLE;
+ *((double*)valueData) = item->valuedouble;
+ tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES);
} else if (item->type == cJSON_True || item->type == cJSON_False) {
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE);
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
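The parUtil.c change above simplifies the JSON tag encoding: numeric values are always stored as doubles, and each tag cell is laid out as a var-string key, a one-byte type tag, and an 8-byte value. A minimal, self-contained sketch of that layout, assuming VARSTR_HEADER_SIZE is a 2-byte length prefix, CHAR_BYTES is 1 and DOUBLE_BYTES is 8 (stand-ins for the real TDengine constants, which are not defined here):

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    #define VARSTR_HEADER_SIZE 2  /* assumed: uint16_t length prefix */
    #define CHAR_BYTES         1  /* assumed: one byte for the type tag */
    #define DOUBLE_BYTES       8  /* assumed: sizeof(double) */
    #define TYPE_DOUBLE        7  /* placeholder for TSDB_DATA_TYPE_DOUBLE */

    int main(void) {
      const char *jsonKey = "k6";
      uint16_t    keyLen  = (uint16_t)strlen(jsonKey);
      size_t      cellLen = keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES;

      /* cell = [2-byte key length][key bytes][type byte][8-byte double] */
      char *tagKV = calloc(cellLen, 1);
      if (tagKV == NULL) return 1;
      memcpy(tagKV, &keyLen, VARSTR_HEADER_SIZE);
      memcpy(tagKV + VARSTR_HEADER_SIZE, jsonKey, keyLen);
      tagKV[VARSTR_HEADER_SIZE + keyLen] = TYPE_DOUBLE;  /* numbers are always doubles now */
      double v = 5.44;
      memcpy(tagKV + VARSTR_HEADER_SIZE + keyLen + CHAR_BYTES, &v, DOUBLE_BYTES);

      printf("cell size: %zu bytes\n", cellLen);
      free(tagKV);
      return 0;
    }

Storing every number as a double keeps the cell size fixed, which is why the buffer is now sized with DOUBLE_BYTES instead of LONG_BYTES.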
diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp
index ca72d8e8b6088df1f1b519b20bd0741663c42796..9f8d5b48029f3004736a6ed9b77d14108adcba8c 100644
--- a/source/libs/parser/test/parSelectTest.cpp
+++ b/source/libs/parser/test/parSelectTest.cpp
@@ -125,8 +125,6 @@ TEST_F(ParserSelectTest, nonstdFunc) {
useDb("root", "test");
run("SELECT DIFF(c1) FROM t1");
-
- // run("SELECT DIFF(c1) FROM t1 INTERVAL(10s)");
}
TEST_F(ParserSelectTest, nonstdFuncSemanticCheck) {
@@ -139,12 +137,13 @@ TEST_F(ParserSelectTest, nonstdFuncSemanticCheck) {
run("SELECT DIFF(c1), count(*) FROM t1", TSDB_CODE_PAR_NOT_ALLOWED_FUNC, PARSER_STAGE_TRANSLATE);
run("SELECT DIFF(c1), CSUM(c1) FROM t1", TSDB_CODE_PAR_NOT_ALLOWED_FUNC, PARSER_STAGE_TRANSLATE);
+
+ // run("SELECT DIFF(c1) FROM t1 INTERVAL(10s)");
}
-TEST_F(ParserSelectTest, clause) {
+TEST_F(ParserSelectTest, groupBy) {
useDb("root", "test");
- // GROUP BY clause
run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0");
run("SELECT COUNT(*), c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c2");
@@ -154,13 +153,19 @@ TEST_F(ParserSelectTest, clause) {
run("SELECT COUNT(*), c1, c2 + 10, c1 + c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c2, c1");
run("SELECT COUNT(*), c1 + 10, c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c1 + 10, c2");
+}
+
+TEST_F(ParserSelectTest, orderBy) {
+ useDb("root", "test");
- // order by clause
run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0 GROUP BY c2 order by cnt");
run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0 GROUP BY c2 order by 1");
+}
+
+TEST_F(ParserSelectTest, distinct) {
+ useDb("root", "test");
- // distinct clause
// run("SELECT distinct c1, c2 FROM t1 WHERE c1 > 0 order by c1");
// run("SELECT distinct c1 + 10, c2 FROM t1 WHERE c1 > 0 order by c1 + 10, c2");
@@ -194,6 +199,25 @@ TEST_F(ParserSelectTest, intervalSemanticCheck) {
PARSER_STAGE_TRANSLATE);
}
+TEST_F(ParserSelectTest, subquery) {
+ useDb("root", "test");
+
+ run("SELECT SUM(a) FROM (SELECT MAX(c1) a, ts FROM st1s1 INTERVAL(1m)) INTERVAL(1n)");
+
+ run("SELECT SUM(a) FROM (SELECT MAX(c1) a, _wstartts FROM st1s1 INTERVAL(1m)) INTERVAL(1n)");
+
+ run("SELECT SUM(a) FROM (SELECT MAX(c1) a, ts FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)");
+
+ run("SELECT SUM(a) FROM (SELECT MAX(c1) a, _wstartts FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)");
+}
+
+TEST_F(ParserSelectTest, subquerySemanticError) {
+ useDb("root", "test");
+
+ run("SELECT SUM(a) FROM (SELECT MAX(c1) a FROM st1s1 INTERVAL(1m)) INTERVAL(1n)", TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY,
+ PARSER_STAGE_TRANSLATE);
+}
+
TEST_F(ParserSelectTest, semanticError) {
useDb("root", "test");
diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c
index 7e3dbaf7d02595b04ea6a744660579e223ba29fc..49ed3ab48bfc96a5f081e28fd806457c807958a9 100644
--- a/source/libs/scalar/src/scalar.c
+++ b/source/libs/scalar/src/scalar.c
@@ -899,7 +899,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
}
int32_t code = 0;
- SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param};
+ SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};
// TODO: OPT performance
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c
index 19453bf7600128de3abd8a5f096ebf9c023b62ec..145ed69a775a2a42bc04664b012ba1efe3995bdb 100644
--- a/source/libs/scalar/src/sclvector.c
+++ b/source/libs/scalar/src/sclvector.c
@@ -922,7 +922,8 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
}
}
-char *getJsonValue(char *json, char *key){ //todo
+char *getJsonValue(char *json, char *key){ //todo
+ json++; // skip the leading type byte
int16_t cols = kvRowNCols(json);
for (int i = 0; i < cols; ++i) {
SColIdx *pColIdx = kvRowColIdxAt(json, i);
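The json++ above only makes sense together with callers that place a one-byte type tag (TSDB_DATA_TYPE_JSON) in front of the KV row, as the scalar tests below now do with taosMemoryRealloc and memmove. A small sketch of that producer-side step, assuming plain realloc and a caller-supplied type value (the real code uses the TDengine allocator and type enum):

    #include <stdint.h>
    #include <stdlib.h>
    #include <string.h>

    /* Grow buf by one byte and shift its payload right, so the result is
     * [type][original len bytes]. Returns the new buffer or NULL on failure. */
    char *prependTypeByte(char *buf, size_t len, uint8_t type) {
      char *tmp = realloc(buf, len + 1);
      if (tmp == NULL) return NULL;
      memmove(tmp + 1, tmp, len);  /* shift the existing payload */
      tmp[0] = (char)type;         /* write the type tag in front */
      return tmp;
    }

Consumers such as getJsonValue then step over that leading byte before reading the KV row.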
diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp
index 49a5f5b9a46a7bdc87c093969f9612dfa5ac1d81..627c3c438c4c4cdb93ec91276326d5ffc6f8c2cf 100644
--- a/source/libs/scalar/test/scalar/scalarTests.cpp
+++ b/source/libs/scalar/test/scalar/scalarTests.cpp
@@ -1035,7 +1035,7 @@ void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){
SNode *pLeft = NULL, *pRight = NULL;
scltMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, keyVar);
- scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, varDataLen(json), 1, json);
+ scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, kvRowLen(json), 1, json);
scltMakeOpNode(opNode, OP_TYPE_JSON_GET_VALUE, TSDB_DATA_TYPE_JSON, pLeft, pRight);
}
@@ -1088,18 +1088,17 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do
}else if(opType == OP_TYPE_ADD || opType == OP_TYPE_SUB || opType == OP_TYPE_MULTI || opType == OP_TYPE_DIV ||
opType == OP_TYPE_MOD || opType == OP_TYPE_MINUS){
- double tmp = *((double *)colDataGetData(column, 0));
- ASSERT_TRUE(tmp == exceptValue);
- printf("result:%lf\n", tmp);
+ printf("1result:%f,except:%f\n", *((double *)colDataGetData(column, 0)), exceptValue);
+ ASSERT_TRUE(abs(*((double *)colDataGetData(column, 0)) - exceptValue) < 1e-15);
}else if(opType == OP_TYPE_BIT_AND || opType == OP_TYPE_BIT_OR){
+ printf("2result:%ld,except:%f\n", *((int64_t *)colDataGetData(column, 0)), exceptValue);
ASSERT_EQ(*((int64_t *)colDataGetData(column, 0)), exceptValue);
- printf("result:%ld\n", *((int64_t *)colDataGetData(column, 0)));
}else if(opType == OP_TYPE_GREATER_THAN || opType == OP_TYPE_GREATER_EQUAL || opType == OP_TYPE_LOWER_THAN ||
opType == OP_TYPE_LOWER_EQUAL || opType == OP_TYPE_EQUAL || opType == OP_TYPE_NOT_EQUAL ||
opType == OP_TYPE_IS_NULL || opType == OP_TYPE_IS_NOT_NULL || opType == OP_TYPE_IS_TRUE ||
opType == OP_TYPE_LIKE || opType == OP_TYPE_NOT_LIKE || opType == OP_TYPE_MATCH || opType == OP_TYPE_NMATCH){
+ printf("3result:%d,except:%f\n", *((bool *)colDataGetData(column, 0)), exceptValue);
ASSERT_EQ(*((bool *)colDataGetData(column, 0)), exceptValue);
- printf("result:%d\n", *((bool *)colDataGetData(column, 0)));
}
taosArrayDestroyEx(blockList, scltFreeDataBlock);
@@ -1114,6 +1113,13 @@ TEST(columnTest, json_column_arith_op) {
tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
+ char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
+ if(tmp == NULL){
+ ASSERT_TRUE(0);
+ }
+ memmove(tmp+1, tmp, kvRowLen(tmp));
+ *tmp = TSDB_DATA_TYPE_JSON;
+ row = tmp;
const int32_t len = 8;
EOperatorType op[len] = {OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV,
@@ -1166,6 +1172,9 @@ TEST(columnTest, json_column_arith_op) {
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]);
}
+
+ tdDestroyKVRowBuilder(&kvRowBuilder);
+ taosMemoryFree(row);
}
void *prepareNchar(char* rightData){
@@ -1186,6 +1195,13 @@ TEST(columnTest, json_column_logic_op) {
tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
+ char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
+ if(tmp == NULL){
+ ASSERT_TRUE(0);
+ }
+ memmove(tmp+1, tmp, kvRowLen(tmp));
+ *tmp = TSDB_DATA_TYPE_JSON;
+ row = tmp;
const int32_t len = 9;
const int32_t len1 = 4;
@@ -1223,7 +1239,7 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json null---------------------\n");
key = "k3";
- double eRes2[len+len1] = {DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, true, false, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX};
+ bool eRes2[len+len1] = {false, false, false, false, false, false, true, false, false, false, false, false, false};
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes2[i], op[i]);
}
@@ -1262,7 +1278,7 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json double---------------------\n");
key = "k6";
- bool eRes5[len+len1] = {true, false, false, false, false, true, false, true, true, false, false, false, true};
+ bool eRes5[len+len1] = {true, false, false, false, false, true, false, true, true, false, true, false, true};
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]);
}
@@ -1275,7 +1291,7 @@ TEST(columnTest, json_column_logic_op) {
printf("---------------------json not exist--------------------\n");
key = "k10";
- double eRes10[len+len1] = {DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, true, false, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX};
+ double eRes10[len+len1] = {false, false, false, false, false, false, true, false, false, false, false, false, false};
for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes10[i], op[i]);
}
@@ -1284,6 +1300,9 @@ TEST(columnTest, json_column_logic_op) {
makeCalculate(row, key, TSDB_DATA_TYPE_NCHAR, rightData, eRes10[i], op[i]);
taosMemoryFree(rightData);
}
+
+ tdDestroyKVRowBuilder(&kvRowBuilder);
+ taosMemoryFree(row);
}
TEST(columnTest, smallint_value_add_int_column) {
diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h
index 36f22db05f3921be2a71cae15c9f33c8098306c6..768e1c1cf1b55486dea6c98dae7e6df9ed2f891a 100644
--- a/source/libs/sync/inc/syncInt.h
+++ b/source/libs/sync/inc/syncInt.h
@@ -20,135 +20,41 @@
extern "C" {
#endif
-#include
-#include
-#include
-#include "cJSON.h"
#include "sync.h"
#include "syncTools.h"
-#include "taosdef.h"
-#include "tglobal.h"
#include "tlog.h"
#include "ttimer.h"
-#define sFatal(...) \
- { \
- if (sDebugFlag & DEBUG_FATAL) { \
- taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
- } \
- }
-#define sError(...) \
- { \
- if (sDebugFlag & DEBUG_ERROR) { \
- taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
- } \
- }
-#define sWarn(...) \
- { \
- if (sDebugFlag & DEBUG_WARN) { \
- taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
- } \
- }
-#define sInfo(...) \
- { \
- if (sDebugFlag & DEBUG_INFO) { \
- taosPrintLog("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
- } \
- }
-#define sDebug(...) \
- { \
- if (sDebugFlag & DEBUG_DEBUG) { \
- taosPrintLog("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
- } \
- }
-#define sTrace(...) \
- { \
- if (sDebugFlag & DEBUG_TRACE) { \
- taosPrintLog("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
- } \
- }
-
-#define sFatalLong(...) \
- { \
- if (sDebugFlag & DEBUG_FATAL) { \
- taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
- } \
- }
-#define sErrorLong(...) \
- { \
- if (sDebugFlag & DEBUG_ERROR) { \
- taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
- } \
- }
-#define sWarnLong(...) \
- { \
- if (sDebugFlag & DEBUG_WARN) { \
- taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
- } \
- }
-#define sInfoLong(...) \
- { \
- if (sDebugFlag & DEBUG_INFO) { \
- taosPrintLongString("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
- } \
- }
-#define sDebugLong(...) \
- { \
- if (sDebugFlag & DEBUG_DEBUG) { \
- taosPrintLongString("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
- } \
- }
-#define sTraceLong(...) \
- { \
- if (sDebugFlag & DEBUG_TRACE) { \
- taosPrintLongString("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
- } \
- }
-
-struct SyncTimeout;
-typedef struct SyncTimeout SyncTimeout;
-
-struct SyncClientRequest;
-typedef struct SyncClientRequest SyncClientRequest;
-
-struct SyncPing;
-typedef struct SyncPing SyncPing;
-
-struct SyncPingReply;
-typedef struct SyncPingReply SyncPingReply;
-
-struct SyncRequestVote;
-typedef struct SyncRequestVote SyncRequestVote;
-
-struct SyncRequestVoteReply;
-typedef struct SyncRequestVoteReply SyncRequestVoteReply;
-
-struct SyncAppendEntries;
-typedef struct SyncAppendEntries SyncAppendEntries;
-
-struct SyncAppendEntriesReply;
+// clang-format off
+#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
+#define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
+#define sWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
+#define sInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
+#define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
+#define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
+#define sFatalLong(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
+#define sErrorLong(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
+#define sWarnLong(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
+#define sInfoLong(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
+#define sDebugLong(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
+#define sTraceLong(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
+// clang-format on
+
+typedef struct SyncTimeout SyncTimeout;
+typedef struct SyncClientRequest SyncClientRequest;
+typedef struct SyncPing SyncPing;
+typedef struct SyncPingReply SyncPingReply;
+typedef struct SyncRequestVote SyncRequestVote;
+typedef struct SyncRequestVoteReply SyncRequestVoteReply;
+typedef struct SyncAppendEntries SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
-
-struct SSyncEnv;
-typedef struct SSyncEnv SSyncEnv;
-
-struct SRaftStore;
-typedef struct SRaftStore SRaftStore;
-
-struct SVotesGranted;
-typedef struct SVotesGranted SVotesGranted;
-
-struct SVotesRespond;
-typedef struct SVotesRespond SVotesRespond;
-
-struct SSyncIndexMgr;
-typedef struct SSyncIndexMgr SSyncIndexMgr;
-
-struct SRaftCfg;
-typedef struct SRaftCfg SRaftCfg;
-
-struct SSyncRespMgr;
-typedef struct SSyncRespMgr SSyncRespMgr;
+typedef struct SSyncEnv SSyncEnv;
+typedef struct SRaftStore SRaftStore;
+typedef struct SVotesGranted SVotesGranted;
+typedef struct SVotesRespond SVotesRespond;
+typedef struct SSyncIndexMgr SSyncIndexMgr;
+typedef struct SRaftCfg SRaftCfg;
+typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncNode {
// init by SSyncInfo
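The logging macros above were also rewritten from bare { ... } blocks to the do { ... } while (0) form, which behaves as a single statement and therefore composes safely with if/else and a trailing semicolon. A minimal sketch with stand-in LOG macros (not the real sError/sDebug definitions):

    #include <stdio.h>

    #define LOG_BLOCK(msg)   { printf("%s\n", msg); }               /* old style */
    #define LOG_DOWHILE(msg) do { printf("%s\n", msg); } while (0)  /* new style */

    void check(int ok) {
      /* With LOG_BLOCK(msg); the semicolon after the block ends the if
       * statement, so the else below would be a syntax error. The
       * do/while(0) form expands to one statement and the else binds
       * as expected. */
      if (!ok)
        LOG_DOWHILE("operation failed");
      else
        LOG_DOWHILE("operation succeeded");
    }

Keeping each macro on one line is also why the block is wrapped in the clang-format off/on markers.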
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 56389de88a4137fe460e5e4c4e484d3f6ce7c304..d9ff60bbe22b573db34331e5aabbd04b06ff5616 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
- pMsg->info.noResp = 1;
// htonl
syncUtilMsgHtoN(pMsg->pCont);
+ pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg);
} else {
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
@@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
- pMsg->info.noResp = 1;
// htonl
syncUtilMsgHtoN(pMsg->pCont);
+ pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg);
} else {
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp
index b8a9bec1087b41e60f94b11ea2ec1b0a651a98b9..630d96054bef043134b76233683551abe682bd6c 100644
--- a/source/libs/sync/test/syncIOSendMsgTest.cpp
+++ b/source/libs/sync/test/syncIOSendMsgTest.cpp
@@ -97,11 +97,12 @@ int main(int argc, char** argv) {
for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg =
syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest");
- SRpcMsg rpcMsg;
+ SRpcMsg rpcMsg = {0};
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
+ rpcMsg.info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, &rpcMsg);
taosMsleep(1000);
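The test fix above zero-initializes the message before use: with SRpcMsg rpcMsg = {0}; every field, including info.noResp, starts from a defined value instead of stack garbage. A tiny sketch of the pattern with stand-in structs (not the real SRpcMsg layout):

    #include <stdint.h>
    #include <stdio.h>

    typedef struct { int8_t noResp; } MsgInfo;                           /* stand-in */
    typedef struct { void *pCont; int32_t contLen; MsgInfo info; } Msg;  /* stand-in */

    int main(void) {
      Msg msg = {0};         /* aggregate zero-init: all members are 0/NULL */
      msg.info.noResp = 1;   /* then set only the flags we care about */
      printf("noResp=%d contLen=%d\n", msg.info.noResp, msg.contLen);
      return 0;
    }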
diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp
index fee3447884c08f9208be98615318609e344c6cd1..5a3b45cca5aef3a709e5811104b0970a838a52bd 100644
--- a/source/libs/tdb/test/tdbTest.cpp
+++ b/source/libs/tdb/test/tdbTest.cpp
@@ -4,6 +4,7 @@
#include "os.h"
#include "tdb.h"
+#include <shared_mutex>
#include
#include
#include
@@ -118,7 +119,7 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
return cret;
}
-TEST(tdb_test, simple_insert1) {
+TEST(tdb_test, DISABLED_simple_insert1) {
int ret;
TDB *pEnv;
TTB *pDb;
@@ -238,7 +239,7 @@ TEST(tdb_test, simple_insert1) {
GTEST_ASSERT_EQ(ret, 0);
}
-TEST(tdb_test, simple_insert2) {
+TEST(tdb_test, DISABLED_simple_insert2) {
int ret;
TDB *pEnv;
TTB *pDb;
@@ -325,7 +326,7 @@ TEST(tdb_test, simple_insert2) {
GTEST_ASSERT_EQ(ret, 0);
}
-TEST(tdb_test, simple_delete1) {
+TEST(tdb_test, DISABLED_simple_delete1) {
int ret;
TTB *pDb;
char key[128];
@@ -420,7 +421,7 @@ TEST(tdb_test, simple_delete1) {
tdbClose(pEnv);
}
-TEST(tdb_test, simple_upsert1) {
+TEST(tdb_test, DISABLED_simple_upsert1) {
int ret;
TDB *pEnv;
TTB *pDb;
@@ -485,12 +486,12 @@ TEST(tdb_test, simple_upsert1) {
tdbClose(pEnv);
}
-TEST(tdb_test, multi_thread_query) {
+TEST(tdb_test, DISABLED_multi_thread_query) {
int ret;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
- int nData = 20000;
+ int nData = 100000;
TXN txn;
taosRemoveDir("tdb");
@@ -597,4 +598,132 @@ TEST(tdb_test, multi_thread_query) {
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
+}
+
+TEST(tdb_test, multi_thread1) {
+#if 0
+ int ret;
+ TDB *pDb;
+ TTB *pTb;
+ tdb_cmpr_fn_t compFunc;
+ int nData = 10000000;
+ TXN txn;
+
+ std::shared_timed_mutex mutex;
+
+ taosRemoveDir("tdb");
+
+ // Open Env
+ ret = tdbOpen("tdb", 512, 1, &pDb);
+ GTEST_ASSERT_EQ(ret, 0);
+
+ ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb);
+ GTEST_ASSERT_EQ(ret, 0);
+
+ auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) {
+ TXN txn = {0};
+ char key[128];
+ char val[128];
+ SPoolMem *pPool = openPool();
+
+ txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
+ txn.txnId = -1;
+ txn.xMalloc = poolMalloc;
+ txn.xFree = poolFree;
+ txn.xArg = pPool;
+ tdbBegin(pDb, &txn);
+ for (int iData = 1; iData <= nData; iData++) {
+ sprintf(key, "key%d", iData);
+ sprintf(val, "value%d", iData);
+ {
+ std::lock_guard wmutex(*mu);
+
+ int ret = tdbTbInsert(pTb, key, strlen(key), val, strlen(val), &txn);
+
+ GTEST_ASSERT_EQ(ret, 0);
+ }
+
+ if (pPool->size > 1024 * 1024) {
+ tdbCommit(pDb, &txn);
+
+ clearPool(pPool);
+ tdbBegin(pDb, &txn);
+ }
+ }
+
+ tdbCommit(pDb, &txn);
+ closePool(pPool);
+
+ *stop = 1;
+ };
+
+ auto query = [](TTB *pTb, int *stop, std::shared_timed_mutex *mu) {
+ TBC *pDBC;
+ void *pKey = NULL;
+ void *pVal = NULL;
+ int vLen, kLen;
+ int ret;
+ TXN txn;
+
+ SPoolMem *pPool = openPool();
+ txn.flags = 0;
+ txn.txnId = 0;
+ txn.xMalloc = poolMalloc;
+ txn.xFree = poolFree;
+ txn.xArg = pPool;
+
+ for (;;) {
+ if (*stop) break;
+
+ clearPool(pPool);
+ int count = 0;
+ {
+ std::shared_lock rMutex(*mu);
+
+ ret = tdbTbcOpen(pTb, &pDBC, &txn);
+ GTEST_ASSERT_EQ(ret, 0);
+
+ tdbTbcMoveToFirst(pDBC);
+
+ for (;;) {
+ ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
+ if (ret < 0) break;
+ count++;
+ }
+
+ std::cout << count << std::endl;
+
+ tdbTbcClose(pDBC);
+ }
+
+ usleep(500000);
+ }
+
+ closePool(pPool);
+ tdbFree(pKey);
+ tdbFree(pVal);
+ };
+
+ std::vector threads;
+ int nThreads = 10;
+ int stop = 0;
+ for (int i = 0; i < nThreads; i++) {
+ if (i == 0) {
+ threads.push_back(std::thread(insert, pDb, pTb, nData, &stop, &mutex));
+ } else {
+ threads.push_back(std::thread(query, pTb, &stop, &mutex));
+ }
+ }
+
+ for (auto &th : threads) {
+ th.join();
+ }
+
+ // Close a database
+ tdbTbClose(pTb);
+
+ // Close Env
+ ret = tdbClose(pDb);
+ GTEST_ASSERT_EQ(ret, 0);
+#endif
}
\ No newline at end of file
diff --git a/tests/script/tsim/stable/add_column.sim b/tests/script/tsim/stable/add_column.sim
new file mode 100644
index 0000000000000000000000000000000000000000..acacc13524f0db3723c8036339bde4a3476208fd
--- /dev/null
+++ b/tests/script/tsim/stable/add_column.sim
@@ -0,0 +1,107 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/exec.sh -n dnode1 -s start
+sql connect
+
+print ========== prepare stb and ctb
+sql create database db vgroups 1
+sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
+sql create table db.ctb using db.stb tags(1, 2, "3")
+sql insert into db.ctb values(now, 1, "2")
+
+sql show db.stables
+if $rows != 1 then
+ return -1
+endi
+if $data[0][0] != stb then
+ return -1
+endi
+if $data[0][1] != db then
+ return -1
+endi
+if $data[0][3] != 3 then
+ return -1
+endi
+if $data[0][4] != 3 then
+ return -1
+endi
+if $data[0][6] != abd then
+ return -1
+endi
+
+sql show db.tables
+if $rows != 1 then
+ return -1
+endi
+if $data[0][0] != ctb then
+ return -1
+endi
+if $data[0][1] != db then
+ return -1
+endi
+if $data[0][3] != 3 then
+ return -1
+endi
+if $data[0][4] != stb then
+ return -1
+endi
+if $data[0][6] != 2 then
+ return -1
+endi
+if $data[0][9] != CHILD_TABLE then
+ return -1
+endi
+
+sql select * from db.stb
+if $rows != 1 then
+ return -1
+endi
+if $data[0][1] != 1 then
+ return -1
+endi
+if $data[0][2] != 2 then
+ return -1
+endi
+if $data[0][3] != 1 then
+ return -1
+endi
+
+print ========== add column
+sql alter table db.stb add column c3 int
+sql alter table db.stb add column c4 bigint
+sql alter table db.stb add column c5 binary(12)
+
+sql show db.stables
+if $data[0][3] != 6 then
+ return -1
+endi
+
+sql show db.tables
+if $data[0][3] != 6 then
+ return -1
+endi
+
+sql select * from db.stb
+print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
+if $rows != 1 then
+ return -1
+endi
+if $data[0][1] != 1 then
+ return -1
+endi
+if $data[0][2] != 2 then
+ return -1
+endi
+if $data[0][3] != NULL then
+ return -1
+endi
+if $data[0][4] != NULL then
+ return -1
+endi
+if $data[0][5] != NULL then
+ return -1
+endi
+if $data[0][6] != 1 then
+ return -1
+endi
+
diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim
new file mode 100644
index 0000000000000000000000000000000000000000..e32abe4b7ff8850f9818113bed5f006c2182392e
--- /dev/null
+++ b/tests/script/tsim/testsuit.sim
@@ -0,0 +1,79 @@
+#run user/pass_alter.sim
+#run user/basic1.sim
+#run user/privilege2.sim
+#run user/user_len.sim
+#run user/privilege1.sim
+#run user/pass_len.sim
+#run tstream/basic1.sim
+#run tstream/basic0.sim
+#run table/basic1.sim
+#run trans/create_db.sim
+#run stable/alter1.sim
+#run stable/vnode3.sim
+#run stable/metrics.sim
+#run stable/show.sim
+#run stable/values.sim
+#run stable/dnode3.sim
+#run stable/refcount.sim
+#run stable/disk.sim
+#run db/basic1.sim
+#run db/basic3.sim
+#run db/basic7.sim
+#run db/basic6.sim
+#run db/create_all_options.sim
+#run db/basic2.sim
+#run db/error1.sim
+#run db/taosdlog.sim
+#run db/alter_option.sim
+#run mnode/basic1.sim
+#run parser/fourArithmetic-basic.sim
+#run parser/groupby-basic.sim
+#run snode/basic1.sim
+#run query/time_process.sim
+#run query/stddev.sim
+#run query/interval-offset.sim
+#run query/charScalarFunction.sim
+#run query/complex_select.sim
+#run query/explain.sim
+#run query/crash_sql.sim
+#run query/diff.sim
+#run query/complex_limit.sim
+#run query/complex_having.sim
+#run query/udf.sim
+#run query/complex_group.sim
+#run query/interval.sim
+#run query/session.sim
+
+print ========> deadlock: failed when 2 rows in outputCapacity
+run query/scalarFunction.sim
+run query/scalarNull.sim
+run query/complex_where.sim
+run tmq/basic1.sim
+run tmq/basic4.sim
+run tmq/basic1Of2Cons.sim
+run tmq/prepareBasicEnv-1vgrp.sim
+run tmq/topic.sim
+run tmq/basic4Of2Cons.sim
+run tmq/prepareBasicEnv-4vgrp.sim
+run tmq/basic3.sim
+run tmq/basic2Of2Cons.sim
+run tmq/basic2.sim
+run tmq/basic3Of2Cons.sim
+run tmq/basic2Of2ConsOverlap.sim
+run tmq/clearConsume.sim
+run qnode/basic1.sim
+run dnode/basic1.sim
+run show/basic.sim
+run insert/basic1.sim
+run insert/basic0.sim
+run insert/backquote.sim
+run insert/null.sim
+run sync/oneReplica1VgElectWithInsert.sim
+run sync/threeReplica1VgElect.sim
+run sync/oneReplica1VgElect.sim
+run sync/insertDataByRunBack.sim
+run sync/threeReplica1VgElectWihtInsert.sim
+run sma/tsmaCreateInsertData.sim
+run sma/rsmaCreateInsertQuery.sim
+run valgrind/checkError.sim
+run bnode/basic1.sim
diff --git a/tests/system-test/7-tmq/subscribeStb.py b/tests/system-test/7-tmq/subscribeStb.py
index ec412920b479ed14251a58176985d0928f2cc2ee..6fcc2d5e5fab3bff2bb6ff295dda242a43f52b98 100644
--- a/tests/system-test/7-tmq/subscribeStb.py
+++ b/tests/system-test/7-tmq/subscribeStb.py
@@ -1079,6 +1079,291 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 10 end ...... ")
+ def tmqCase11(self, cfgPath, buildPath):
+ tdLog.printNoPrefix("======== test case 11: ")
+
+ self.initConsumerTable()
+
+ # create and start thread
+ parameterDict = {'cfg': '', \
+ 'actionType': 0, \
+ 'dbName': 'db11', \
+ 'dropFlag': 1, \
+ 'vgroups': 4, \
+ 'replica': 1, \
+ 'stbName': 'stb1', \
+ 'ctbNum': 10, \
+ 'rowsPerTbl': 10000, \
+ 'batchNum': 100, \
+ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
+ parameterDict['cfg'] = cfgPath
+
+ self.create_database(tdSql, parameterDict["dbName"])
+ self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
+ self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
+ self.insert_data(tdSql,\
+ parameterDict["dbName"],\
+ parameterDict["stbName"],\
+ parameterDict["ctbNum"],\
+ parameterDict["rowsPerTbl"],\
+ parameterDict["batchNum"])
+
+ tdLog.info("create topics from stb1")
+ topicFromStb1 = 'topic_stb1'
+
+ tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
+ consumerId = 0
+ expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
+ topicList = topicFromStb1
+ ifcheckdata = 0
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:false,\
+ auto.commit.interval.ms:6000,\
+ auto.offset.reset:none'
+ self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("start consume processor")
+ pollDelay = 5
+ showMsg = 1
+ showRow = 1
+ self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
+
+ tdLog.info("start to check consume result")
+ expectRows = 1
+ resultList = self.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ if totalConsumeRows != 0:
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
+ tdLog.exit("tmq consume rows error!")
+
+ self.initConsumerInfoTable()
+ consumerId = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:false,\
+ auto.commit.interval.ms:6000,\
+ auto.offset.reset:none'
+ self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("again start consume processor")
+ self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
+
+ tdLog.info("again check consume result")
+ expectRows = 2
+ resultList = self.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ if totalConsumeRows != 0:
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
+ tdLog.exit("tmq consume rows error!")
+
+ tdSql.query("drop topic %s"%topicFromStb1)
+
+ tdLog.printNoPrefix("======== test case 11 end ...... ")
+
+ def tmqCase12(self, cfgPath, buildPath):
+ tdLog.printNoPrefix("======== test case 12: ")
+
+ self.initConsumerTable()
+
+ # create and start thread
+ parameterDict = {'cfg': '', \
+ 'actionType': 0, \
+ 'dbName': 'db12', \
+ 'dropFlag': 1, \
+ 'vgroups': 4, \
+ 'replica': 1, \
+ 'stbName': 'stb1', \
+ 'ctbNum': 10, \
+ 'rowsPerTbl': 10000, \
+ 'batchNum': 100, \
+ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
+ parameterDict['cfg'] = cfgPath
+
+ self.create_database(tdSql, parameterDict["dbName"])
+ self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
+ self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
+ self.insert_data(tdSql,\
+ parameterDict["dbName"],\
+ parameterDict["stbName"],\
+ parameterDict["ctbNum"],\
+ parameterDict["rowsPerTbl"],\
+ parameterDict["batchNum"])
+
+ tdLog.info("create topics from stb1")
+ topicFromStb1 = 'topic_stb1'
+
+ tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
+ consumerId = 0
+ expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
+ topicList = topicFromStb1
+ ifcheckdata = 0
+ ifManualCommit = 0
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:false,\
+ auto.commit.interval.ms:6000,\
+ auto.offset.reset:earliest'
+ self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("start consume processor")
+ pollDelay = 5
+ showMsg = 1
+ showRow = 1
+ self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
+
+ tdLog.info("start to check consume result")
+ expectRows = 1
+ resultList = self.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ if totalConsumeRows != expectrowcnt/4:
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
+ tdLog.exit("tmq consume rows error!")
+
+ self.initConsumerInfoTable()
+ consumerId = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:false,\
+ auto.commit.interval.ms:6000,\
+ auto.offset.reset:none'
+ self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("again start consume processor")
+ self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
+
+ tdLog.info("again check consume result")
+ expectRows = 2
+ resultList = self.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ if totalConsumeRows != expectrowcnt/4:
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
+ tdLog.exit("tmq consume rows error!")
+
+ tdSql.query("drop topic %s"%topicFromStb1)
+
+ tdLog.printNoPrefix("======== test case 12 end ...... ")
+
+ def tmqCase13(self, cfgPath, buildPath):
+ tdLog.printNoPrefix("======== test case 13: ")
+
+ self.initConsumerTable()
+
+ # create and start thread
+ parameterDict = {'cfg': '', \
+ 'actionType': 0, \
+ 'dbName': 'db13', \
+ 'dropFlag': 1, \
+ 'vgroups': 4, \
+ 'replica': 1, \
+ 'stbName': 'stb1', \
+ 'ctbNum': 10, \
+ 'rowsPerTbl': 10000, \
+ 'batchNum': 100, \
+ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000
+ parameterDict['cfg'] = cfgPath
+
+ self.create_database(tdSql, parameterDict["dbName"])
+ self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
+ self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
+ self.insert_data(tdSql,\
+ parameterDict["dbName"],\
+ parameterDict["stbName"],\
+ parameterDict["ctbNum"],\
+ parameterDict["rowsPerTbl"],\
+ parameterDict["batchNum"])
+
+ tdLog.info("create topics from stb1")
+ topicFromStb1 = 'topic_stb1'
+
+ tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
+ consumerId = 0
+ expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
+ topicList = topicFromStb1
+ ifcheckdata = 0
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:false,\
+ auto.commit.interval.ms:6000,\
+ auto.offset.reset:earliest'
+ self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("start consume processor")
+ pollDelay = 5
+ showMsg = 1
+ showRow = 1
+ self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
+
+ tdLog.info("start to check consume result")
+ expectRows = 1
+ resultList = self.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ if totalConsumeRows != expectrowcnt/4:
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
+ tdLog.exit("tmq consume rows error!")
+
+ self.initConsumerInfoTable()
+ consumerId = 1
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:false,\
+ auto.commit.interval.ms:6000,\
+ auto.offset.reset:none'
+ self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("again start consume processor")
+ self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
+
+ tdLog.info("again check consume result")
+ expectRows = 2
+ resultList = self.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ if totalConsumeRows != expectrowcnt*(1/2+1/4):
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*(1/2+1/4)))
+ tdLog.exit("tmq consume rows error!")
+
+ self.initConsumerInfoTable()
+ consumerId = 2
+ ifManualCommit = 1
+ keyList = 'group.id:cgrp1,\
+ enable.auto.commit:false,\
+ auto.commit.interval.ms:6000,\
+ auto.offset.reset:none'
+ self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+ tdLog.info("again start consume processor")
+ self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
+
+ tdLog.info("again check consume result")
+ expectRows = 3
+ resultList = self.selectConsumeResult(expectRows)
+ totalConsumeRows = 0
+ for i in range(expectRows):
+ totalConsumeRows += resultList[i]
+
+ if totalConsumeRows != expectrowcnt:
+ tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
+ tdLog.exit("tmq consume rows error!")
+
+ tdSql.query("drop topic %s"%topicFromStb1)
+
+ tdLog.printNoPrefix("======== test case 13 end ...... ")
+
def run(self):
tdSql.prepare()
@@ -1099,8 +1384,10 @@ class TDTestCase:
self.tmqCase7(cfgPath, buildPath)
self.tmqCase8(cfgPath, buildPath)
self.tmqCase9(cfgPath, buildPath)
- self.tmqCase10(cfgPath, buildPath)
-
+ self.tmqCase10(cfgPath, buildPath)
+ self.tmqCase11(cfgPath, buildPath)
+ self.tmqCase12(cfgPath, buildPath)
+ self.tmqCase13(cfgPath, buildPath)
def stop(self):
tdSql.close()
diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c
index 21474c316f48a5cb8b06b9a3e8a24916133ce06b..d6c295a2229a8c471ba119ddde87d79f5fe4370c 100644
--- a/tools/shell/src/shellEngine.c
+++ b/tools/shell/src/shellEngine.c
@@ -315,6 +315,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
+ case TSDB_DATA_TYPE_JSON:
memcpy(buf, val, length);
buf[length] = 0;
taosFprintfFile(pFile, "\'%s\'", buf);
@@ -384,19 +385,25 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) {
while (pos < length) {
TdWchar wc;
int32_t bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
- if (bytes == 0) {
+ if (bytes <= 0) {
break;
}
- pos += bytes;
- if (pos > length) {
+
+ if (pos + bytes > length) {
break;
}
-
+ int w = 0;
#ifdef WINDOWS
- int32_t w = bytes;
+ w = bytes;
#else
- int32_t w = taosWcharWidth(wc);
+ if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){
+ w = bytes;
+ }else{
+ w = taosWcharWidth(wc);
+ }
#endif
+ pos += bytes;
+
if (w <= 0) {
continue;
}
@@ -496,6 +503,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
+ case TSDB_DATA_TYPE_JSON:
shellPrintNChar(val, length, width);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
@@ -604,7 +612,6 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
case TSDB_DATA_TYPE_DOUBLE:
return TMAX(25, width);
- case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY:
if (field->bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width);
@@ -612,7 +619,8 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
return TMAX(field->bytes, width);
}
- case TSDB_DATA_TYPE_NCHAR: {
+ case TSDB_DATA_TYPE_NCHAR:
+ case TSDB_DATA_TYPE_JSON: {
int16_t bytes = field->bytes * TSDB_NCHAR_SIZE;
if (bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width);