未验证 提交 37f7c0f2 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #12749 from taosdata/3.0

3.0
...@@ -351,9 +351,6 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp); ...@@ -351,9 +351,6 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp); bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsRegularOp(const SOperatorNode* pOp); bool nodesIsRegularOp(const SOperatorNode* pOp);
bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
void* nodesGetValueFromNode(SValueNode* pNode); void* nodesGetValueFromNode(SValueNode* pNode);
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value); int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value);
char* nodesGetStrValueFromNode(SValueNode* pNode); char* nodesGetStrValueFromNode(SValueNode* pNode);
......
...@@ -20,17 +20,23 @@ ...@@ -20,17 +20,23 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "tdef.h" #include "tdef.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
typedef int64_t SyncIndex; typedef int64_t SyncIndex;
typedef uint64_t SyncTerm; typedef uint64_t SyncTerm;
typedef struct SSyncNode SSyncNode;
typedef struct SSyncBuffer SSyncBuffer;
typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;
typedef enum { typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 100, TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101, TAOS_SYNC_STATE_CANDIDATE = 101,
...@@ -38,6 +44,17 @@ typedef enum { ...@@ -38,6 +44,17 @@ typedef enum {
TAOS_SYNC_STATE_ERROR = 103, TAOS_SYNC_STATE_ERROR = 103,
} ESyncState; } ESyncState;
typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
} ESyncProposeCode;
typedef enum {
TAOS_SYNC_FSM_CB_SUCCESS = 0,
TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
} ESyncFsmCbCode;
typedef struct SNodeInfo { typedef struct SNodeInfo {
uint16_t nodePort; uint16_t nodePort;
char nodeFqdn[TSDB_FQDN_LEN]; char nodeFqdn[TSDB_FQDN_LEN];
...@@ -55,11 +72,6 @@ typedef struct SSnapshot { ...@@ -55,11 +72,6 @@ typedef struct SSnapshot {
SyncTerm lastApplyTerm; SyncTerm lastApplyTerm;
} SSnapshot; } SSnapshot;
typedef enum {
TAOS_SYNC_FSM_CB_SUCCESS = 0,
TAOS_SYNC_FSM_CB_OTHER_ERROR,
} ESyncFsmCbCode;
typedef struct SFsmCbMeta { typedef struct SFsmCbMeta {
SyncIndex index; SyncIndex index;
bool isWeak; bool isWeak;
...@@ -68,27 +80,15 @@ typedef struct SFsmCbMeta { ...@@ -68,27 +80,15 @@ typedef struct SFsmCbMeta {
uint64_t seqNum; uint64_t seqNum;
} SFsmCbMeta; } SFsmCbMeta;
struct SRpcMsg;
typedef struct SRpcMsg SRpcMsg;
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM; } SSyncFSM;
struct SSyncRaftEntry;
typedef struct SSyncRaftEntry SSyncRaftEntry;
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
// abstract definition of log store in raft // abstract definition of log store in raft
// SWal implements it // SWal implements it
typedef struct SSyncLogStore { typedef struct SSyncLogStore {
...@@ -117,11 +117,6 @@ typedef struct SSyncLogStore { ...@@ -117,11 +117,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore; } SSyncLogStore;
struct SWal;
typedef struct SWal SWal;
struct SEpSet;
typedef struct SEpSet SEpSet;
typedef struct SSyncInfo { typedef struct SSyncInfo {
SyncGroupId vgId; SyncGroupId vgId;
...@@ -130,10 +125,8 @@ typedef struct SSyncInfo { ...@@ -130,10 +125,8 @@ typedef struct SSyncInfo {
SWal* pWal; SWal* pWal;
SSyncFSM* pFsm; SSyncFSM* pFsm;
SMsgCb* msgcb; SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
int32_t syncInit(); int32_t syncInit();
...@@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid); ...@@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid); int32_t syncGetVgId(int64_t rid);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
typedef enum { bool syncEnvIsStart();
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER,
TAOS_SYNC_PROPOSE_OTHER_ERROR,
} ESyncProposeCode;
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
extern int32_t sDebugFlag;
//-----------------------------------------
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
struct SSyncBuffer;
typedef struct SSyncBuffer SSyncBuffer;
//-----------------------------------------
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -20,9 +20,6 @@ ...@@ -20,9 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "cJSON.h"
#include "trpc.h" #include "trpc.h"
// ------------------ ds ------------------- // ------------------ ds -------------------
...@@ -32,9 +29,6 @@ typedef struct SRaftId { ...@@ -32,9 +29,6 @@ typedef struct SRaftId {
} SRaftId; } SRaftId;
// ------------------ control ------------------- // ------------------ control -------------------
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
SSyncNode* syncNodeAcquire(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode); void syncNodeRelease(SSyncNode* pNode);
......
...@@ -650,6 +650,7 @@ int32_t* taosGetErrno(); ...@@ -650,6 +650,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_FUNCTION_NAME TAOS_DEF_ERROR_CODE(0, 0x264D) #define TSDB_CODE_PAR_INVALID_FUNCTION_NAME TAOS_DEF_ERROR_CODE(0, 0x264D)
#define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E) #define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E)
#define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F) #define TSDB_CODE_PAR_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x264F)
#define TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY TAOS_DEF_ERROR_CODE(0, 0x2650)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#include "tmsgtype.h" #include "tmsgtype.h"
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tref.h" #include "tref.h"
#include "cJSON.h"
#include "tdataformat.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
...@@ -268,7 +270,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t ...@@ -268,7 +270,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) { if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) { } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
} }
...@@ -803,6 +805,101 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { ...@@ -803,6 +805,101 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static char* parseTagDatatoJson(void *p){
char* string = NULL;
cJSON *json = cJSON_CreateObject();
if (json == NULL)
{
goto end;
}
int16_t nCols = kvRowNCols(p);
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
SColIdx * pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0){
if(*val == TSDB_DATA_TYPE_NULL){
string = taosMemoryCalloc(1, 8);
sprintf(varDataVal(string), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(string, strlen(varDataVal(string)));
goto end;
}
continue;
}
// json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
// json value
val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
if(type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL;
if (varDataLen(realData) > 0){
char *tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4 *)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
taosMemoryFree(tagJsonValue);
goto end;
}
value = cJSON_CreateString(tagJsonValue);
taosMemoryFree(tagJsonValue);
if (value == NULL)
{
goto end;
}
}else if(varDataLen(realData) == 0){
value = cJSON_CreateString("");
}else{
ASSERT(0);
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else if(type == TSDB_DATA_TYPE_DOUBLE){
double jsonVd = *(double*)(realData);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
}else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL)
{
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
}else{
ASSERT(0);
}
}
string = cJSON_PrintUnformatted(json);
end:
cJSON_Delete(json);
return string;
}
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) { static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type; int32_t type = pResultInfo->fields[i].type;
...@@ -833,9 +930,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -833,9 +930,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i]; pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
pResultInfo->row[i] = pResultInfo->pCol[i].pData; pResultInfo->row[i] = pResultInfo->pCol[i].pData;
} }else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
if (type == TSDB_DATA_TYPE_JSON) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]); char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -848,6 +943,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -848,6 +943,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (pCol->offset[j] != -1) { if (pCol->offset[j] != -1) {
char* pStart = pCol->offset[j] + pCol->pData; char* pStart = pCol->offset[j] + pCol->pData;
int32_t jsonInnerType = *pStart; int32_t jsonInnerType = *pStart;
char* jsonInnerData = pStart + CHAR_BYTES; char* jsonInnerData = pStart + CHAR_BYTES;
char dst[TSDB_MAX_JSON_TAG_LEN] = {0}; char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
...@@ -855,15 +951,9 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int ...@@ -855,15 +951,9 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L); sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst))); varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) { } else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
int32_t length = char *jsonString = parseTagDatatoJson(jsonInnerData);
taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst)); STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString);
if (length <= 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
varDataVal(jsonInnerData));
length = 0;
}
varDataSetLen(dst, length);
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value" } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
*(char*)varDataVal(dst) = '\"'; *(char*)varDataVal(dst) = '\"';
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
......
...@@ -122,10 +122,14 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con ...@@ -122,10 +122,14 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
dataLen = 0; dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) { } else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES); dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { } else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES; dataLen = DOUBLE_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) { } else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES; dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) {
dataLen = kvRowLen(pData + CHAR_BYTES);
} else {
ASSERT(0);
} }
dataLen += CHAR_BYTES; dataLen += CHAR_BYTES;
} }
......
...@@ -40,11 +40,11 @@ bool tsPrintAuth = false; ...@@ -40,11 +40,11 @@ bool tsPrintAuth = false;
// multi process // multi process
int32_t tsMultiProcess = 0; int32_t tsMultiProcess = 0;
int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 128; int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 1024;
int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128; int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 1024;
int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128; int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128; int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128; int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 1024;
int32_t tsNumOfShmThreads = 1; int32_t tsNumOfShmThreads = 1;
// queue & threads // queue & threads
...@@ -380,11 +380,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -380,11 +380,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_WAL_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2; tsNumOfRpcThreads = tsNumOfCores / 2;
......
...@@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg; ...@@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg;
extern const SVnodeCfg vnodeCfgDefault; extern const SVnodeCfg vnodeCfgDefault;
int vnodeInit(int nthreads); int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup(); void vnodeCleanup();
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode); void vnodeStop(SVnode *pVnode);
...@@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry; ...@@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry;
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader);
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int metaReadNext(SMetaReader *pReader); int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid); const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
#if 1 // refact APIs below (TODO) #if 1 // refact APIs below (TODO)
...@@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor; ...@@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor *pTbCur);
int metaTbCursorNext(SMTbCursor *pTbCur); int32_t metaTbCursorNext(SMTbCursor *pTbCur);
#endif #endif
// tsdb // tsdb
...@@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle; ...@@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
...@@ -207,15 +207,15 @@ struct SMetaReader { ...@@ -207,15 +207,15 @@ struct SMetaReader {
SDecoder coder; SDecoder coder;
SMetaEntry me; SMetaEntry me;
void *pBuf; void *pBuf;
int szBuf; int32_t szBuf;
}; };
struct SMTbCursor { struct SMTbCursor {
TBC *pDbc; TBC *pDbc;
void *pKey; void *pKey;
void *pVal; void *pVal;
int kLen; int32_t kLen;
int vLen; int32_t vLen;
SMetaReader mr; SMetaReader mr;
}; };
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
extern "C" { extern "C" {
#endif #endif
// vnodeDebug ====================
// clang-format off // clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
...@@ -34,17 +33,17 @@ extern "C" { ...@@ -34,17 +33,17 @@ extern "C" {
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
// vnodeCfg ==================== // vnodeCfg.c
extern const SVnodeCfg vnodeCfgDefault; extern const SVnodeCfg vnodeCfgDefault;
int vnodeCheckCfg(const SVnodeCfg*); int32_t vnodeCheckCfg(const SVnodeCfg*);
int vnodeEncodeConfig(const void* pObj, SJson* pJson); int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
int vnodeDecodeConfig(const SJson* pJson, void* pObj); int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
// vnodeModule ==================== // vnodeModule.c
int vnodeScheduleTask(int (*execute)(void*), void* arg); int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg);
// vnodeBufPool ==================== // vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode; typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode { struct SVBufPoolNode {
SVBufPoolNode* prev; SVBufPoolNode* prev;
...@@ -62,37 +61,29 @@ struct SVBufPool { ...@@ -62,37 +61,29 @@ struct SVBufPool {
SVBufPoolNode node; SVBufPoolNode node;
}; };
int vnodeOpenBufPool(SVnode* pVnode, int64_t size); int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
int vnodeCloseBufPool(SVnode* pVnode); int32_t vnodeCloseBufPool(SVnode* pVnode);
void vnodeBufPoolReset(SVBufPool* pPool); void vnodeBufPoolReset(SVBufPool* pPool);
// vnodeQuery ==================== // vnodeQuery.c
int vnodeQueryOpen(SVnode* pVnode); int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode);
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ==================== // vnodeCommit.c
int vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode);
int vnodeCommit(SVnode* pVnode); int32_t vnodeCommit(SVnode* pVnode);
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int vnodeSyncCommit(SVnode* pVnode); int32_t vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode);
// vnodeCommit ==================== // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeSyncSetMsgCb(SVnode* pVnode);
int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg);
void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
SSyncFSM* syncVnodeMakeFsm();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1; if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
if (pME->type == TSDB_SUPER_TABLE) { if (pME->type == TSDB_SUPER_TABLE) {
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
...@@ -67,10 +67,10 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -67,10 +67,10 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
} else if (pME->type == TSDB_TSMA_TABLE) { } else if (pME->type == TSDB_TSMA_TABLE) {
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma)); pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
if(!pME->smaEntry.tsma) { if (!pME->smaEntry.tsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
......
...@@ -425,6 +425,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* ...@@ -425,6 +425,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
rowLen += pCond->colList[i].bytes; rowLen += pCond->colList[i].bytes;
} }
// make sure the output SSDataBlock size be less than 2MB.
int32_t TWOMB = 2 * 1024 * 1024;
if (pReadHandle->outputCapacity * rowLen > TWOMB) {
pReadHandle->outputCapacity = TWOMB / rowLen;
}
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
if (pReadHandle->suppInfo.pstatis == NULL) { if (pReadHandle->suppInfo.pstatis == NULL) {
...@@ -1302,20 +1308,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* ...@@ -1302,20 +1308,22 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey));
if (cacheDataInFileBlockHole) {
// do not load file block into buffer // do not load file block into buffer
int32_t step = ascScan ? 1 : -1; int32_t step = ascScan ? 1 : -1;
TSKEY maxKey = TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
cur->rows = cur->rows =
tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle); tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
pTsdbReadHandle->realNumOfRows = cur->rows; pTsdbReadHandle->realNumOfRows = cur->rows;
// update the last key value // update the last key value
pCheckInfo->lastKey = cur->win.ekey + step; pCheckInfo->lastKey = cur->win.ekey + step;
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
if (!ascScan) {
TSWAP(cur->win.skey, cur->win.ekey); TSWAP(cur->win.skey, cur->win.ekey);
} }
...@@ -1334,18 +1342,16 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* ...@@ -1334,18 +1342,16 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
/* /*
* no data in cache, only load data from file * no data in cache, only load data from file
* during the query processing, data in cache will not be checked anymore. * during the query processing, data in cache will not be checked anymore.
*
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer * Here the buffer is not enough, so only part of file block can be loaded into memory buffer
*/ */
assert(pTsdbReadHandle->outputCapacity >= binfo.rows);
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &binfo);
if ((cur->pos == 0 && endPos == binfo.rows - 1 && ascScan) || bool wholeBlockReturned = ((abs(cur->pos - endPos) + 1) == binfo.rows);
(cur->pos == (binfo.rows - 1) && endPos == 0 && (!ascScan))) { if (wholeBlockReturned) {
pTsdbReadHandle->realNumOfRows = binfo.rows; pTsdbReadHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows; cur->rows = binfo.rows;
cur->win = binfo.window; cur->win = binfo.window;
cur->mixBlock = false; cur->mixBlock = false;
cur->blockCompleted = true; cur->blockCompleted = true;
...@@ -1356,12 +1362,24 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* ...@@ -1356,12 +1362,24 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
cur->lastKey = binfo.window.skey - 1; cur->lastKey = binfo.window.skey - 1;
cur->pos = -1; cur->pos = -1;
} }
} else { // partially copy to dest buffer } else { // partially copy to dest buffer
// make sure to only load once
bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows -1 && (!ascScan)));
if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) {
code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos); copyAllRemainRowsFromFileBlock(pTsdbReadHandle, pCheckInfo, &binfo, endPos);
cur->mixBlock = true; cur->mixBlock = true;
} }
assert(cur->blockCompleted); if (pTsdbReadHandle->outputCapacity >= binfo.rows) {
ASSERT(cur->blockCompleted);
}
if (cur->rows == binfo.rows) { if (cur->rows == binfo.rows) {
tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s", tsdbDebug("%p whole file block qualified, brange:%" PRId64 "-%" PRId64 ", rows:%d, lastKey:%" PRId64 ", %s",
pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr); pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr);
...@@ -1858,15 +1876,14 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa ...@@ -1858,15 +1876,14 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
TSKEY* tsArray = pCols->cols[0].pData; TSKEY* tsArray = pCols->cols[0].pData;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
int32_t pos = cur->pos; int32_t step = ascScan? 1 : -1;
int32_t start = cur->pos; int32_t start = cur->pos;
int32_t end = endPos; int32_t end = endPos;
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (!ascScan) {
TSWAP(start, end); TSWAP(start, end);
} }
...@@ -1876,11 +1893,11 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa ...@@ -1876,11 +1893,11 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
// the time window should always be ascending order: skey <= ekey // the time window should always be ascending order: skey <= ekey
cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]}; cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]};
cur->mixBlock = (numOfRows != pBlockInfo->rows); cur->mixBlock = (numOfRows != pBlockInfo->rows);
cur->lastKey = tsArray[endPos] + step; cur->lastKey = tsArray[endPos] + step;
cur->blockCompleted = true; cur->blockCompleted = (ascScan? (endPos == pBlockInfo->rows - 1):(endPos == 0));
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases. // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
pos = endPos + step; int32_t pos = endPos + step;
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pTsdbReadHandle); doCheckGeneratedBlockRange(pTsdbReadHandle);
...@@ -1892,20 +1909,44 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa ...@@ -1892,20 +1909,44 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) { int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block // NOTE: reverse the order to find the end position in data block
int32_t endPos = -1; int32_t endPos = -1;
int32_t order = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t order = ascScan? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) { if (pTsdbReadHandle->outputCapacity >= pBlockInfo->rows) {
endPos = pBlockInfo->rows - 1; if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
cur->mixBlock = (cur->pos != 0); endPos = pBlockInfo->rows - 1;
} else if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) { cur->mixBlock = (cur->pos != 0);
endPos = 0; } else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1); endPos = 0;
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
} else {
assert(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
cur->mixBlock = true;
}
} else { } else {
assert(pCols->numOfRows > 0); if (ascScan && pTsdbReadHandle->window.ekey >= pBlockInfo->window.ekey) {
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order); endPos = TMIN(cur->pos + pTsdbReadHandle->outputCapacity - 1, pBlockInfo->rows - 1);
} else if ((!ascScan) && pTsdbReadHandle->window.ekey <= pBlockInfo->window.skey) {
endPos = TMAX(cur->pos - pTsdbReadHandle->outputCapacity + 1, 0);
} else {
ASSERT(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pTsdbReadHandle->window.ekey, order);
// current data is more than the capacity
int32_t size = abs(cur->pos - endPos) + 1;
if (size > pTsdbReadHandle->outputCapacity) {
int32_t delta = size - pTsdbReadHandle->outputCapacity;
if (ascScan) {
endPos -= delta;
} else {
endPos += delta;
}
}
}
cur->mixBlock = true; cur->mixBlock = true;
} }
...@@ -2369,7 +2410,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu ...@@ -2369,7 +2410,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu
static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists); static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exists);
static int32_t getDataBlockRv(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) { static int32_t getDataBlock(STsdbReadHandle* pTsdbReadHandle, STableBlockInfo* pNext, bool* exists) {
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
SQueryFilePos* cur = &pTsdbReadHandle->cur; SQueryFilePos* cur = &pTsdbReadHandle->cur;
...@@ -2478,7 +2519,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi ...@@ -2478,7 +2519,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
cur->fid = pTsdbReadHandle->pFileGroup->fid; cur->fid = pTsdbReadHandle->pFileGroup->fid;
STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
return getDataBlockRv(pTsdbReadHandle, pBlockInfo, exists); return getDataBlock(pTsdbReadHandle, pBlockInfo, exists);
} }
static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) { static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
...@@ -2643,7 +2684,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis ...@@ -2643,7 +2684,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
} else { } else {
moveToNextDataBlockInCurrentFile(pTsdbReadHandle); moveToNextDataBlockInCurrentFile(pTsdbReadHandle);
STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pTsdbReadHandle->pDataBlockInfo[cur->slot];
return getDataBlockRv(pTsdbReadHandle, pNext, exists); return getDataBlock(pTsdbReadHandle, pNext, exists);
} }
} }
} }
......
...@@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) { ...@@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready // start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) { int32_t vnodeStart(SVnode *pVnode) {
vnodeSyncSetMsgCb(pVnode);
vnodeSyncStart(pVnode); vnodeSyncStart(pVnode);
return 0; return 0;
} }
......
...@@ -13,71 +13,62 @@ ...@@ -13,71 +13,62 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "vnd.h" #include "vnd.h"
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
SSyncInfo syncInfo; static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
syncInfo.vgId = pVnode->config.vgId; static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
SSyncCfg *pCfg = &(syncInfo.syncCfg); static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
pCfg->replicaNum = pVnode->config.syncCfg.replicaNum; static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
pCfg->myIndex = pVnode->config.syncCfg.myIndex; static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo)); static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
syncInfo.pWal = pVnode->pWal;
syncInfo.pFsm = syncVnodeMakeFsm(pVnode); int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
syncInfo.msgcb = NULL; SSyncInfo syncInfo = {
syncInfo.FpSendMsg = vnodeSyncSendMsg; .vgId = pVnode->config.vgId,
syncInfo.FpEqMsg = vnodeSyncEqMsg; .syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal,
.msgcb = NULL,
.FpSendMsg = vnodeSyncSendMsg,
.FpEqMsg = vnodeSyncEqMsg,
};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
pVnode->sync = syncOpen(&syncInfo); pVnode->sync = syncOpen(&syncInfo);
assert(pVnode->sync > 0); if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
return -1;
}
// for test
setPingTimerMS(pVnode->sync, 3000); setPingTimerMS(pVnode->sync, 3000);
setElectTimerMS(pVnode->sync, 500); setElectTimerMS(pVnode->sync, 500);
setHeartbeatTimerMS(pVnode->sync, 100); setHeartbeatTimerMS(pVnode->sync, 100);
return 0; return 0;
} }
int32_t vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync); syncStart(pVnode->sync);
return 0;
} }
void vnodeSyncClose(SVnode *pVnode) { void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
// stop by ref id
syncStop(pVnode->sync);
}
void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
pMsg->info.noResp = 1;
return tmsgSendReq(pEpSet, pMsg);
}
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SVnode *pVnode = (SVnode *)(pFsm->data);
vnodeGetSnapshot(pVnode, pSnapshot);
/*
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = 0;
pSnapshot->lastApplyTerm = 0;
*/
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0; return 0;
} }
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot; SSnapshot snapshot = {0};
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
...@@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb ...@@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
} }
} }
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
...@@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta ...@@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) { SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitCb; pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpPreCommitCb = vnodeSyncPreCommitCb; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackCb; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
return pFsm; return pFsm;
} }
\ No newline at end of file
...@@ -3546,11 +3546,12 @@ _error: ...@@ -3546,11 +3546,12 @@ _error:
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) { int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
// todo add more information about exchange operation // todo add more information about exchange operation
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
*order = TSDB_ORDER_ASC; *order = TSDB_ORDER_ASC;
*scanFlag = MAIN_SCAN; *scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
*order = pTableScanInfo->cond.order; *order = pTableScanInfo->cond.order;
*scanFlag = pTableScanInfo->scanFlag; *scanFlag = pTableScanInfo->scanFlag;
...@@ -3910,6 +3911,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -3910,6 +3911,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
...@@ -4203,7 +4207,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -4203,7 +4207,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr; pInfo->numOfScalarExpr = numOfScalarExpr;
if (pInfo->pScalarExprInfo != NULL) { if (pInfo->pScalarExprInfo != NULL) {
pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfCols, &pInfo->rowCellInfoOffset); pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
} }
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
...@@ -4311,23 +4315,29 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p ...@@ -4311,23 +4315,29 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
// Make sure the size of SSDataBlock will never exceed the size of 2MB.
int32_t TWOMB = 2 * 1024 * 1024;
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
numOfRows = TWOMB / pResBlock->info.rowSize;
}
initResultSizeInfo(pOperator, numOfRows); initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
pOperator->name = "ProjectOperator"; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = num; pOperator->numOfExprs = num;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
destroyProjectOperatorInfo, NULL, NULL, NULL); destroyProjectOperatorInfo, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
...@@ -300,10 +300,26 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) ...@@ -300,10 +300,26 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
if (fmIsScanPseudoColumnFunc(functionId)) { if (fmIsScanPseudoColumnFunc(functionId)) {
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId); setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
} else { // these are tags } else { // these are tags
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); const char* p = NULL;
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
p = data;
}else{
p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, p, (p == NULL)); colDataAppend(pColInfoData, i, p, (p == NULL));
} }
if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
taosMemoryFree((void*)p);
}
} }
} }
...@@ -1587,8 +1603,21 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1587,8 +1603,21 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STR_TO_VARSTR(str, mr.me.name); STR_TO_VARSTR(str, mr.me.name);
colDataAppend(pDst, count, str, false); colDataAppend(pDst, count, str, false);
} else { // it is a tag value } else { // it is a tag value
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId); if(pDst->info.type == TSDB_DATA_TYPE_JSON){
colDataAppend(pDst, count, p, (p == NULL)); const uint8_t *tmp = mr.me.ctbEntry.pTags;
char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1);
if(data == NULL){
qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1);
return NULL;
}
*data = TSDB_DATA_TYPE_JSON;
memcpy(data+1, tmp, kvRowLen(tmp));
colDataAppend(pDst, count, data, false);
taosMemoryFree(data);
}else{
const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
colDataAppend(pDst, count, p, (p == NULL));
}
} }
} }
......
...@@ -76,6 +76,11 @@ int32_t firstFunction(SqlFunctionCtx *pCtx); ...@@ -76,6 +76,11 @@ int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx);
int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx);
......
...@@ -493,6 +493,21 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l ...@@ -493,6 +493,21 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return TSDB_CODE_SUCCESS;
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The parameters of UNIQUE can only be columns");
}
pFunc->node.resType = ((SExprNode*)pPara)->resType;
return TSDB_CODE_SUCCESS;
}
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t paraLen = LIST_LENGTH(pFunc->pParameterList); int32_t paraLen = LIST_LENGTH(pFunc->pParameterList);
if (paraLen == 0 || paraLen > 2) { if (paraLen == 0 || paraLen > 2) {
...@@ -878,14 +893,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -878,14 +893,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = lastFinalize .finalizeFunc = lastFinalize
}, },
{ {
.name = "diff", .name = "unique",
.type = FUNCTION_TYPE_DIFF, .type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateDiff, .translateFunc = translateUnique,
.getEnvFunc = getDiffFuncEnv, .getEnvFunc = getUniqueFuncEnv,
.initFunc = diffFunctionSetup, .initFunc = uniqueFunctionSetup,
.processFunc = diffFunction, .processFunc = uniqueFunction,
.finalizeFunc = functionFinalize .finalizeFunc = uniqueFinalize
}, },
{ {
.name = "histogram", .name = "histogram",
...@@ -907,6 +922,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -907,6 +922,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = hllFunction, .processFunc = hllFunction,
.finalizeFunc = hllFinalize .finalizeFunc = hllFinalize
}, },
{
.name = "diff",
.type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateDiff,
.getEnvFunc = getDiffFuncEnv,
.initFunc = diffFunctionSetup,
.processFunc = diffFunction,
.finalizeFunc = functionFinalize
},
{ {
.name = "state_count", .name = "state_count",
.type = FUNCTION_TYPE_STATE_COUNT, .type = FUNCTION_TYPE_STATE_COUNT,
......
...@@ -28,12 +28,15 @@ ...@@ -28,12 +28,15 @@
#define TAIL_MAX_POINTS_NUM 100 #define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100 #define TAIL_MAX_OFFSET 100
#define UNIQUE_MAX_RESULT_SIZE (1024*1024*10)
#define HLL_BUCKET_BITS 14 // The bits of the bucket #define HLL_BUCKET_BITS 14 // The bits of the bucket
#define HLL_DATA_BITS (64-HLL_BUCKET_BITS) #define HLL_DATA_BITS (64-HLL_BUCKET_BITS)
#define HLL_BUCKETS (1<<HLL_BUCKET_BITS) #define HLL_BUCKETS (1<<HLL_BUCKET_BITS)
#define HLL_BUCKET_MASK (HLL_BUCKETS-1) #define HLL_BUCKET_MASK (HLL_BUCKETS-1)
#define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2) #define HLL_ALPHA_INF 0.721347520444481703680 // constant for 0.5/ln(2)
typedef struct SSumRes { typedef struct SSumRes {
union { union {
int64_t isum; int64_t isum;
...@@ -197,6 +200,20 @@ typedef struct STailInfo { ...@@ -197,6 +200,20 @@ typedef struct STailInfo {
STailItem **pItems; STailItem **pItems;
} STailInfo; } STailInfo;
typedef struct SUniqueItem {
int64_t timestamp;
bool isNull;
char data[];
} SUniqueItem;
typedef struct SUniqueInfo {
int32_t numOfPoints;
uint8_t colType;
int16_t colBytes;
SHashObj *pHash;
char pItems[];
} SUniqueInfo;
#define SET_VAL(_info, numOfElem, res) \ #define SET_VAL(_info, numOfElem, res) \
do { \ do { \
if ((numOfElem) <= 0) { \ if ((numOfElem) <= 0) { \
...@@ -216,6 +233,18 @@ typedef struct STailInfo { ...@@ -216,6 +233,18 @@ typedef struct STailInfo {
} \ } \
} while (0); } while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ #define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \ do { \
if (((left) < (right)) ^ (sign)) { \ if (((left) < (right)) ^ (sign)) { \
...@@ -748,50 +777,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { ...@@ -748,50 +777,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true; return true;
} }
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaries.num; ++_i) { \
SqlFunctionCtx* __ctx = (ctx)->subsidiaries.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t* d = (_t*)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
...@@ -1994,6 +1979,99 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -1994,6 +1979,99 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE;
return true;
}
bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false;
}
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->numOfPoints = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
if (pInfo->pHash != NULL) {
taosHashClear(pInfo->pHash);
} else {
pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
}
return true;
}
static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
if (pHashItem == NULL) {
int32_t size = sizeof(SUniqueItem) + pInfo->colBytes;
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size);
pItem->timestamp = ts;
memcpy(pItem->data, data, pInfo->colBytes);
taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*));
pInfo->numOfPoints++;
} else if (pHashItem->timestamp > ts) {
pHashItem->timestamp = ts;
}
}
int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) {
taosHashCleanup(pInfo->pHash);
return 0;
}
}
//taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
//for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
// int32_t pos = startOffset + i;
// STailItem *pItem = pInfo->pItems[i];
// if (pItem->isNull) {
// colDataAppendNULL(pOutput, pos);
// } else {
// colDataAppend(pOutput, pos, pItem->data, false);
// }
//}
pResInfo->numOfRes = pInfo->numOfPoints;
return TSDB_CODE_SUCCESS;
}
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
for (int32_t i = 0; i < pResInfo->numOfRes; ++i) {
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes));
colDataAppend(pCol, i, pItem->data, false);
//TODO: handle ts output
}
return pResInfo->numOfRes;
}
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo); pEnv->calcMemSize = sizeof(SDiffInfo);
return true; return true;
...@@ -2106,7 +2184,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo ...@@ -2106,7 +2184,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo
default: default:
ASSERT(0); ASSERT(0);
} }
} }
int32_t diffFunction(SqlFunctionCtx* pCtx) { int32_t diffFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
......
...@@ -1771,6 +1771,7 @@ static const char* jkSubplanId = "Id"; ...@@ -1771,6 +1771,7 @@ static const char* jkSubplanId = "Id";
static const char* jkSubplanType = "SubplanType"; static const char* jkSubplanType = "SubplanType";
static const char* jkSubplanMsgType = "MsgType"; static const char* jkSubplanMsgType = "MsgType";
static const char* jkSubplanLevel = "Level"; static const char* jkSubplanLevel = "Level";
static const char* jkSubplanDbFName = "DbFName";
static const char* jkSubplanNodeAddr = "NodeAddr"; static const char* jkSubplanNodeAddr = "NodeAddr";
static const char* jkSubplanRootNode = "RootNode"; static const char* jkSubplanRootNode = "RootNode";
static const char* jkSubplanDataSink = "DataSink"; static const char* jkSubplanDataSink = "DataSink";
...@@ -1788,6 +1789,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) { ...@@ -1788,6 +1789,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanLevel, pNode->level); code = tjsonAddIntegerToObject(pJson, jkSubplanLevel, pNode->level);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkSubplanDbFName, pNode->dbFName);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode); code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode);
} }
...@@ -1815,6 +1819,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { ...@@ -1815,6 +1819,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanLevel, &pNode->level); code = tjsonGetIntValue(pJson, jkSubplanLevel, &pNode->level);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkSubplanDbFName, pNode->dbFName);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode); code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode);
} }
......
...@@ -1137,10 +1137,6 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) { ...@@ -1137,10 +1137,6 @@ bool nodesIsRegularOp(const SOperatorNode* pOp) {
return false; return false;
} }
bool nodesIsTimeorderQuery(const SNode* pQuery) { return false; }
bool nodesIsTimelineQuery(const SNode* pQuery) { return false; }
typedef struct SCollectColumnsCxt { typedef struct SCollectColumnsCxt {
int32_t errCode; int32_t errCode;
const char* pTableAlias; const char* pTableAlias;
......
...@@ -382,6 +382,35 @@ static bool isInternalPrimaryKey(const SColumnNode* pCol) { ...@@ -382,6 +382,35 @@ static bool isInternalPrimaryKey(const SColumnNode* pCol) {
return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME); return PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME);
} }
static bool isTimeOrderQuery(SNode* pStmt) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
return ((SSelectStmt*)pStmt)->isTimeOrderQuery;
} else {
return false;
}
}
static bool isPrimaryKeyImpl(STempTableNode* pTable, SNode* pExpr) {
if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId);
} else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
SFunctionNode* pFunc = (SFunctionNode*)pExpr;
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) {
return isPrimaryKeyImpl(pTable, nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTARTTS == pFunc->funcType || FUNCTION_TYPE_WENDTS == pFunc->funcType) {
return true;
}
}
return false;
}
static bool isPrimaryKey(STempTableNode* pTable, SNode* pExpr) {
if (!isTimeOrderQuery(pTable->pSubquery)) {
return false;
}
return isPrimaryKeyImpl(pTable, pExpr);
}
static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) { static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
bool found = false; bool found = false;
if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
...@@ -404,8 +433,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) { ...@@ -404,8 +433,7 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) {
FOREACH(pNode, pProjectList) { FOREACH(pNode, pProjectList) {
SExprNode* pExpr = (SExprNode*)pNode; SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp(pCol->colName, pExpr->aliasName) || if (0 == strcmp(pCol->colName, pExpr->aliasName) ||
((QUERY_NODE_COLUMN == nodeType(pExpr) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId) && (isPrimaryKey((STempTableNode*)pTable, pNode) && isInternalPrimaryKey(pCol))) {
isInternalPrimaryKey(pCol))) {
setColumnInfoByExpr(pTable, pExpr, pCol); setColumnInfoByExpr(pTable, pExpr, pCol);
found = true; found = true;
break; break;
...@@ -454,6 +482,9 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod ...@@ -454,6 +482,9 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod
} }
if (!found) { if (!found) {
if (isInternalPk) { if (isInternalPk) {
if (NULL != pCxt->pCurrStmt->pWindow) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY);
}
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_INTERNAL_PK); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_INTERNAL_PK);
} else { } else {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName);
...@@ -781,7 +812,6 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) ...@@ -781,7 +812,6 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
} }
pCxt->pCurrStmt->hasAggFuncs = true; pCxt->pCurrStmt->hasAggFuncs = true;
pCxt->pCurrStmt->isTimeOrderQuery = false;
if (isCountStar(pFunc)) { if (isCountStar(pFunc)) {
pCxt->errCode = rewriteCountStar(pCxt, pFunc); pCxt->errCode = rewriteCountStar(pCxt, pFunc);
} }
......
...@@ -167,6 +167,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { ...@@ -167,6 +167,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_NOT_ALLOWED_FUNC: case TSDB_CODE_PAR_NOT_ALLOWED_FUNC:
return "Some functions are allowed only in the SELECT list of a query. " return "Some functions are allowed only in the SELECT list of a query. "
"And, cannot be mixed with other non scalar functions or columns."; "And, cannot be mixed with other non scalar functions or columns.";
case TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY:
return "Window query not supported, since the result of subquery not include valid timestamp column";
case TSDB_CODE_OUT_OF_MEMORY: case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory"; return "Out of memory";
default: default:
...@@ -365,8 +367,8 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p ...@@ -365,8 +367,8 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) { if (keyLen == 0 || taosHashGet(keyHash, jsonKey, keyLen) != NULL) {
continue; continue;
} }
// key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: LONG_BYTES // key: keyLen + VARSTR_HEADER_SIZE, value type: CHAR_BYTES, value reserved: DOUBLE_BYTES
tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + LONG_BYTES, 1); tagKV = taosMemoryCalloc(keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES, 1);
if (!tagKV) { if (!tagKV) {
retCode = TSDB_CODE_TSC_OUT_OF_MEMORY; retCode = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto end; goto end;
...@@ -411,13 +413,9 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p ...@@ -411,13 +413,9 @@ int parseJsontoTagData(const char* json, SKVRowBuilder* kvRowBuilder, SMsgBuf* p
} }
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE);
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES); char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
*valueType = *valueType = TSDB_DATA_TYPE_DOUBLE;
(item->valuedouble - (int64_t)(item->valuedouble) == 0) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_DOUBLE; *((double*)valueData) = item->valuedouble;
if (*valueType == TSDB_DATA_TYPE_DOUBLE) tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + DOUBLE_BYTES);
*((double*)valueData) = item->valuedouble;
else if (*valueType == TSDB_DATA_TYPE_BIGINT)
*((int64_t*)valueData) = item->valueint;
tdAddColToKVRow(kvRowBuilder, jsonIndex++, tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES + LONG_BYTES);
} else if (item->type == cJSON_True || item->type == cJSON_False) { } else if (item->type == cJSON_True || item->type == cJSON_False) {
char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE); char* valueType = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE);
char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES); char* valueData = POINTER_SHIFT(tagKV, keyLen + VARSTR_HEADER_SIZE + CHAR_BYTES);
......
...@@ -125,8 +125,6 @@ TEST_F(ParserSelectTest, nonstdFunc) { ...@@ -125,8 +125,6 @@ TEST_F(ParserSelectTest, nonstdFunc) {
useDb("root", "test"); useDb("root", "test");
run("SELECT DIFF(c1) FROM t1"); run("SELECT DIFF(c1) FROM t1");
// run("SELECT DIFF(c1) FROM t1 INTERVAL(10s)");
} }
TEST_F(ParserSelectTest, nonstdFuncSemanticCheck) { TEST_F(ParserSelectTest, nonstdFuncSemanticCheck) {
...@@ -139,12 +137,13 @@ TEST_F(ParserSelectTest, nonstdFuncSemanticCheck) { ...@@ -139,12 +137,13 @@ TEST_F(ParserSelectTest, nonstdFuncSemanticCheck) {
run("SELECT DIFF(c1), count(*) FROM t1", TSDB_CODE_PAR_NOT_ALLOWED_FUNC, PARSER_STAGE_TRANSLATE); run("SELECT DIFF(c1), count(*) FROM t1", TSDB_CODE_PAR_NOT_ALLOWED_FUNC, PARSER_STAGE_TRANSLATE);
run("SELECT DIFF(c1), CSUM(c1) FROM t1", TSDB_CODE_PAR_NOT_ALLOWED_FUNC, PARSER_STAGE_TRANSLATE); run("SELECT DIFF(c1), CSUM(c1) FROM t1", TSDB_CODE_PAR_NOT_ALLOWED_FUNC, PARSER_STAGE_TRANSLATE);
// run("SELECT DIFF(c1) FROM t1 INTERVAL(10s)");
} }
TEST_F(ParserSelectTest, clause) { TEST_F(ParserSelectTest, groupBy) {
useDb("root", "test"); useDb("root", "test");
// GROUP BY clause
run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0"); run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0");
run("SELECT COUNT(*), c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c2"); run("SELECT COUNT(*), c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c2");
...@@ -154,13 +153,19 @@ TEST_F(ParserSelectTest, clause) { ...@@ -154,13 +153,19 @@ TEST_F(ParserSelectTest, clause) {
run("SELECT COUNT(*), c1, c2 + 10, c1 + c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c2, c1"); run("SELECT COUNT(*), c1, c2 + 10, c1 + c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c2, c1");
run("SELECT COUNT(*), c1 + 10, c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c1 + 10, c2"); run("SELECT COUNT(*), c1 + 10, c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c1 + 10, c2");
}
TEST_F(ParserSelectTest, orderBy) {
useDb("root", "test");
// order by clause
run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0 GROUP BY c2 order by cnt"); run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0 GROUP BY c2 order by cnt");
run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0 GROUP BY c2 order by 1"); run("SELECT COUNT(*) cnt FROM t1 WHERE c1 > 0 GROUP BY c2 order by 1");
}
TEST_F(ParserSelectTest, distinct) {
useDb("root", "test");
// distinct clause
// run("SELECT distinct c1, c2 FROM t1 WHERE c1 > 0 order by c1"); // run("SELECT distinct c1, c2 FROM t1 WHERE c1 > 0 order by c1");
// run("SELECT distinct c1 + 10, c2 FROM t1 WHERE c1 > 0 order by c1 + 10, c2"); // run("SELECT distinct c1 + 10, c2 FROM t1 WHERE c1 > 0 order by c1 + 10, c2");
...@@ -194,6 +199,25 @@ TEST_F(ParserSelectTest, intervalSemanticCheck) { ...@@ -194,6 +199,25 @@ TEST_F(ParserSelectTest, intervalSemanticCheck) {
PARSER_STAGE_TRANSLATE); PARSER_STAGE_TRANSLATE);
} }
TEST_F(ParserSelectTest, subquery) {
useDb("root", "test");
run("SELECT SUM(a) FROM (SELECT MAX(c1) a, ts FROM st1s1 INTERVAL(1m)) INTERVAL(1n)");
run("SELECT SUM(a) FROM (SELECT MAX(c1) a, _wstartts FROM st1s1 INTERVAL(1m)) INTERVAL(1n)");
run("SELECT SUM(a) FROM (SELECT MAX(c1) a, ts FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)");
run("SELECT SUM(a) FROM (SELECT MAX(c1) a, _wstartts FROM st1s1 PARTITION BY TBNAME INTERVAL(1m)) INTERVAL(1n)");
}
TEST_F(ParserSelectTest, subquerySemanticError) {
useDb("root", "test");
run("SELECT SUM(a) FROM (SELECT MAX(c1) a FROM st1s1 INTERVAL(1m)) INTERVAL(1n)", TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY,
PARSER_STAGE_TRANSLATE);
}
TEST_F(ParserSelectTest, semanticError) { TEST_F(ParserSelectTest, semanticError) {
useDb("root", "test"); useDb("root", "test");
......
...@@ -899,7 +899,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { ...@@ -899,7 +899,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
} }
int32_t code = 0; int32_t code = 0;
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param}; SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst ? pDst->param : NULL};
// TODO: OPT performance // TODO: OPT performance
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
......
...@@ -922,7 +922,8 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) { ...@@ -922,7 +922,8 @@ static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
} }
} }
char *getJsonValue(char *json, char *key){ //todo char *getJsonValue(char *json, char *key){ //todo
json++; // jump type
int16_t cols = kvRowNCols(json); int16_t cols = kvRowNCols(json);
for (int i = 0; i < cols; ++i) { for (int i = 0; i < cols; ++i) {
SColIdx *pColIdx = kvRowColIdxAt(json, i); SColIdx *pColIdx = kvRowColIdxAt(json, i);
......
...@@ -1035,7 +1035,7 @@ void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){ ...@@ -1035,7 +1035,7 @@ void makeJsonArrow(SSDataBlock **src, SNode **opNode, void *json, char *key){
SNode *pLeft = NULL, *pRight = NULL; SNode *pLeft = NULL, *pRight = NULL;
scltMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, keyVar); scltMakeValueNode(&pRight, TSDB_DATA_TYPE_BINARY, keyVar);
scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, varDataLen(json), 1, json); scltMakeColumnNode(&pLeft, src, TSDB_DATA_TYPE_JSON, kvRowLen(json), 1, json);
scltMakeOpNode(opNode, OP_TYPE_JSON_GET_VALUE, TSDB_DATA_TYPE_JSON, pLeft, pRight); scltMakeOpNode(opNode, OP_TYPE_JSON_GET_VALUE, TSDB_DATA_TYPE_JSON, pLeft, pRight);
} }
...@@ -1088,18 +1088,17 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do ...@@ -1088,18 +1088,17 @@ void makeCalculate(void *json, void *key, int32_t rightType, void *rightData, do
}else if(opType == OP_TYPE_ADD || opType == OP_TYPE_SUB || opType == OP_TYPE_MULTI || opType == OP_TYPE_DIV || }else if(opType == OP_TYPE_ADD || opType == OP_TYPE_SUB || opType == OP_TYPE_MULTI || opType == OP_TYPE_DIV ||
opType == OP_TYPE_MOD || opType == OP_TYPE_MINUS){ opType == OP_TYPE_MOD || opType == OP_TYPE_MINUS){
double tmp = *((double *)colDataGetData(column, 0)); printf("1result:%f,except:%f\n", *((double *)colDataGetData(column, 0)), exceptValue);
ASSERT_TRUE(tmp == exceptValue); ASSERT_TRUE(abs(*((double *)colDataGetData(column, 0)) - exceptValue) < 1e-15);
printf("result:%lf\n", tmp);
}else if(opType == OP_TYPE_BIT_AND || opType == OP_TYPE_BIT_OR){ }else if(opType == OP_TYPE_BIT_AND || opType == OP_TYPE_BIT_OR){
printf("2result:%ld,except:%f\n", *((int64_t *)colDataGetData(column, 0)), exceptValue);
ASSERT_EQ(*((int64_t *)colDataGetData(column, 0)), exceptValue); ASSERT_EQ(*((int64_t *)colDataGetData(column, 0)), exceptValue);
printf("result:%ld\n", *((int64_t *)colDataGetData(column, 0)));
}else if(opType == OP_TYPE_GREATER_THAN || opType == OP_TYPE_GREATER_EQUAL || opType == OP_TYPE_LOWER_THAN || }else if(opType == OP_TYPE_GREATER_THAN || opType == OP_TYPE_GREATER_EQUAL || opType == OP_TYPE_LOWER_THAN ||
opType == OP_TYPE_LOWER_EQUAL || opType == OP_TYPE_EQUAL || opType == OP_TYPE_NOT_EQUAL || opType == OP_TYPE_LOWER_EQUAL || opType == OP_TYPE_EQUAL || opType == OP_TYPE_NOT_EQUAL ||
opType == OP_TYPE_IS_NULL || opType == OP_TYPE_IS_NOT_NULL || opType == OP_TYPE_IS_TRUE || opType == OP_TYPE_IS_NULL || opType == OP_TYPE_IS_NOT_NULL || opType == OP_TYPE_IS_TRUE ||
opType == OP_TYPE_LIKE || opType == OP_TYPE_NOT_LIKE || opType == OP_TYPE_MATCH || opType == OP_TYPE_NMATCH){ opType == OP_TYPE_LIKE || opType == OP_TYPE_NOT_LIKE || opType == OP_TYPE_MATCH || opType == OP_TYPE_NMATCH){
printf("3result:%d,except:%f\n", *((bool *)colDataGetData(column, 0)), exceptValue);
ASSERT_EQ(*((bool *)colDataGetData(column, 0)), exceptValue); ASSERT_EQ(*((bool *)colDataGetData(column, 0)), exceptValue);
printf("result:%d\n", *((bool *)colDataGetData(column, 0)));
} }
taosArrayDestroyEx(blockList, scltFreeDataBlock); taosArrayDestroyEx(blockList, scltFreeDataBlock);
...@@ -1114,6 +1113,13 @@ TEST(columnTest, json_column_arith_op) { ...@@ -1114,6 +1113,13 @@ TEST(columnTest, json_column_arith_op) {
tdInitKVRowBuilder(&kvRowBuilder); tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0); parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
if(tmp == NULL){
ASSERT_TRUE(0);
}
memmove(tmp+1, tmp, kvRowLen(tmp));
*tmp = TSDB_DATA_TYPE_JSON;
row = tmp;
const int32_t len = 8; const int32_t len = 8;
EOperatorType op[len] = {OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV, EOperatorType op[len] = {OP_TYPE_ADD, OP_TYPE_SUB, OP_TYPE_MULTI, OP_TYPE_DIV,
...@@ -1166,6 +1172,9 @@ TEST(columnTest, json_column_arith_op) { ...@@ -1166,6 +1172,9 @@ TEST(columnTest, json_column_arith_op) {
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]);
} }
tdDestroyKVRowBuilder(&kvRowBuilder);
taosMemoryFree(row);
} }
void *prepareNchar(char* rightData){ void *prepareNchar(char* rightData){
...@@ -1186,6 +1195,13 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1186,6 +1195,13 @@ TEST(columnTest, json_column_logic_op) {
tdInitKVRowBuilder(&kvRowBuilder); tdInitKVRowBuilder(&kvRowBuilder);
parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0); parseJsontoTagData(rightv, &kvRowBuilder, NULL, 0);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
char *tmp = (char *)taosMemoryRealloc(row, kvRowLen(row)+1);
if(tmp == NULL){
ASSERT_TRUE(0);
}
memmove(tmp+1, tmp, kvRowLen(tmp));
*tmp = TSDB_DATA_TYPE_JSON;
row = tmp;
const int32_t len = 9; const int32_t len = 9;
const int32_t len1 = 4; const int32_t len1 = 4;
...@@ -1223,7 +1239,7 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1223,7 +1239,7 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json null---------------------\n"); printf("--------------------json null---------------------\n");
key = "k3"; key = "k3";
double eRes2[len+len1] = {DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, true, false, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX}; bool eRes2[len+len1] = {false, false, false, false, false, false, true, false, false, false, false, false, false};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes2[i], op[i]); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes2[i], op[i]);
} }
...@@ -1262,7 +1278,7 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1262,7 +1278,7 @@ TEST(columnTest, json_column_logic_op) {
printf("--------------------json double---------------------\n"); printf("--------------------json double---------------------\n");
key = "k6"; key = "k6";
bool eRes5[len+len1] = {true, false, false, false, false, true, false, true, true, false, false, false, true}; bool eRes5[len+len1] = {true, false, false, false, false, true, false, true, true, false, true, false, true};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes5[i], op[i]);
} }
...@@ -1275,7 +1291,7 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1275,7 +1291,7 @@ TEST(columnTest, json_column_logic_op) {
printf("---------------------json not exist--------------------\n"); printf("---------------------json not exist--------------------\n");
key = "k10"; key = "k10";
double eRes10[len+len1] = {DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, true, false, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX, DBL_MAX}; double eRes10[len+len1] = {false, false, false, false, false, false, true, false, false, false, false, false, false};
for(int i = 0; i < len; i++){ for(int i = 0; i < len; i++){
makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes10[i], op[i]); makeCalculate(row, key, TSDB_DATA_TYPE_INT, &input[i], eRes10[i], op[i]);
} }
...@@ -1284,6 +1300,9 @@ TEST(columnTest, json_column_logic_op) { ...@@ -1284,6 +1300,9 @@ TEST(columnTest, json_column_logic_op) {
makeCalculate(row, key, TSDB_DATA_TYPE_NCHAR, rightData, eRes10[i], op[i]); makeCalculate(row, key, TSDB_DATA_TYPE_NCHAR, rightData, eRes10[i], op[i]);
taosMemoryFree(rightData); taosMemoryFree(rightData);
} }
tdDestroyKVRowBuilder(&kvRowBuilder);
taosMemoryFree(row);
} }
TEST(columnTest, smallint_value_add_int_column) { TEST(columnTest, smallint_value_add_int_column) {
......
...@@ -20,135 +20,41 @@ ...@@ -20,135 +20,41 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "sync.h" #include "sync.h"
#include "syncTools.h" #include "syncTools.h"
#include "taosdef.h"
#include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "ttimer.h" #include "ttimer.h"
#define sFatal(...) \ // clang-format off
{ \ #define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
if (sDebugFlag & DEBUG_FATAL) { \ #define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ #define sWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
} \ #define sInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
} #define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
#define sError(...) \ #define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
{ \ #define sFatalLong(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
if (sDebugFlag & DEBUG_ERROR) { \ #define sErrorLong(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ #define sWarnLong(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
} \ #define sInfoLong(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
} #define sDebugLong(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
#define sWarn(...) \ #define sTraceLong(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
{ \ // clang-format on
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ typedef struct SyncTimeout SyncTimeout;
} \ typedef struct SyncClientRequest SyncClientRequest;
} typedef struct SyncPing SyncPing;
#define sInfo(...) \ typedef struct SyncPingReply SyncPingReply;
{ \ typedef struct SyncRequestVote SyncRequestVote;
if (sDebugFlag & DEBUG_INFO) { \ typedef struct SyncRequestVoteReply SyncRequestVoteReply;
taosPrintLog("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \ typedef struct SyncAppendEntries SyncAppendEntries;
} \
}
#define sDebug(...) \
{ \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
} \
}
#define sTrace(...) \
{ \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
} \
}
#define sFatalLong(...) \
{ \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
}
#define sErrorLong(...) \
{ \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
}
#define sWarnLong(...) \
{ \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define sInfoLong(...) \
{ \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLongString("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define sDebugLong(...) \
{ \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
} \
}
#define sTraceLong(...) \
{ \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLongString("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
} \
}
struct SyncTimeout;
typedef struct SyncTimeout SyncTimeout;
struct SyncClientRequest;
typedef struct SyncClientRequest SyncClientRequest;
struct SyncPing;
typedef struct SyncPing SyncPing;
struct SyncPingReply;
typedef struct SyncPingReply SyncPingReply;
struct SyncRequestVote;
typedef struct SyncRequestVote SyncRequestVote;
struct SyncRequestVoteReply;
typedef struct SyncRequestVoteReply SyncRequestVoteReply;
struct SyncAppendEntries;
typedef struct SyncAppendEntries SyncAppendEntries;
struct SyncAppendEntriesReply;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncEnv SSyncEnv;
struct SSyncEnv; typedef struct SRaftStore SRaftStore;
typedef struct SSyncEnv SSyncEnv; typedef struct SVotesGranted SVotesGranted;
typedef struct SVotesRespond SVotesRespond;
struct SRaftStore; typedef struct SSyncIndexMgr SSyncIndexMgr;
typedef struct SRaftStore SRaftStore; typedef struct SRaftCfg SRaftCfg;
typedef struct SSyncRespMgr SSyncRespMgr;
struct SVotesGranted;
typedef struct SVotesGranted SVotesGranted;
struct SVotesRespond;
typedef struct SVotesRespond SVotesRespond;
struct SSyncIndexMgr;
typedef struct SSyncIndexMgr SSyncIndexMgr;
struct SRaftCfg;
typedef struct SRaftCfg SRaftCfg;
struct SSyncRespMgr;
typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo // init by SSyncInfo
......
...@@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp ...@@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
SEpSet epSet; SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet); syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->FpSendMsg != NULL) {
pMsg->info.noResp = 1;
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->FpSendMsg(&epSet, pMsg);
} else { } else {
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
...@@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S ...@@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet); syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->FpSendMsg != NULL) {
pMsg->info.noResp = 1;
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->FpSendMsg(&epSet, pMsg);
} else { } else {
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
......
...@@ -97,11 +97,12 @@ int main(int argc, char** argv) { ...@@ -97,11 +97,12 @@ int main(int argc, char** argv) {
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg = SyncPingReply* pSyncMsg =
syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest"); syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest");
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
rpcMsg.info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, &rpcMsg); pSyncNode->FpSendMsg(&epSet, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "os.h" #include "os.h"
#include "tdb.h" #include "tdb.h"
#include <shared_mutex>
#include <string> #include <string>
#include <thread> #include <thread>
#include <vector> #include <vector>
...@@ -118,7 +119,7 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in ...@@ -118,7 +119,7 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
return cret; return cret;
} }
TEST(tdb_test, simple_insert1) { TEST(tdb_test, DISABLED_simple_insert1) {
int ret; int ret;
TDB *pEnv; TDB *pEnv;
TTB *pDb; TTB *pDb;
...@@ -238,7 +239,7 @@ TEST(tdb_test, simple_insert1) { ...@@ -238,7 +239,7 @@ TEST(tdb_test, simple_insert1) {
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
} }
TEST(tdb_test, simple_insert2) { TEST(tdb_test, DISABLED_simple_insert2) {
int ret; int ret;
TDB *pEnv; TDB *pEnv;
TTB *pDb; TTB *pDb;
...@@ -325,7 +326,7 @@ TEST(tdb_test, simple_insert2) { ...@@ -325,7 +326,7 @@ TEST(tdb_test, simple_insert2) {
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
} }
TEST(tdb_test, simple_delete1) { TEST(tdb_test, DISABLED_simple_delete1) {
int ret; int ret;
TTB *pDb; TTB *pDb;
char key[128]; char key[128];
...@@ -420,7 +421,7 @@ TEST(tdb_test, simple_delete1) { ...@@ -420,7 +421,7 @@ TEST(tdb_test, simple_delete1) {
tdbClose(pEnv); tdbClose(pEnv);
} }
TEST(tdb_test, simple_upsert1) { TEST(tdb_test, DISABLED_simple_upsert1) {
int ret; int ret;
TDB *pEnv; TDB *pEnv;
TTB *pDb; TTB *pDb;
...@@ -485,12 +486,12 @@ TEST(tdb_test, simple_upsert1) { ...@@ -485,12 +486,12 @@ TEST(tdb_test, simple_upsert1) {
tdbClose(pEnv); tdbClose(pEnv);
} }
TEST(tdb_test, multi_thread_query) { TEST(tdb_test, DISABLED_multi_thread_query) {
int ret; int ret;
TDB *pEnv; TDB *pEnv;
TTB *pDb; TTB *pDb;
tdb_cmpr_fn_t compFunc; tdb_cmpr_fn_t compFunc;
int nData = 20000; int nData = 100000;
TXN txn; TXN txn;
taosRemoveDir("tdb"); taosRemoveDir("tdb");
...@@ -597,4 +598,132 @@ TEST(tdb_test, multi_thread_query) { ...@@ -597,4 +598,132 @@ TEST(tdb_test, multi_thread_query) {
// Close Env // Close Env
ret = tdbClose(pEnv); ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
}
TEST(tdb_test, multi_thread1) {
#if 0
int ret;
TDB *pDb;
TTB *pTb;
tdb_cmpr_fn_t compFunc;
int nData = 10000000;
TXN txn;
std::shared_timed_mutex mutex;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", 512, 1, &pDb);
GTEST_ASSERT_EQ(ret, 0);
ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb);
GTEST_ASSERT_EQ(ret, 0);
auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) {
TXN txn = {0};
char key[128];
char val[128];
SPoolMem *pPool = openPool();
txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED;
txn.txnId = -1;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
tdbBegin(pDb, &txn);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
{
std::lock_guard<std::shared_timed_mutex> wmutex(*mu);
int ret = tdbTbInsert(pTb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
if (pPool->size > 1024 * 1024) {
tdbCommit(pDb, &txn);
clearPool(pPool);
tdbBegin(pDb, &txn);
}
}
tdbCommit(pDb, &txn);
closePool(pPool);
*stop = 1;
};
auto query = [](TTB *pTb, int *stop, std::shared_timed_mutex *mu) {
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int ret;
TXN txn;
SPoolMem *pPool = openPool();
txn.flags = 0;
txn.txnId = 0;
txn.xMalloc = poolMalloc;
txn.xFree = poolFree;
txn.xArg = pPool;
for (;;) {
if (*stop) break;
clearPool(pPool);
int count = 0;
{
std::shared_lock<std::shared_timed_mutex> rMutex(*mu);
ret = tdbTbcOpen(pTb, &pDBC, &txn);
GTEST_ASSERT_EQ(ret, 0);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
count++;
}
std::cout << count << std::endl;
tdbTbcClose(pDBC);
}
usleep(500000);
}
closePool(pPool);
tdbFree(pKey);
tdbFree(pVal);
};
std::vector<std::thread> threads;
int nThreads = 10;
int stop = 0;
for (int i = 0; i < nThreads; i++) {
if (i == 0) {
threads.push_back(std::thread(insert, pDb, pTb, nData, &stop, &mutex));
} else {
threads.push_back(std::thread(query, pTb, &stop, &mutex));
}
}
for (auto &th : threads) {
th.join();
}
// Close a database
tdbTbClose(pTb);
// Close Env
ret = tdbClose(pDb);
GTEST_ASSERT_EQ(ret, 0);
#endif
} }
\ No newline at end of file
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ========== prepare stb and ctb
sql create database db vgroups 1
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 float, t3 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(1, 2, "3")
sql insert into db.ctb values(now, 1, "2")
sql show db.stables
if $rows != 1 then
return -1
endi
if $data[0][0] != stb then
return -1
endi
if $data[0][1] != db then
return -1
endi
if $data[0][3] != 3 then
return -1
endi
if $data[0][4] != 3 then
return -1
endi
if $data[0][6] != abd then
return -1
endi
sql show db.tables
if $rows != 1 then
return -1
endi
if $data[0][0] != ctb then
return -1
endi
if $data[0][1] != db then
return -1
endi
if $data[0][3] != 3 then
return -1
endi
if $data[0][4] != stb then
return -1
endi
if $data[0][6] != 2 then
return -1
endi
if $data[0][9] != CHILD_TABLE then
return -1
endi
sql select * from db.stb
if $rows != 1 then
return -1
endi
if $data[0][1] != 1 then
return -1
endi
if $data[0][2] != 2 then
return -1
endi
if $data[0][3] != 1 then
return -1
endi
print ========== add column
sql alter table db.stb add column c3 int
sql alter table db.stb add column c4 bigint
sql alter table db.stb add column c5 binary(12)
sql show db.stables
if $data[0][3] != 6 then
return -1
endi
sql show db.tables
if $data[0][3] != 6 then
return -1
endi
sql select * from db.stb
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
return -1
endi
if $data[0][1] != 1 then
return -1
endi
if $data[0][2] != 2 then
return -1
endi
if $data[0][3] != NULL then
return -1
endi
if $data[0][4] != NULL then
return -1
endi
if $data[0][5] != NULL then
return -1
endi
if $data[0][6] != 1 then
return -1
endi
#run user/pass_alter.sim
#run user/basic1.sim
#run user/privilege2.sim
#run user/user_len.sim
#run user/privilege1.sim
#run user/pass_len.sim
#run tstream/basic1.sim
#run tstream/basic0.sim
#run table/basic1.sim
#run trans/create_db.sim
#run stable/alter1.sim
#run stable/vnode3.sim
#run stable/metrics.sim
#run stable/show.sim
#run stable/values.sim
#run stable/dnode3.sim
#run stable/refcount.sim
#run stable/disk.sim
#run db/basic1.sim
#run db/basic3.sim
#run db/basic7.sim
#run db/basic6.sim
#run db/create_all_options.sim
#run db/basic2.sim
#run db/error1.sim
#run db/taosdlog.sim
#run db/alter_option.sim
#run mnode/basic1.sim
#run parser/fourArithmetic-basic.sim
#run parser/groupby-basic.sim
#run snode/basic1.sim
#run query/time_process.sim
#run query/stddev.sim
#run query/interval-offset.sim
#run query/charScalarFunction.sim
#run query/complex_select.sim
#run query/explain.sim
#run query/crash_sql.sim
#run query/diff.sim
#run query/complex_limit.sim
#run query/complex_having.sim
#run query/udf.sim
#run query/complex_group.sim
#run query/interval.sim
#run query/session.sim
print ========> dead lock failed when 2 rows in outputCapacity
run query/scalarFunction.sim
run query/scalarNull.sim
run query/complex_where.sim
run tmq/basic1.sim
run tmq/basic4.sim
run tmq/basic1Of2Cons.sim
run tmq/prepareBasicEnv-1vgrp.sim
run tmq/topic.sim
run tmq/basic4Of2Cons.sim
run tmq/prepareBasicEnv-4vgrp.sim
run tmq/basic3.sim
run tmq/basic2Of2Cons.sim
run tmq/basic2.sim
run tmq/basic3Of2Cons.sim
run tmq/basic2Of2ConsOverlap.sim
run tmq/clearConsume.sim
run qnode/basic1.sim
run dnode/basic1.sim
run show/basic.sim
run insert/basic1.sim
run insert/basic0.sim
run insert/backquote.sim
run insert/null.sim
run sync/oneReplica1VgElectWithInsert.sim
run sync/threeReplica1VgElect.sim
run sync/oneReplica1VgElect.sim
run sync/insertDataByRunBack.sim
run sync/threeReplica1VgElectWihtInsert.sim
run sma/tsmaCreateInsertData.sim
run sma/rsmaCreateInsertQuery.sim
run valgrind/checkError.sim
run bnode/basic1.sim
...@@ -1079,6 +1079,291 @@ class TDTestCase: ...@@ -1079,6 +1079,291 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 10 end ...... ") tdLog.printNoPrefix("======== test case 10 end ...... ")
def tmqCase11(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 11: ")
self.initConsumerTable()
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db11', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 2
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 11 end ...... ")
def tmqCase12(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 12: ")
self.initConsumerTable()
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db12', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 2
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 12 end ...... ")
def tmqCase13(self, cfgPath, buildPath):
tdLog.printNoPrefix("======== test case 13: ")
self.initConsumerTable()
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db13', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 10000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,\
parameterDict["dbName"],\
parameterDict["stbName"],\
parameterDict["ctbNum"],\
parameterDict["rowsPerTbl"],\
parameterDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt/4,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 5
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("start to check consume result")
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt/4:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt/4))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 2
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt*(1/2+1/4):
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*(1/2+1/4)))
tdLog.exit("tmq consume rows error!")
self.initConsumerInfoTable()
consumerId = 2
ifManualCommit = 1
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("again check consume result")
expectRows = 3
resultList = self.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 13 end ...... ")
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
...@@ -1099,8 +1384,10 @@ class TDTestCase: ...@@ -1099,8 +1384,10 @@ class TDTestCase:
self.tmqCase7(cfgPath, buildPath) self.tmqCase7(cfgPath, buildPath)
self.tmqCase8(cfgPath, buildPath) self.tmqCase8(cfgPath, buildPath)
self.tmqCase9(cfgPath, buildPath) self.tmqCase9(cfgPath, buildPath)
self.tmqCase10(cfgPath, buildPath) self.tmqCase10(cfgPath, buildPath)
self.tmqCase11(cfgPath, buildPath)
self.tmqCase12(cfgPath, buildPath)
self.tmqCase13(cfgPath, buildPath)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -315,6 +315,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i ...@@ -315,6 +315,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
memcpy(buf, val, length); memcpy(buf, val, length);
buf[length] = 0; buf[length] = 0;
taosFprintfFile(pFile, "\'%s\'", buf); taosFprintfFile(pFile, "\'%s\'", buf);
...@@ -384,19 +385,25 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) { ...@@ -384,19 +385,25 @@ void shellPrintNChar(const char *str, int32_t length, int32_t width) {
while (pos < length) { while (pos < length) {
TdWchar wc; TdWchar wc;
int32_t bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); int32_t bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
if (bytes == 0) { if (bytes <= 0) {
break; break;
} }
pos += bytes;
if (pos > length) { if (pos + bytes > length) {
break; break;
} }
int w = 0;
#ifdef WINDOWS #ifdef WINDOWS
int32_t w = bytes; w = bytes;
#else #else
int32_t w = taosWcharWidth(wc); if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){
w = bytes;
}else{
w = taosWcharWidth(wc);
}
#endif #endif
pos += bytes;
if (w <= 0) { if (w <= 0) {
continue; continue;
} }
...@@ -496,6 +503,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t ...@@ -496,6 +503,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
shellPrintNChar(val, length, width); shellPrintNChar(val, length, width);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
...@@ -604,7 +612,6 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) { ...@@ -604,7 +612,6 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
return TMAX(25, width); return TMAX(25, width);
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
if (field->bytes > shell.args.displayWidth) { if (field->bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width); return TMAX(shell.args.displayWidth, width);
...@@ -612,7 +619,8 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) { ...@@ -612,7 +619,8 @@ int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision) {
return TMAX(field->bytes, width); return TMAX(field->bytes, width);
} }
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON: {
int16_t bytes = field->bytes * TSDB_NCHAR_SIZE; int16_t bytes = field->bytes * TSDB_NCHAR_SIZE;
if (bytes > shell.args.displayWidth) { if (bytes > shell.args.displayWidth) {
return TMAX(shell.args.displayWidth, width); return TMAX(shell.args.displayWidth, width);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册