提交 36dc9da4 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

......@@ -5,6 +5,7 @@ AccessModifierOffset: -1
AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: true
AlignConsecutiveMacros: true
AlignEscapedNewlinesLeft: true
AlignOperands: true
AlignTrailingComments: true
......
......@@ -28,7 +28,7 @@ int32_t init_env() {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -42,25 +42,33 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
pRes =
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
pRes = taos_query(pConn, "create table if not exists ct1 using st1 tags(2000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1;
}
pRes = taos_query(pConn, "create table if not exists ct3 using st1 tags(3000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
......@@ -82,12 +90,40 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
pRes = taos_query(pConn, "create topic test_stb_topic_1 as select * from tu1");
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1 from ct1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
#if 0
pRes = taos_query(pConn, "insert into tu1 values(now, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu1 values(now+1d, 1, 1.0, 'bi1')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tu2 values(now+1d, 2, 2.0, 'bi2')");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
#endif
taos_close(pConn);
return 0;
}
......@@ -115,7 +151,7 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_list_append(topic_list, "topic_ctb_column");
return topic_list;
}
......@@ -215,8 +251,8 @@ int main(int argc, char* argv[]) {
if (argc > 1) {
printf("env init\n");
code = init_env();
create_topic();
}
create_topic();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/
......
......@@ -20,7 +20,7 @@
#include "taos.h"
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 7010);
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
......@@ -65,7 +65,7 @@ int32_t init_env() {
int32_t create_stream() {
printf("create stream\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 7010);
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
......
......@@ -37,6 +37,14 @@ enum {
TMQ_MSG_TYPE__EP_RSP,
};
enum {
STREAM_TRIGGER__AT_ONCE = 1,
STREAM_TRIGGER__WINDOW_CLOSE,
STREAM_TRIGGER__BY_COUNT,
STREAM_TRIGGER__BY_BATCH_COUNT,
STREAM_TRIGGER__BY_EVENT_TIME,
};
typedef struct {
uint32_t numOfTables;
SArray* pGroupList;
......@@ -54,13 +62,16 @@ typedef struct SColumnDataAgg {
} SColumnDataAgg;
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
int32_t rowSize;
int16_t numOfCols;
int16_t hasVarCol;
union {int64_t uid; int64_t blockId;};
int64_t groupId; // no need to serialize
STimeWindow window;
int32_t rows;
int32_t rowSize;
int16_t numOfCols;
int16_t hasVarCol;
union {
int64_t uid;
int64_t blockId;
};
int64_t groupId; // no need to serialize
} SDataBlockInfo;
typedef struct SSDataBlock {
......@@ -93,7 +104,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData) ;
void colDataDestroy(SColumnInfoData* pColData);
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
......@@ -239,7 +250,7 @@ typedef struct SSessionWindow {
SColumn col;
} SSessionWindow;
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
......
......@@ -47,21 +47,21 @@ extern "C" {
#define TD_VTYPE_NONE 0x0U // none or unknown/undefined
#define TD_VTYPE_NULL 0x01U // null val
#define TD_VTYPE_NORM 0x02U // normal val: not none, not null
#define TD_VTYPE_MAX 0x03U //
#define TD_VTYPE_MAX 0x03U //
#define TD_VTYPE_NONE_BYTE 0x0U
#define TD_VTYPE_NULL_BYTE 0x55U
#define TD_VTYPE_NORM_BYTE 0xAAU
#define TD_ROWS_ALL_NORM 0x01U
#define TD_ROWS_ALL_NORM 0x01U
#define TD_ROWS_NULL_NORM 0x0U
#define TD_COL_ROWS_NORM(c) ((c)->bitmap == TD_ROWS_ALL_NORM) // all rows of SDataCol/SBlockCol is NORM
#define TD_COL_ROWS_NORM(c) ((c)->bitmap == TD_ROWS_ALL_NORM) // all rows of SDataCol/SBlockCol is NORM
#define TD_SET_COL_ROWS_BTIMAP(c, v) ((c)->bitmap = (v))
#define TD_SET_COL_ROWS_NORM(c) TD_SET_COL_ROWS_BTIMAP((c), TD_ROWS_ALL_NORM)
#define TD_SET_COL_ROWS_MISC(c) TD_SET_COL_ROWS_BTIMAP((c), TD_ROWS_NULL_NORM)
#define TD_SET_COL_ROWS_NORM(c) TD_SET_COL_ROWS_BTIMAP((c), TD_ROWS_ALL_NORM)
#define TD_SET_COL_ROWS_MISC(c) TD_SET_COL_ROWS_BTIMAP((c), TD_ROWS_NULL_NORM)
#define KvConvertRatio (0.9f)
#define KvConvertRatio (0.9f)
#define isSelectKVRow(klen, tlen) ((klen) < ((tlen)*KvConvertRatio))
#ifdef TD_SUPPORT_BITMAP
......@@ -98,7 +98,7 @@ typedef void *SRow;
typedef struct {
TDRowValT valType;
void * val;
void *val;
} SCellVal;
typedef struct {
......@@ -158,43 +158,43 @@ typedef struct {
int16_t nBitmaps;
int16_t nBoundBitmaps;
int32_t offset;
void * pBitmap;
void * pOffset;
void *pBitmap;
void *pOffset;
int32_t extendedRowSize;
} SRowBuilder;
#define TD_ROW_HEAD_LEN (sizeof(STSRow))
#define TD_ROW_HEAD_LEN (sizeof(STSRow))
#define TD_ROW_NCOLS_LEN (sizeof(col_id_t))
#define TD_ROW_INFO(r) ((r)->info)
#define TD_ROW_TYPE(r) ((r)->type)
#define TD_ROW_DELETE(r) ((r)->del)
#define TD_ROW_ENDIAN(r) ((r)->endian)
#define TD_ROW_SVER(r) ((r)->sver)
#define TD_ROW_NCOLS(r) ((r)->data) // only valid for SKvRow
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_KEY(r) ((r)->ts)
#define TD_ROW_INFO(r) ((r)->info)
#define TD_ROW_TYPE(r) ((r)->type)
#define TD_ROW_DELETE(r) ((r)->del)
#define TD_ROW_ENDIAN(r) ((r)->endian)
#define TD_ROW_SVER(r) ((r)->sver)
#define TD_ROW_NCOLS(r) ((r)->data) // only valid for SKvRow
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_KEY(r) ((r)->ts)
#define TD_ROW_KEY_ADDR(r) (r)
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
// (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN)
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1)
#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v))
#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l))
#define TD_ROW_SET_INFO(r, i) (TD_ROW_INFO(r) = (i))
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1)
#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v))
#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l))
#define TD_ROW_SET_NCOLS(r, n) (*(col_id_t *)TD_ROW_NCOLS(r) = (n))
#define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r) == 1)
#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP)
#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV)
#define TD_IS_TP_ROW_T(t) ((t) == TD_ROW_TP)
#define TD_IS_KV_ROW_T(t) ((t) == TD_ROW_KV)
#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP)
#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV)
#define TD_IS_TP_ROW_T(t) ((t) == TD_ROW_TP)
#define TD_IS_KV_ROW_T(t) ((t) == TD_ROW_KV)
#define TD_BOOL_STR(b) ((b) ? "true" : "false")
#define TD_BOOL_STR(b) ((b) ? "true" : "false")
#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert))
#define TD_ROW_COL_IDX(r) POINTER_SHIFT(TD_ROW_DATA(r), sizeof(col_id_t))
......@@ -275,7 +275,7 @@ static FORCE_INLINE int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TD
}
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
char *pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
*pDestByte = ((*pDestByte) & 0x3F) | (valType << 6);
......@@ -313,7 +313,7 @@ static FORCE_INLINE int32_t tdGetBitmapValType(void *pBitmap, int16_t colIdx, TD
}
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
char *pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
*pValType = (((*pDestByte) & 0xC0) >> 6);
......@@ -620,7 +620,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowVa
if (tdValIsNorm(valType, val, colType)) {
// ts key stored in STSRow.ts
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
char *ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
......@@ -638,7 +638,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowVa
// NULL/None value
else {
SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(TD_ROW_COL_IDX(row), offset);
char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
char *ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
const void *nullVal = getNullValue(colType);
......@@ -775,8 +775,8 @@ static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, v
typedef struct {
STSchema *pSchema;
STSRow * pRow;
void * pBitmap;
STSRow *pRow;
void *pBitmap;
uint32_t offset;
col_id_t maxColId;
col_id_t colIdx; // [PRIMARYKEY_TIMESTAMP_COL_ID, nSchemaCols], PRIMARYKEY_TIMESTAMP_COL_ID equals 1
......@@ -881,7 +881,7 @@ static FORCE_INLINE bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colTy
// internal
static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx,
SCellVal *pVal) {
STSRow * pRow = pIter->pRow;
STSRow *pRow = pIter->pRow;
SKvRowIdx *pKvIdx = NULL;
bool colFound = false;
col_id_t kvNCols = tdRowGetNCols(pRow);
......@@ -1076,7 +1076,7 @@ typedef struct {
typedef struct {
STSchema *pSchema;
STSRow * pRow;
STSRow *pRow;
} STSRowReader;
typedef struct {
......
......@@ -61,16 +61,16 @@ extern "C" {
} \
}
#define WAL_HEAD_VER 0
#define WAL_HEAD_VER 0
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log"
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1
......@@ -150,14 +150,15 @@ typedef struct SWal {
} SWal; // WAL HANDLE
typedef struct SWalReadHandle {
SWal *pWal;
TdFilePtr pReadLogTFile;
TdFilePtr pReadIdxTFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int64_t status; // if cursor valid
SWalHead *pHead;
SWal *pWal;
TdFilePtr pReadLogTFile;
TdFilePtr pReadIdxTFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int64_t status; // if cursor valid
TdThreadMutex mutex;
SWalHead *pHead;
} SWalReadHandle;
#pragma pack(pop)
......@@ -191,6 +192,7 @@ int32_t walEndSnapshot(SWal *);
SWalReadHandle *walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead);
// deprecated
#if 0
......
......@@ -181,6 +181,11 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
}
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
// drop table if exists not_exists_table
if (NULL == pQuery->pCmdMsg) {
return TSDB_CODE_SUCCESS;
}
SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
pRequest->type = pMsgInfo->msgType;
pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
......
......@@ -255,6 +255,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
break;
}
msg = NULL;
taosReadAllQitems(tmq->mqueue, tmq->qall);
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
......@@ -787,7 +788,7 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
static bool noPrintSchema;
char pBuf[128];
SMqPollRsp* pRsp = &tmq_message->msg;
int32_t colNum = pRsp->schema->nCols;
int32_t colNum = 2;
if (!noPrintSchema) {
printf("|");
for (int32_t i = 0; i < colNum; i++) {
......@@ -838,6 +839,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
tsem_post(&tmq->rspSem);
tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
return 0;
......@@ -886,6 +888,9 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto WRITE_QUEUE_FAIL;
}
tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset,
pRsp->msg.rspOffset);
pRsp->vg = pParam->pVg;
taosWriteQitem(tmq->mqueue, pRsp);
atomic_add_fetch_32(&tmq->readyRequest, 1);
......@@ -902,6 +907,7 @@ WRITE_QUEUE_FAIL:
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
/*printf("tmq update ep epoch %d to epoch %d\n", tmq->epoch, epoch);*/
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
......@@ -932,6 +938,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
/*printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);*/
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
}
break;
......@@ -945,9 +952,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset;
/*printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);*/
if (pOffset != NULL) {
offset = *pOffset;
/*printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);*/
}
/*printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);*/
SMqClientVg clientVg = {
.pollCnt = 0,
.currentOffset = offset,
......@@ -1195,6 +1205,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
/*printf("skip vg %d\n", pVg->vgId);*/
continue;
}
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
......@@ -1238,6 +1249,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
/*tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);*/
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
......
......@@ -53,7 +53,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
tSerializeSStatusReq(pHead, contLen, &req);
taosArrayDestroy(req.pVloads);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)0x9527};
pMgmt->statusSent = 1;
dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle);
......
......@@ -30,18 +30,13 @@ static struct {
} global = {0};
static void dndStopDnode(int signum, void *info, void *ctx) {
dInfo("signal:%d is received", signum);
dInfo("system signal:%d received", signum);
SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode);
if (pDnode != NULL) {
dndHandleEvent(pDnode, DND_EVENT_STOP);
}
}
static void dndHandleChild(int signum, void *info, void *ctx) {
dInfo("signal:%d is received", signum);
dndHandleEvent(global.pDnode, DND_EVENT_CHILD);
}
static void dndSetSignalHandle() {
taosSetSignal(SIGTERM, dndStopDnode);
taosSetSignal(SIGHUP, dndStopDnode);
......@@ -50,8 +45,8 @@ static void dndSetSignalHandle() {
taosSetSignal(SIGBREAK, dndStopDnode);
if (!tsMultiProcess) {
} else if (global.ntype == DNODE) {
taosSetSignal(SIGCHLD, dndHandleChild);
} else if (global.ntype == DNODE || global.ntype == NODE_MAX) {
taosIgnSignal(SIGCHLD);
} else {
taosKillChildOnParentStopped();
}
......@@ -74,14 +69,14 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) {
tstrncpy(global.apolloUrl, argv[++i], PATH_MAX);
} else if (strcmp(argv[i], "-e") == 0) {
tstrncpy(global.envFile, argv[++i], PATH_MAX);
} else if (strcmp(argv[i], "-k") == 0) {
global.generateGrant = true;
} else if (strcmp(argv[i], "-n") == 0) {
global.ntype = atoi(argv[++i]);
if (global.ntype <= DNODE || global.ntype > NODE_MAX) {
printf("'-n' range is [1-5], default is 0\n");
return -1;
}
} else if (strcmp(argv[i], "-k") == 0) {
global.generateGrant = true;
} else if (strcmp(argv[i], "-C") == 0) {
global.dumpConfig = true;
} else if (strcmp(argv[i], "-V") == 0) {
......@@ -139,7 +134,7 @@ static int32_t dndInitLog() {
static void dndSetProcInfo(int32_t argc, char **argv) {
taosSetProcPath(argc, argv);
if (global.ntype != DNODE) {
if (global.ntype != DNODE && global.ntype != NODE_MAX) {
const char *name = dndNodeProcStr(global.ntype);
taosSetProcName(argc, argv, name);
}
......@@ -147,14 +142,14 @@ static void dndSetProcInfo(int32_t argc, char **argv) {
static int32_t dndRunDnode() {
if (dndInit() != 0) {
dError("failed to initialize environment since %s", terrstr());
dError("failed to init environment since %s", terrstr());
return -1;
}
SDnodeOpt option = dndGetOpt();
SDnode *pDnode = dndCreate(&option);
if (pDnode == NULL) {
dError("failed to to create dnode object since %s", terrstr());
dError("failed to to create dnode since %s", terrstr());
return -1;
} else {
global.pDnode = pDnode;
......@@ -184,7 +179,6 @@ int main(int argc, char const *argv[]) {
return -1;
}
dndSetProcInfo(argc, (char **)argv);
if (global.generateGrant) {
dndGenerateGrant();
return 0;
......@@ -212,5 +206,6 @@ int main(int argc, char const *argv[]) {
return 0;
}
dndSetProcInfo(argc, (char **)argv);
return dndRunDnode();
}
......@@ -136,6 +136,7 @@ typedef struct SDnode {
const char *dndNodeLogStr(ENodeType ntype);
const char *dndNodeProcStr(ENodeType ntype);
const char *dndEventStr(EDndEvent ev);
EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "wal.h"
static int8_t once = DND_ENV_INIT;
int32_t dndInit() {
dDebug("start to init dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr());
return -1;
}
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
SMonCfg monCfg = {0};
monCfg.maxLogs = tsMonitorMaxLogs;
monCfg.port = tsMonitorPort;
monCfg.server = tsMonitorFqdn;
monCfg.comp = tsMonitorComp;
if (monInit(&monCfg) != 0) {
dError("failed to init monitor since %s", terrstr());
return -1;
}
dInfo("dnode env is initialized");
return 0;
}
void dndCleanup() {
dDebug("start to cleanup dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up");
return;
}
monCleanup();
walCleanUp();
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
}
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndSetStatus(SDnode *pDnode, EDndStatus status) {
if (pDnode->status != status) {
dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status));
pDnode->status = status;
}
}
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
}
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
}
......@@ -128,7 +128,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
}
pWrapper->procId = pid;
dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid);
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
return 0;
}
......@@ -150,7 +150,7 @@ static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode start to run in single process");
dInfo("dnode run in single process");
for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
......@@ -189,7 +189,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
}
static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo("dnode start to run in parent process");
dInfo("dnode run in parent process");
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
if (dndOpenNode(pDWrapper) != 0) {
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
......@@ -201,12 +201,13 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
int64_t shmsize = 1024 * 1024 * 2; // size will be a configuration item
int32_t shmsize = 1024 * 1024 * 2; // size will be a configuration item
if (taosCreateShm(&pWrapper->shm, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%" PRId64 " since %s", pWrapper->name, shmsize, terrstr());
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
return -1;
}
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
SProcCfg cfg = dndGenProcCfg(pWrapper);
cfg.isChild = false;
......@@ -262,21 +263,21 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) continue;
if (pWrapper->procId != 0 && !taosProcExists(pWrapper->procId)) {
dInfo("node:%s, process not exist, pid:%d", pWrapper->name, pWrapper->procId);
if (pWrapper->procId <= 0 || !taosProcExists(pWrapper->procId)) {
dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
dndNewProc(pWrapper, n);
}
taosMsleep(100);
}
taosMsleep(100);
}
return 0;
}
static int32_t dndRunInChildProcess(SDnode *pDnode) {
dInfo("dnode start to run in child process");
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
dInfo("%s run in child process", pWrapper->name);
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
......
......@@ -179,7 +179,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
}
}
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == DNODE) {
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (pWrapper->shm.id >= 0) {
......@@ -194,10 +194,10 @@ int32_t dndReadShmFile(SDnode *pDnode) {
dError("shmid:%d, failed to attach shm since %s", pWrapper->shm.id, terrstr());
goto _OVER;
}
dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size);
dInfo("node:%s, shmid:%d is attached, size:%d", pWrapper->name, pWrapper->shm.id, pWrapper->shm.size);
}
dDebug("successed to open %s", file);
dDebug("successed to load %s", file);
code = 0;
_OVER:
......
......@@ -15,82 +15,186 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#include "wal.h"
static int8_t once = DND_ENV_INIT;
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
pDnode->serverPort = pOption->serverPort;
pDnode->dataDir = strdup(pOption->dataDir);
pDnode->localEp = strdup(pOption->localEp);
pDnode->localFqdn = strdup(pOption->localFqdn);
pDnode->firstEp = strdup(pOption->firstEp);
pDnode->secondEp = strdup(pOption->secondEp);
pDnode->disks = pOption->disks;
pDnode->numOfDisks = pOption->numOfDisks;
pDnode->ntype = pOption->ntype;
pDnode->rebootTime = taosGetTimestampMs();
int32_t dndInit() {
dDebug("start to init dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr());
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
pDnode->secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
SMonCfg monCfg = {0};
monCfg.maxLogs = tsMonitorMaxLogs;
monCfg.port = tsMonitorPort;
monCfg.server = tsMonitorFqdn;
monCfg.comp = tsMonitorComp;
if (monInit(&monCfg) != 0) {
dError("failed to init monitor since %s", terrstr());
return -1;
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
pDnode->lockfile = dndCheckRunning(pDnode->dataDir);
if (pDnode->lockfile == NULL) {
return -1;
}
}
dInfo("dnode env is initialized");
return 0;
}
void dndCleanup() {
dDebug("start to cleanup dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up");
return;
static void dndClearVars(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
taosMemoryFreeClear(pMgmt->path);
}
monCleanup();
walCleanUp();
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile);
taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL;
}
taosMemoryFreeClear(pDnode->localEp);
taosMemoryFreeClear(pDnode->localFqdn);
taosMemoryFreeClear(pDnode->firstEp);
taosMemoryFreeClear(pDnode->secondEp);
taosMemoryFreeClear(pDnode->dataDir);
taosMemoryFree(pDnode);
dDebug("dnode memory is cleared, data:%p", pDnode);
}
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
SDnode *dndCreate(const SDnodeOpt *pOption) {
dDebug("start to create dnode object");
int32_t code = -1;
char path[PATH_MAX] = {0};
SDnode *pDnode = NULL;
pDnode = taosMemoryCalloc(1, sizeof(SDnode));
if (pDnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dndInitVars(pDnode, pOption) != 0) {
dError("failed to init variables since %s", terrstr());
goto _OVER;
}
dndSetStatus(pDnode, DND_STAT_INIT);
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
pWrapper->shm.id = -1;
pWrapper->pDnode = pDnode;
if (pWrapper->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
pWrapper->procType = PROC_SINGLE;
taosInitRWLatch(&pWrapper->latch);
}
if (dndInitMsgHandle(pDnode) != 0) {
dError("failed to msg handles since %s", terrstr());
goto _OVER;
}
if (dndReadShmFile(pDnode) != 0) {
dError("failed to read shm file since %s", terrstr());
goto _OVER;
}
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
tmsgSetDefaultMsgCb(&msgCb);
dInfo("dnode is created, data:%p", pDnode);
code = 0;
_OVER:
if (code != 0 && pDnode) {
dndClearVars(pDnode);
pDnode = NULL;
dError("failed to create dnode since %s", terrstr());
}
return pDnode;
}
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndClose(SDnode *pDnode) {
if (pDnode == NULL) return;
void dndSetStatus(SDnode *pDnode, EDndStatus status) {
if (pDnode->status != status) {
dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status));
pDnode->status = status;
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
dError("dnode is shutting down, data:%p", pDnode);
return;
}
dInfo("start to close dnode, data:%p", pDnode);
dndSetStatus(pDnode, DND_STAT_STOPPED);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper);
}
dndClearVars(pDnode);
dInfo("dnode is closed, data:%p", pDnode);
}
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
SStartupReq *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
pStartup->finished = 0;
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode);
if (event == DND_EVENT_STOP) {
pDnode->event = event;
}
}
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosRUnLockLatch(&pWrapper->latch);
return pRetWrapper;
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
dndGetStartup(pDnode, pStartup);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
rpcSendResponse(&rpcRsp);
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1;
}
taosRUnLockLatch(&pWrapper->latch);
return code;
}
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch);
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
}
\ No newline at end of file
......@@ -66,7 +66,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
dTrace("msg:%p, is created and put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
pRpc->ahandle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
} else {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
pDnode->serverPort = pOption->serverPort;
pDnode->dataDir = strdup(pOption->dataDir);
pDnode->localEp = strdup(pOption->localEp);
pDnode->localFqdn = strdup(pOption->localFqdn);
pDnode->firstEp = strdup(pOption->firstEp);
pDnode->secondEp = strdup(pOption->secondEp);
pDnode->disks = pOption->disks;
pDnode->numOfDisks = pOption->numOfDisks;
pDnode->ntype = pOption->ntype;
pDnode->rebootTime = taosGetTimestampMs();
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
pDnode->secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
pDnode->lockfile = dndCheckRunning(pDnode->dataDir);
if (pDnode->lockfile == NULL) {
return -1;
}
}
return 0;
}
static void dndClearVars(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
taosMemoryFreeClear(pMgmt->path);
}
if (pDnode->lockfile != NULL) {
taosUnLockFile(pDnode->lockfile);
taosCloseFile(&pDnode->lockfile);
pDnode->lockfile = NULL;
}
taosMemoryFreeClear(pDnode->localEp);
taosMemoryFreeClear(pDnode->localFqdn);
taosMemoryFreeClear(pDnode->firstEp);
taosMemoryFreeClear(pDnode->secondEp);
taosMemoryFreeClear(pDnode->dataDir);
taosMemoryFree(pDnode);
dDebug("dnode object memory is cleared, data:%p", pDnode);
}
SDnode *dndCreate(const SDnodeOpt *pOption) {
dDebug("start to create dnode object");
int32_t code = -1;
char path[PATH_MAX] = {0};
SDnode *pDnode = NULL;
pDnode = taosMemoryCalloc(1, sizeof(SDnode));
if (pDnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dndInitVars(pDnode, pOption) != 0) {
dError("failed to init variables since %s", terrstr());
goto _OVER;
}
dndSetStatus(pDnode, DND_STAT_INIT);
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
pWrapper->path = strdup(path);
pWrapper->shm.id = -1;
pWrapper->pDnode = pDnode;
if (pWrapper->path == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
pWrapper->procType = PROC_SINGLE;
taosInitRWLatch(&pWrapper->latch);
}
if (dndInitMsgHandle(pDnode) != 0) {
dError("failed to msg handles since %s", terrstr());
goto _OVER;
}
if (dndReadShmFile(pDnode) != 0) {
dError("failed to read shm file since %s", terrstr());
goto _OVER;
}
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
tmsgSetDefaultMsgCb(&msgCb);
dInfo("dnode object is created, data:%p", pDnode);
code = 0;
_OVER:
if (code != 0 && pDnode) {
dndClearVars(pDnode);
pDnode = NULL;
dError("failed to create dnode object since %s", terrstr());
}
return pDnode;
}
void dndClose(SDnode *pDnode) {
if (pDnode == NULL) return;
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
dError("dnode is shutting down, data:%p", pDnode);
return;
}
dInfo("start to close dnode, data:%p", pDnode);
dndSetStatus(pDnode, DND_STAT_STOPPED);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper);
}
dndClearVars(pDnode);
dInfo("dnode object is closed, data:%p", pDnode);
}
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode object receive event %d, data:%p", event, pDnode);
if (event == DND_EVENT_STOP) {
pDnode->event = event;
}
}
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pRetWrapper = NULL;
}
taosRUnLockLatch(&pWrapper->latch);
return pRetWrapper;
}
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1;
}
taosRUnLockLatch(&pWrapper->latch);
return code;
}
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return;
taosRLockLatch(&pWrapper->latch);
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
taosRUnLockLatch(&pWrapper->latch);
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
}
\ No newline at end of file
......@@ -62,3 +62,16 @@ const char *dndNodeProcStr(ENodeType ntype) {
return "taosd";
}
}
const char *dndEventStr(EDndEvent ev) {
switch (ev) {
case DND_EVENT_START:
return "start";
case DND_EVENT_STOP:
return "stop";
case DND_EVENT_CHILD:
return "child";
default:
return "UNKNOWN";
}
}
\ No newline at end of file
......@@ -735,6 +735,9 @@ typedef struct {
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused
int8_t trigger;
int32_t triggerParam;
int64_t waterMark;
char* sql;
char* logicalPlan;
char* physicalPlan;
......
......@@ -74,4 +74,4 @@ int tsdbApplyRtn(STsdbRepo *pRepo);
}
#endif
#endif /* _TD_TSDB_COMMIT_H_ */
\ No newline at end of file
#endif /* _TD_TSDB_COMMIT_H_ */
......@@ -79,4 +79,4 @@ static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock,
}
#endif
#endif /*_TD_TSDB_DEF_H_*/
\ No newline at end of file
#endif /*_TD_TSDB_DEF_H_*/
......@@ -194,7 +194,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
......
......@@ -27,5 +27,5 @@ void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */
tb_uid_t metaGenerateUid(SMeta *pMeta) {
// Generate a new table UID
return ++(pMeta->uidGnrt.nextUid);
return tGenIdPI32();
}
......@@ -250,7 +250,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return 0;
}
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
......@@ -264,6 +264,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
fetchOffset = pReq->currentOffset + 1;
}
/*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/
SMqPollRsp rsp = {
/*.consumerId = consumerId,*/
.numOfTopics = 0,
......@@ -288,62 +290,77 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
rsp.reqOffset = pReq->currentOffset;
rsp.skipLogNum = 0;
SWalHead* pHead;
while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
break;
}
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
/*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType,
* pReq->epoch);*/
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
/*printf("from topic %s from consumer\n", pTopic->topicName, consumerId);*/
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
SSDataBlock* pDataBlock = NULL;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
fetchOffset++;
pos = fetchOffset % TQ_BUFFER_SIZE;
rsp.skipLogNum++;
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
break;
}
taosArrayPush(pRes, pDataBlock);
rsp.schema = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
}
if (taosArrayGetSize(pRes) == 0) {
fetchOffset++;
rsp.skipLogNum++;
taosArrayDestroy(pRes);
continue;
}
rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
return 0;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
taosMemoryFree(pHead);
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset,
* pHead->msgType,*/
/*pReq->epoch);*/
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
} else {
taosMemoryFree(pHead);
fetchOffset++;
rsp.skipLogNum++;
}
......@@ -368,6 +385,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
/*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/
/*}*/
return 0;
......@@ -432,7 +450,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
};
pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task);
}
printf("set topic %s to consumer %ld\n", pTopic->topicName, req.consumerId);
taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.consumerId);
......
......@@ -168,6 +168,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
break;
}
if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {
/*if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {*/
taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock);
return NULL;
}
......
......@@ -1394,7 +1394,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
return 0;
......
......@@ -4068,4 +4068,4 @@ void getTableListfromSkipList(tExprNode *pExpr, SSkipList *pSkipList, SArray *re
//apply the hierarchical filter expression to every node in skiplist to find the qualified nodes
applyFilterToSkipListNode(pSkipList, pExpr, result, param);
}
#endif
\ No newline at end of file
#endif
......@@ -66,7 +66,7 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
......
......@@ -165,6 +165,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// }
break;
case TDMT_VND_SUBMIT:
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
if (pVnode->config.streamMode == 0) {
if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) {
// TODO: handle error
......
......@@ -63,9 +63,10 @@ typedef struct {
FstRegex *regexCreate(const char *str);
void regexSetup(FstRegex *regex, uint32_t size, const char *str);
// uint32_t regexStart()
uint32_t regexAutomStart(FstRegex *regex);
bool regexAutomIsMatch(FstRegex *regex, uint32_t state);
bool regexAutomCanMatch(FstRegex *regex, uint32_t state, bool null);
bool regexAutomAccept(FstRegex *regex, uint32_t state, uint8_t byte, uint32_t *result);
#ifdef __cplusplus
}
......
......@@ -23,9 +23,9 @@ extern "C" {
#endif
typedef struct FstSparseSet {
SArray *dense;
SArray *sparse;
int32_t size;
uint32_t *dense;
uint32_t *sparse;
int32_t size;
} FstSparseSet;
FstSparseSet *sparSetCreate(int32_t sz);
......
......@@ -27,7 +27,7 @@
#endif
#define INDEX_NUM_OF_THREADS 4
#define INDEX_QUEUE_SIZE 200
#define INDEX_QUEUE_SIZE 200
void* indexQhandle = NULL;
......
......@@ -21,8 +21,8 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 1024 * 1024
#define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 1024 * 1024
#define MEM_ESTIMATE_RADIO 1.5
static void indexMemRef(MemTable* tbl);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
YAML:9:25: error: unknown key 'AlignConsecutiveMacros' * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
......
......@@ -14,6 +14,7 @@
*/
#include "indexFstRegex.h"
#include "indexFstDfa.h"
#include "indexFstSparse.h"
FstRegex *regexCreate(const char *str) {
......@@ -26,9 +27,35 @@ FstRegex *regexCreate(const char *str) {
memcpy(orig, str, sz);
regex->orig = orig;
// construct insts based on str
SArray *insts = NULL;
FstDfaBuilder *builder = dfaBuilderCreate(insts);
regex->dfa = dfaBuilderBuild(builder);
return regex;
}
void regexSetup(FstRegex *regex, uint32_t size, const char *str) {
// return
// return;
uint32_t regexAutomStart(FstRegex *regex) {
///// no nothing
return 0;
}
bool regexAutomIsMatch(FstRegex *regex, uint32_t state) {
if (regex->dfa != NULL && dfaIsMatch(regex->dfa, state)) {
return true;
} else {
return false;
}
}
bool regexAutomCanMatch(FstRegex *regex, uint32_t state, bool null) {
// make frame happy
return null;
}
bool regexAutomAccept(FstRegex *regex, uint32_t state, uint8_t byte, uint32_t *result) {
if (regex->dfa == NULL) {
return false;
}
return dfaAccept(regex->dfa, state, byte, result);
}
......@@ -21,47 +21,44 @@ FstSparseSet *sparSetCreate(int32_t sz) {
return NULL;
}
ss->dense = taosArrayInit(sz, sizeof(uint32_t));
ss->sparse = taosArrayInit(sz, sizeof(uint32_t));
ss->size = sz;
ss->dense = (uint32_t *)taosMemoryCalloc(sz, sizeof(uint32_t));
ss->sparse = (uint32_t *)taosMemoryCalloc(sz, sizeof(uint32_t));
ss->size = 0;
return ss;
}
void sparSetDestroy(FstSparseSet *ss) {
if (ss == NULL) {
return;
}
taosArrayDestroy(ss->dense);
taosArrayDestroy(ss->sparse);
taosMemoryFree(ss->dense);
taosMemoryFree(ss->sparse);
taosMemoryFree(ss);
}
uint32_t sparSetLen(FstSparseSet *ss) { return ss == NULL ? 0 : ss->size; }
uint32_t sparSetLen(FstSparseSet *ss) {
// Get occupied size
return ss == NULL ? 0 : ss->size;
}
uint32_t sparSetAdd(FstSparseSet *ss, uint32_t ip) {
if (ss == NULL) {
return 0;
}
uint32_t i = ss->size;
taosArraySet(ss->dense, i, &ip);
taosArraySet(ss->sparse, ip, &i);
ss->dense[i] = ip;
ss->sparse[ip] = i;
ss->size += 1;
return i;
}
uint32_t sparSetGet(FstSparseSet *ss, uint32_t i) {
if (i >= taosArrayGetSize(ss->dense)) {
return 0;
}
uint32_t *v = taosArrayGet(ss->dense, i);
return *v;
// check later
return ss->dense[i];
}
bool sparSetContains(FstSparseSet *ss, uint32_t ip) {
if (ip >= taosArrayGetSize(ss->sparse)) {
return false;
}
uint32_t i = *(uint32_t *)taosArrayGet(ss->sparse, ip);
if (i >= taosArrayGetSize(ss->dense)) {
uint32_t i = ss->sparse[ip];
if (i < ss->size && ss->dense[i] == ip) {
return true;
} else {
return false;
}
uint32_t v = *(uint32_t *)taosArrayGet(ss->dense, i);
return v == ip;
}
void sparSetClear(FstSparseSet *ss) {
if (ss == NULL) {
......
......@@ -1217,6 +1217,9 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt
SName tableName;
int32_t code = getTableMetaImpl(
pCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta);
if ((TSDB_CODE_TDB_INVALID_TABLE_ID == code || TSDB_CODE_VND_TB_NOT_EXIST == code) && pClause->ignoreNotExists) {
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_SUPER_TABLE == pTableMeta->tableType) {
code = doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists);
......@@ -2593,8 +2596,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break;
default:
pQuery->directRpc = true;
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg, SCmdMsgInfo*);
pQuery->msgType = pQuery->pCmdMsg->msgType;
if (NULL != pCxt->pCmdMsg) {
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg, SCmdMsgInfo*);
pQuery->msgType = pQuery->pCmdMsg->msgType;
}
break;
}
......
......@@ -719,6 +719,11 @@ static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExc
if (NULL == pScan->pScanCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
code = sortScanCols(pScan->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = sortScanCols(pScan->pScanCols);
}
......
......@@ -38,7 +38,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
req.taskId = pTask->fixedEpDispatcher.taskId;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
// TODO fix tbname issue
// TODO use general name rule of schemaless
char ctbName[TSDB_TABLE_FNAME_LEN + 22];
// all groupId must be the same in an array
SSDataBlock* pBlock = taosArrayGet(data, 0);
......
......@@ -30,6 +30,9 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
pRead->curFileFirstVer = -1;
pRead->capacity = 0;
pRead->status = 0;
taosThreadMutexInit(&pRead->mutex, NULL);
pRead->pHead = taosMemoryMalloc(sizeof(SWalHead));
if (pRead->pHead == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
......@@ -135,6 +138,22 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead) {
taosThreadMutexLock(&pRead->mutex);
if (walReadWithHandle(pRead, ver) < 0) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.len);
if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex);
return -1;
}
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.len);
taosThreadMutexUnlock(&pRead->mutex);
return 0;
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
// TODO: check wal life
......@@ -145,7 +164,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
}
}
if (!taosValidFile(pRead->pReadLogTFile)) return -1;
if (!taosValidFile(pRead->pReadLogTFile)) {
return -1;
}
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) {
......
......@@ -23,6 +23,9 @@ int32_t taosNewProc(char **args) {
int32_t pid = fork();
if (pid == 0) {
args[0] = tsProcPath;
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
return execvp(tsProcPath, args);
} else {
return pid;
......@@ -39,7 +42,7 @@ void taosSetProcName(int32_t argc, char **argv, const char *name) {
argv[i][j] = 0;
}
if (i == 0) {
tstrncpy(argv[0], name, len);
tstrncpy(argv[0], name, len + 1);
}
}
}
......@@ -48,5 +51,5 @@ void taosSetProcPath(int32_t argc, char **argv) { tsProcPath = argv[0]; }
bool taosProcExists(int32_t pid) {
int32_t p = getpgid(pid);
return p == 0;
return p >= 0;
}
......@@ -590,12 +590,12 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
}
int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
uInfo("load from global env variables");
uInfo("load from global env variables not implemented yet");
return 0;
}
int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *filepath) {
uInfo("load from env file %s", filepath);
uInfo("load from env file not implemented yet");
return 0;
}
......@@ -654,6 +654,6 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
}
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
uInfo("load from apoll url %s", url);
uInfo("load from apoll url not implemented yet");
return 0;
}
......@@ -121,7 +121,8 @@ int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
return tjsonAddItemToArray(pJson, pJobj);
}
int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num) {
int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize,
int32_t num) {
if (num > 0) {
SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName);
if (NULL == pJsonArray) {
......@@ -168,7 +169,7 @@ int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal
}
*pVal = strtol(p, NULL, 10);
return (errno == EINVAL || errno == ERANGE) ? TSDB_CODE_FAILED:TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal) {
......@@ -199,19 +200,19 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV
}
*pVal = strtoul(p, NULL, 10);
return (errno == ERANGE||errno == EINVAL) ? TSDB_CODE_FAILED:TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) {
uint64_t val = 0;
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
*pVal = val;
return code;
}
int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) {
uint64_t val = 0;
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
int32_t code = tjsonGetUBigIntValue(pJson, pName, &val);
*pVal = val;
return code;
}
......@@ -264,7 +265,7 @@ int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, v
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize) {
const cJSON* jArray = tjsonGetObjectItem(pJson, pName);
int32_t size = (NULL == jArray ? 0 : tjsonGetArraySize(jArray));
int32_t size = (NULL == jArray ? 0 : tjsonGetArraySize(jArray));
for (int32_t i = 0; i < size; ++i) {
int32_t code = func(tjsonGetArrayItem(jArray, i), (char*)pArray + itemSize * i);
if (TSDB_CODE_SUCCESS != code) {
......
......@@ -140,14 +140,16 @@ static void taosProcDestroySem(SProcQueue *pQueue) {
pQueue->sem = NULL;
}
}
#endif
static void taosProcCleanupQueue(SProcQueue *pQueue) {
#if 0
if (pQueue != NULL) {
taosProcDestroyMutex(pQueue);
taosProcDestroySem(pQueue);
}
}
#endif
}
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
int32_t rawBodyLen, ProcFuncType ftype) {
......@@ -222,7 +224,6 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
taosThreadMutexLock(&pQueue->mutex);
if (pQueue->total - pQueue->avail <= 0) {
taosThreadMutexUnlock(&pQueue->mutex);
tsem_post(&pQueue->sem);
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
return 0;
}
......@@ -317,7 +318,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize);
pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
// taosProcCleanupQueue(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc);
return NULL;
}
......@@ -336,7 +337,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->parentConsumeFp = pCfg->parentConsumeFp;
pProc->isChild = pCfg->isChild;
uDebug("proc:%s, is initialized, child:%d child queue:%p parent queue:%p", pProc->name, pProc->isChild,
uDebug("proc:%s, is initialized, isChild:%d child queue:%p parent queue:%p", pProc->name, pProc->isChild,
pProc->pChildQueue, pProc->pParentQueue);
return pProc;
......@@ -376,7 +377,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
mallocBodyFp, freeBodyFp);
if (numOfMsgs == 0) {
uInfo("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
break;
} else if (numOfMsgs < 0) {
uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr());
......@@ -399,19 +400,19 @@ int32_t taosProcRun(SProcObj *pProc) {
return -1;
}
uDebug("proc:%s, start to consume queue:%p", pProc->name, pProc->pChildQueue);
uDebug("proc:%s, start to consume queue:%p, thread:%" PRId64, pProc->name, pProc->pChildQueue, pProc->thread);
return 0;
}
static void taosProcStop(SProcObj *pProc) {
if (!taosCheckPthreadValid(pProc->thread)) return;
uDebug("proc:%s, start to join thread", pProc->name);
uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
SProcQueue *pQueue;
if (pProc->isChild) {
pQueue = pProc->pParentQueue;
} else {
pQueue = pProc->pChildQueue;
} else {
pQueue = pProc->pParentQueue;
}
tsem_post(&pQueue->sem);
taosThreadJoin(pProc->thread, NULL);
......@@ -419,10 +420,11 @@ static void taosProcStop(SProcObj *pProc) {
void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) {
uDebug("proc:%s, clean up", pProc->name);
uDebug("proc:%s, start to clean up", pProc->name);
taosProcStop(pProc);
// taosProcCleanupQueue(pProc->pChildQueue);
// taosProcCleanupQueue(pProc->pParentQueue);
taosProcCleanupQueue(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pParentQueue);
uDebug("proc:%s, is cleaned up", pProc->name);
taosMemoryFree(pProc);
}
}
......
......@@ -5,6 +5,7 @@
./test.sh -f tsim/user/basic1.sim
# ---- db
./test.sh -f tsim/db/alter_option.sim
./test.sh -f tsim/db/basic1.sim
./test.sh -f tsim/db/basic2.sim
./test.sh -f tsim/db/basic3.sim
......
......@@ -59,22 +59,12 @@ endi
print ============= create database
#database_option: {
# BLOCKS value [3~1000, default: 6]
# | CACHE value [default: 16M]
# | CACHELAST value [0, 1, 2, 3]
# | COMP [0 | 1 | 2]
# | DAYS value [unit is minutes]
# | FSYNC value [0 ~ 180000 ms]
# | MAXROWS value [default: 4096]
# | MINROWS value [default: 100]
# | KEEP value [days, 365000]
# | PRECISION ['ms' | 'us' | 'ns']
# | QUORUM value [1 | 2]
# | REPLICA value [1 | 3]
# | TTL value [unit is day, min=1]
# | WAL value [1 | 2]
# | VGROUPS value [default: 2]
# | SINGLE_STABLE [0 | 1]
# | STREAM_MODE [0 | 1]
sql create database db BLOCKS 7 CACHE 3 CACHELAST 3 COMP 0 DAYS 240 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1000 PRECISION 'ns' QUORUM 1 REPLICA 3 TTL 7 WAL 2 VGROUPS 6 SINGLE_STABLE 1 STREAM_MODE 1
sql show databases
......@@ -187,22 +177,10 @@ sql_error alter database db quorum 4
sql_error alter database db quorum 5
#print ============== modify days
#sql alter database db days 480
#sql show databases
#print days $data6_db
#if $data6_db != 480 then # days
# return -1
#endi
#sql alter database db days 360
#sql show databases
#print days $data6_db
#if $data6_db != 360 then # days
# return -1
#endi
sql_error alter database db days 480
sql_error alter database db days 360
sql_error alter database db days 0
#sql_error alter database db days 14400 # set over than keep
sql_error alter database db days 14400 # set over than keep
print ============== modify keep
sql alter database db keep 2000
......@@ -230,25 +208,14 @@ sql_error alter database db keep -1
#sql_error alter database db keep 365001
print ============== modify cache
#sql alter database db cache 12
#sql show databases
#print cache $data8_db
#if $data8_db != 12 then
# return -1
#endi
#sql alter database db cache 1
#sql show databases
#print cache $data8_db
#if $data8_db != 6 then
# return -1
#endi
#
#sql_error alter database db cache 60
#sql_error alter database db cache 50
#sql_error alter database db cache 20
#sql_error alter database db cache 3
#sql_error alter database db cache 129
#sql_error alter database db cache 300
sql_error alter database db cache 12
sql_error alter database db cache 1
sql_error alter database db cache 60
sql_error alter database db cache 50
sql_error alter database db cache 20
sql_error alter database db cache 3
sql_error alter database db cache 129
sql_error alter database db cache 300
sql_error alter database db cache 0
sql_error alter database db cache -1
......@@ -276,45 +243,18 @@ sql_error alter database db blocks 0
sql_error alter database db blocks -1
sql_error alter database db blocks 10001
#print ============== modify minrows
#sql alter database db minrows 8
#sql show databases
#print minrows $data10_db
#if $data10_db != 8 then
# return -1
#endi
#sql alter database db minrows 200
#sql show databases
#print minrows $data10_db
#if $data10_db != 200 then
# return -1
#endi
#
#sql alter database db minrows 11
#sql show databases
#print minrows $data10_db
#if $data10_db != 11 then
# return -1
#endi
#sql_error alter database db minrows 8000
#sql_error alter database db minrows 8001
print ============== modify minrows
sql_error alter database db minrows 8
sql_error alter database db minrows 200
sql_error alter database db minrows 11
sql_error alter database db minrows 8000
sql_error alter database db minrows 8001
#print ============== modify maxrows
#sql alter database db maxrows 1000
#sql show databases
#print maxrows $data11_db
#if $data11_db != 1000 then
# return -1
#endi
#sql alter database db maxrows 2000
#sql show databases
#print maxrows $data11_db
#if $data11_db != 2000 then
# return -1
#endi
#
#sql_error alter database db maxrows 11 # equal minrows
#sql_error alter database db maxrows 10 # little than minrows
print ============== modify maxrows
sql_error alter database db maxrows 1000
sql_error alter database db maxrows 2000
sql_error alter database db maxrows 11 # equal minrows
sql_error alter database db maxrows 10 # little than minrows
print ============== step wal
sql alter database db wal 1
......@@ -330,7 +270,7 @@ if $data12_db != 2 then
return -1
endi
sql_error alter database db wal 0
#sql_error alter database db wal 0 # TD-14436
sql_error alter database db wal 3
sql_error alter database db wal 100
sql_error alter database db wal -1
......@@ -348,35 +288,20 @@ print fsync $data13_db
if $data13_db != 500 then
return -1
endi
sql_error alter database db fsync 0
sql_error alter database db fsync -1
print ============== modify comp
sql alter database db comp 1
sql show databases
print comp $data14_db
if $data14_db != 1 then
return -1
endi
sql alter database db comp 2
sql alter database db fsync 0
sql show databases
print comp $data14_db
if $data14_db != 2 then
return -1
endi
sql alter database db comp 1
sql show databases
print comp $data14_db
if $data14_db != 1 then
return -1
endi
sql alter database db comp 0
sql show databases
print comp $data14_db
if $data14_db != 0 then
print fsync $data13_db
if $data13_db != 0 then
return -1
endi
sql_error alter database db fsync 180001
sql_error alter database db fsync -1
print ============== modify comp
sql_error alter database db comp 1
sql_error alter database db comp 2
sql_error alter database db comp 1
sql_error alter database db comp 0
sql_error alter database db comp 3
sql_error alter database db comp 4
sql_error alter database db comp 5
......@@ -414,30 +339,15 @@ if $data15_db != 3 then
return -1
endi
sql_error alter database db comp 4
sql_error alter database db comp 10
sql_error alter database db comp -1
sql_error alter database db cachelast 4
sql_error alter database db cachelast 10
sql_error alter database db cachelast -1
print ============== modify precision
sql alter database db precision 'ms'
sql show databases
print precision $data16_db
if $data16_db != ms then
return -1
endi
sql alter database db precision 'us'
sql show databases
print precision $data16_db
if $data16_db != us then
return -1
endi
sql alter database db precision 'ns'
sql show databases
print precision $data16_db
if $data16_db != ns then
return -1
endi
sql_error alter database db precision 'ms'
sql_error alter database db precision 'us'
sql_error alter database db precision 'ns'
sql_error alter database db precision 'ys'
sql_error alter database db prec 'xs'
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -7,7 +7,7 @@ sql connect
print =============== create database
sql create database d0
sql show databases
if $rows != 2 then
if $rows != 2 then
return -1
endi
......@@ -17,7 +17,7 @@ print =============== create super table and child table
sql create table stb (ts timestamp, tbcol int) tags (t1 int)
sql show stables
print $rows $data00 $data01 $data02
if $rows != 1 then
if $rows != 1 then
return -1
endi
......@@ -29,7 +29,7 @@ sql show tables
print $rows $data00 $data10 $data20
if $rows != 4 then
return -1
endi
endi
print =============== insert data into child table ct1 (s)
sql insert into ct1 values ( '2022-01-01 01:01:01.000', 1 )
......@@ -83,13 +83,13 @@ print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then
return -1
endi
endi
if $data00 != 1 then
return -1
endi
endi
if $data40 != 1 then
return -1
endi
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(10s)
......@@ -101,13 +101,13 @@ print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 5 then
return -1
endi
endi
if $data00 != 1 then
return -1
endi
endi
if $data40 != 1 then
return -1
endi
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sliding(5s)
......@@ -123,16 +123,16 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74
print ===> rows8: $data80 $data81 $data82 $data83 $data84
if $rows != 9 then
return -1
endi
endi
if $data00 != 1 then
return -1
endi
endi
if $data70 != 2 then
return -1
endi
endi
if $data80 != 1 then
return -1
endi
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
......@@ -144,10 +144,10 @@ print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
if $rows != 4 then
return -1
endi
endi
if $data00 != 1 then
return -1
endi
endi
if $data10 != 2 then
return -1
endi
......@@ -155,27 +155,54 @@ endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04 $data05
print ===> rows1: $data10 $data11 $data12 $data13 $data14 $data15
print ===> rows2: $data20 $data21 $data22 $data23 $data24 $data25
print ===> rows3: $data30 $data31 $data32 $data33 $data34 $data35
print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45
print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55
print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65
print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75
print ===> rows0: $data00 $data01 $data02 $data03 $data04 $data05
print ===> rows1: $data10 $data11 $data12 $data13 $data14 $data15
print ===> rows2: $data20 $data21 $data22 $data23 $data24 $data25
print ===> rows3: $data30 $data31 $data32 $data33 $data34 $data35
print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45
print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55
print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65
print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75
if $rows != 8 then
return -1
endi
endi
if $data00 != 1 then
return -1
endi
endi
if $data10 != 2 then
return -1
endi
endi
if $data70 != 1 then
return -1
endi
sql select _wstartts, _wendts, _wduration, _qstartts, _qendts, count(tbcol) from ct2 interval(1d, 2h) sliding(12h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
print ===> rows: $rows
print ===> rows0: $data00 $data01 $data02 $data03 $data04 $data05
print ===> rows1: $data10 $data11 $data12 $data13 $data14 $data15
print ===> rows2: $data20 $data21 $data22 $data23 $data24 $data25
print ===> rows3: $data30 $data31 $data32 $data33 $data34 $data35
print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45
print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55
print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65
print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75
if $rows != 8 then
return -1
endi
if $data05 != 1 then
return -1
endi
if $data15 != 2 then
return -1
endi
if $data75 != 1 then
return -1
endi
if $data02 != 86400000 then
return -1
endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w)
print ===> rows: $rows
......@@ -184,15 +211,15 @@ print ===> rows1: $data10 $data11 $data12 $data13 $data14
print ===> rows2: $data20 $data21 $data22 $data23 $data24
print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
#if $rows != 5 then
# return -1
#endi
#if $data00 != 1 then
# return -1
#endi
#if $data40 != 1 then
# return -1
#endi
# if $rows != 5 then
# return -1
# endi
# f $data00 != 1 then
# return -1
# endi
# if $data40 != 1 then
# return -1
# endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w)
......@@ -204,13 +231,13 @@ print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
#if $rows != 5 then
# return -1
#endi
#endi
#if $data00 != 1 then
# return -1
#endi
#endi
#if $data40 != 1 then
# return -1
#endi
#endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(4w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(4w)
......@@ -225,13 +252,13 @@ print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74
#if $rows != 8 then
# return -1
#endi
#endi
#if $data00 != 2 then
# return -1
#endi
#endi
#if $data70 != 1 then
# return -1
#endi
#endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n)
......@@ -243,13 +270,13 @@ print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
#if $rows != 5 then
# return -1
#endi
#endi
#if $data00 != 1 then
# return -1
#endi
#endi
#if $data40 != 1 then
# return -1
#endi
#endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(6n)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(6n)
......@@ -261,13 +288,13 @@ print ===> rows3: $data30 $data31 $data32 $data33 $data34
print ===> rows4: $data40 $data41 $data42 $data43 $data44
#if $rows != 5 then
# return -1
#endi
#endi
#if $data00 != 1 then
# return -1
#endi
#endi
#if $data40 != 1 then
# return -1
#endi
#endi
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(12n)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sliding(12n)
......@@ -282,13 +309,13 @@ print ===> rows6: $data60 $data61 $data62 $data63 $data64
print ===> rows7: $data70 $data71 $data72 $data73 $data74
#if $rows != 8 then
# return -1
#endi
#endi
#if $data00 != 2 then
# return -1
#endi
#endi
#if $data70 != 1 then
# return -1
#endi
#endi
#=================================================
print =============== stop and restart taosd
......@@ -322,9 +349,4 @@ endi
#sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d)
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -29,18 +29,18 @@ $i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
endw
$i = $i + 1
endw
endw
print =============== step2
$i = 1
......@@ -49,51 +49,51 @@ $tb = $tbPrefix . $i
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m)
print ===> $rows $data01 $data05
if $rows != $rowNum then
if $rows != $rowNum then
return -1
endi
if $data00 != 1 then
if $data00 != 1 then
return -1
endi
if $data04 != 1 then
if $data04 != 1 then
return -1
endi
#print =============== step3
print =============== step3
#$cc = 4 * 60000
#$ms = 1601481600000 + $cc
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m)
#print ===> $rows $data01 $data05
#if $rows != 5 then
#if $rows != 5 then
# return -1
#endi
#if $data00 != 1 then
#if $data00 != 1 then
# return -1
#endi
#if $data04 != 1 then
#if $data04 != 1 then
# return -1
#endi
#print =============== step4
print =============== step4
#$cc = 40 * 60000
#$ms = 1601481600000 + $cc
#$cc = 1 * 60000
#$ms2 = 1601481600000 - $cc
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m)
#print ===> $rows $data01 $data05
#if $rows != 20 then
# return -1
#endi
#if $data00 != 1 then
# return -1
#endi
#if $data04 != 1 then
# return -1
#endi
sql select _wstartts, _wendts, _wduration, _qstartts, _qendts, count(tbcol) from $tb interval(1m)
print ===> select _wstartts, _wendts, _wduration, _qstartts, _qendts, count(tbcol) from $tb interval(1m)
print ===> $rows $data01 $data05
if $rows != $rowNum then
return -1
endi
if $data05 != 1 then
return -1
endi
if $data02 != 60000 then
return -1
endi
#print =============== step5
#$cc = 40 * 60000
......@@ -105,13 +105,13 @@ endi
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
#print ===> $rows $data21 $data25
#if $rows != 42 then
#if $rows != 42 then
# return -1
#endi
#if $data20 != 1 then
#if $data20 != 1 then
# return -1
#endi
#if $data24 != 1 then
#if $data24 != 1 then
# return -1
#endi
......@@ -119,10 +119,10 @@ endi
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m)
#print ===> $rows $data11
#if $rows != 20 then
#if $rows != 20 then
# return -1
#endi
#if $data11 != 10 then
#if $data11 != 10 then
# return -1
#endi
......@@ -132,10 +132,10 @@ endi
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m)
#print ===> $rows $data11
#if $rows != 5 then
#if $rows != 5 then
# return -1
#endi
#if $data11 != 10 then
#if $data11 != 10 then
# return -1
#endi
......@@ -149,10 +149,10 @@ endi
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m)
#print ===> $rows $data11
#if $rows != 20 then
#if $rows != 20 then
# return -1
#endi
#if $data11 != 10 then
#if $data11 != 10 then
# return -1
#endi
#
......@@ -166,18 +166,18 @@ endi
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0)
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0)
#print ===> $rows $data11
#if $rows != 42 then
#if $rows != 42 then
# return -1
#endi
#if $data11 != 10 then
#if $data11 != 10 then
# return -1
#endi
print =============== clear
#sql drop database $db
#sql show databases
#if $rows != 0 then
#if $rows != 0 then
# return -1
#endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
run tsim/user/basic1.sim
run tsim/db/alter_option.sim
run tsim/db/basic1.sim
run tsim/db/basic2.sim
run tsim/db/basic3.sim
......
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
# scene1: vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene2: vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene3: vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene4: vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
######## This test case include scene2 and scene4
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$loop_cnt = 0
$vgroups = 1
$dbNamme = d0
loop_vgroups:
print =============== create database $dbNamme vgroups $vgroups
sql create database $dbNamme vgroups $vgroups
sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
if $loop_cnt == 0 then
if $rows != 2 then
return -1
endi
if $data02 != 1 then # vgroups
print vgroups: $data02
return -1
endi
else
if $rows != 3 then
return -1
endi
if $data00 == d1 then
if $data02 != 4 then # vgroups
print vgroups: $data02
return -1
endi
else
if $data12 != 4 then # vgroups
print vgroups: $data12
return -1
endi
endi
endi
sql use $dbNamme
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
$tbPrefix = ct
$tbNum = 100
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using stb tags( $i )
$i = $i + 1
endw
print =============== create normal table
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
print =============== create multi topics. notes: now only support:
print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb
print =============== will support: * from stb
sql create topic topic_stb_column as select ts, c1, c3 from stb
#sql create topic topic_stb_all as select * from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
sql create topic topic_ctb_column as select ts, c1, c3 from ct0
sql create topic topic_ctb_all as select * from ct0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0
sql create topic topic_ntb_column as select ts, c1, c3 from ntb
sql create topic topic_ntb_all as select * from ntb
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb
sql show tables
if $rows != 101 then
return -1
endi
print =============== insert data
$rowNum = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = ' . binary
$binary = $binary . $c
$binary = $binary . '
sql insert into $tb values ($tstart , $c , $x , $binary )
sql insert into ntb values ($tstart , $c , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
#root@trd02 /home $ tmq_sim --help
# -c Configuration directory, default is
# -d The name of the database for cosumer, no default
# -t The topic string for cosumer, no default
# -k The key-value string for cosumer, no default
# -g showMsgFlag, default is 0
#
$totalMsgCnt = $rowNum * $tbNum
print inserted totalMsgCnt: $totalMsgCnt
# supported key:
# group.id:<xxx>
# enable.auto.commit:<true | false>
# auto.offset.reset:<earliest | latest | none>
# td.connect.ip:<fqdn | ipaddress>
# td.connect.user:root
# td.connect.pass:taosdata
# td.connect.port:6030
# td.connect.db:db
$numOfTopics = 2
$totalMsgCntOfmultiTopics = $totalMsgCnt * $numOfTopics
$expect_result = @{consume success: @
$expect_result = $expect_result . $totalMsgCntOfmultiTopics
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function, topic_stb_all" -k "group.id:tg2"
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function, topic_stb_all" -k "group.id:tg2"
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 20000, 0}@ then
if $system_content != $expect_result then
return -1
endi
$numOfTopics = 3
$totalMsgCntOfmultiTopics = $rowNum * $numOfTopics
$expect_result = @{consume success: @
$expect_result = $expect_result . $totalMsgCntOfmultiTopics
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 300, 0}@ then
if $system_content != $expect_result then
return -1
endi
$numOfTopics = 3
$totalMsgCntOfmultiTopics = $totalMsgCnt * $numOfTopics
$expect_result = @{consume success: @
$expect_result = $expect_result . $totalMsgCntOfmultiTopics
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_all, topic_ntb_function" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_all, topic_ntb_function" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 30000, 0}@ then
if $system_content != $expect_result then
return -1
endi
if $loop_cnt == 0 then
$loop_cnt = 1
$vgroups = 4
$dbNamme = d1
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
# scene1: vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene2: vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene3: vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene4: vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
######## This test case include scene1 and scene3
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$loop_cnt = 0
$vgroups = 1
$dbNamme = d0
loop_vgroups:
print =============== create database $dbNamme vgroups $vgroups
sql create database $dbNamme vgroups $vgroups
sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
if $loop_cnt == 0 then
if $rows != 2 then
return -1
endi
if $data02 != 1 then # vgroups
print vgroups: $data02
return -1
endi
else
if $rows != 3 then
return -1
endi
if $data00 == d1 then
if $data02 != 4 then # vgroups
print vgroups: $data02
return -1
endi
else
if $data12 != 4 then # vgroups
print vgroups: $data12
return -1
endi
endi
endi
sql use $dbNamme
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
$tbPrefix = ct
$tbNum = 100
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using stb tags( $i )
$i = $i + 1
endw
print =============== create normal table
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
print =============== create multi topics. notes: now only support:
print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb
print =============== will support: * from stb
sql create topic topic_stb_column as select ts, c1, c3 from stb
#sql create topic topic_stb_all as select * from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
sql create topic topic_ctb_column as select ts, c1, c3 from ct0
sql create topic topic_ctb_all as select * from ct0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0
sql create topic topic_ntb_column as select ts, c1, c3 from ntb
sql create topic topic_ntb_all as select * from ntb
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb
sql show tables
if $rows != 101 then
return -1
endi
print =============== insert data
$rowNum = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = ' . binary
$binary = $binary . $c
$binary = $binary . '
sql insert into $tb values ($tstart , $c , $x , $binary )
sql insert into ntb values ($tstart , $c , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
#root@trd02 /home $ tmq_sim --help
# -c Configuration directory, default is
# -d The name of the database for cosumer, no default
# -t The topic string for cosumer, no default
# -k The key-value string for cosumer, no default
# -g showMsgFlag, default is 0
#
$totalMsgCnt = $rowNum * $tbNum
print inserted totalMsgCnt: $totalMsgCnt
# supported key:
# group.id:<xxx>
# enable.auto.commit:<true | false>
# auto.offset.reset:<earliest | latest | none>
# td.connect.ip:<fqdn | ipaddress>
# td.connect.user:root
# td.connect.pass:taosdata
# td.connect.port:6030
# td.connect.db:db
$expect_result = @{consume success: @
$expect_result = $expect_result . $totalMsgCnt
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 10000, 0}@ then
if $system_content != $expect_result then
return -1
endi
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#print cmd result----> $system_content
##if $system_content != @{consume success: 10000, 0}@ then
#if $system_content != $expect_result then
# return -1
#endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 10000, 0}@ then
if $system_content != $expect_result then
return -1
endi
$expect_result = @{consume success: @
$expect_result = $expect_result . $rowNum
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 100, 0}@ then
if $system_content != $expect_result then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 100, 0}@ then
if $system_content != $expect_result then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 100, 0}@ then
if $system_content != $expect_result then
return -1
endi
$expect_result = @{consume success: @
$expect_result = $expect_result . $totalMsgCnt
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 10000, 0}@ then
if $system_content != $expect_result then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 10000, 0}@ then
if $system_content != $expect_result then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2"
print cmd result----> $system_content
#if $system_content != @{consume success: 10000, 0}@ then
if $system_content != $expect_result then
return -1
endi
if $loop_cnt == 0 then
$loop_cnt = 1
$vgroups = 4
$dbNamme = d1
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) {
int32_t totalRows = 0;
int32_t skipLogNum = 0;
while (running) {
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 6000);
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 3000);
if (tmqMsg) {
totalMsgs++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册