未验证 提交 3fd488e8 编写于 作者: W wade zhang 提交者: GitHub

Merge branch '3.0' into docs/wade-pyudf

......@@ -10,7 +10,7 @@ User-defined functions can be scalar functions or aggregate functions. Scalar fu
TDengine supports user-defined functions written in C or Python. This document describes the usage of user-defined functions.
## Implement a UDF in C
## Implement a UDF in C
When you create a user-defined function, you must implement standard interface functions:
- For scalar functions, implement the `scalarfn` interface function.
......@@ -111,13 +111,13 @@ Interface functions return a value that indicates whether the operation was succ
For information about the parameters for interface functions, see Data Model
#### Scalar Interface
`int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
`int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
Replace `scalarfn` with the name of your function. This function performs scalar calculations on data blocks. You can configure a value through the parameters in the `resultColumn` structure.
The parameters in the function are defined as follows:
- inputDataBlock: The data block to input.
- resultColumn: The column to output. The column to output.
- resultColumn: The column to output. The column to output.
#### Aggregate Interface
......@@ -197,7 +197,7 @@ The data structure is described as follows:
- The SUdfDataBlock block includes the number of rows (numOfRows) and the number of columns (numCols). udfCols[i] (0 <= i <= numCols-1) indicates that each column is of type SUdfColumn.
- SUdfColumn includes the definition of the data type of the column (colMeta) and the data in the column (colData).
- The member definitions of SUdfColumnMeta are the same as the data type definitions in `taos.h`.
- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data.
- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data.
- SUdfInterBuf defines the intermediate structure `buffer` and the number of results in the buffer `numOfResult`.
Additional functions are defined in `taosudf.h` to make it easier to work with these structures.
......@@ -350,7 +350,10 @@ def init():
# initialization
def destroy():
# destroy
def process(input: datablock) -> tuple[output_type]:
def process(input: datablock) -> tuple[output_type]:
# process input datablock,
# datablock.data(row, col) is to access the python object in location(row,col)
# return tuple object consisted of object of type outputtype
```
Note:process() must be implemeted, init() and destroy() must be defined too but they can do nothing.
......@@ -366,12 +369,12 @@ def start() -> bytes:
#return serialize(init_state)
def reduce(inputs: datablock, buf: bytes) -> bytes
# deserialize buf to state
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python ojbect of location(i,j)
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python object of location(i,j)
# serialize new_state into new_state_bytes
return new_state_bytes
return new_state_bytes
def finish(buf: bytes) -> output_type:
#return obj of type outputtype
#return obj of type outputtype
```
Note: aggregate UDF requires init(), destroy(), start(), reduce() and finish() to be impemented. start() generates the initial result in buffer, then the input data is divided into multiple row data blocks, reduce() is invoked for each data block `inputs` and intermediate `buf`, finally finish() is invoked to generate final result from the intermediate result `buf`.
......@@ -844,8 +847,9 @@ select * from ins_functions \G;
SHOW functions;
DROP FUNCTION function_name;
```
### More Python UDF Samples
#### Scalar Function [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
The `pybitand` function implements bitwise addition for multiple columns. If there is only one column, the column is returned. The `pybitand` function ignores null values.
......
......@@ -111,7 +111,7 @@ The parameters described in this document by the effect that they have on the sy
| Attribute | Description |
| ------------- | ---------------------------------------------- |
| Applicable | Client/Server |
| Meaning | The maximum waiting time to get avaliable conn |
| Meaning | The maximum waiting time to get available conn |
| Value Range | 10-50000000(ms) |
| Default Value | 500000 |
......
......@@ -367,7 +367,7 @@ def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
# deserialize buf to state
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python ojbect of location(i,j)
# use inputs.data(i,j) to access python object of location(i,j)
# serialize new_state into new_state_bytes
return new_state_bytes
def finish(buf: bytes) -> output_type:
......
......@@ -180,6 +180,7 @@ extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream;
extern int64_t tsStreamBufferSize;
extern int64_t tsCheckpointInterval;
extern bool tsFilterScalarMode;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
......@@ -149,6 +149,7 @@ struct SWalReader {
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
......@@ -200,6 +201,8 @@ int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
int64_t walReaderGetSkipToVersion(SWalReader *pReader);
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver);
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
......
......@@ -409,6 +409,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MNODE_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0413) // internal
#define TSDB_CODE_MNODE_ONLY_TWO_MNODE TAOS_DEF_ERROR_CODE(0, 0x0414) // internal
#define TSDB_CODE_MNODE_NO_NEED_RESTORE TAOS_DEF_ERROR_CODE(0, 0x0415) // internal
#define TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x0416)
// vnode
// #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x
......
......@@ -251,7 +251,7 @@ int32_t smlClearForRerun(SSmlHandle *info);
int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg);
uint8_t smlGetTimestampLen(int64_t num);
void clearColValArray(SArray* pCols);
void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag);
void smlDestroyTableInfo(void *para);
void freeSSmlKv(void* data);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
......
......@@ -1757,6 +1757,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
taosMemoryFreeClear(pResultInfo->convertJson);
pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
if (pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
char* p1 = pResultInfo->convertJson;
......
......@@ -230,6 +230,16 @@ void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tin
}
}
static void smlDestroySTableMeta(void *para) {
SSmlSTableMeta *meta = *(SSmlSTableMeta**)para;
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->colHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFreeClear(meta->tableMeta);
taosMemoryFree(meta);
}
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
if (!meta) {
......@@ -264,7 +274,7 @@ SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
return meta;
cleanup:
taosMemoryFree(meta);
smlDestroySTableMeta(meta);
return NULL;
}
......@@ -1035,6 +1045,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
}
taosMemoryFreeClear(sTableData->tableMeta);
sTableData->tableMeta = pTableMeta;
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid,
pTableMeta->sversion, pTableMeta->tversion) tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
......@@ -1093,15 +1104,6 @@ static void smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols) {
}
}
static void smlDestroySTableMeta(SSmlSTableMeta *meta) {
taosHashCleanup(meta->tagHash);
taosHashCleanup(meta->colHash);
taosArrayDestroy(meta->tags);
taosArrayDestroy(meta->cols);
taosMemoryFree(meta->tableMeta);
taosMemoryFree(meta);
}
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg) {
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
......@@ -1141,7 +1143,8 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
return TSDB_CODE_SUCCESS;
}
void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
void smlDestroyTableInfo(void *para) {
SSmlTableInfo *tag = *(SSmlTableInfo**)para;
for (size_t i = 0; i < taosArrayGetSize(tag->cols); i++) {
SHashObj *kvHash = (SHashObj *)taosArrayGetP(tag->cols, i);
taosHashCleanup(kvHash);
......@@ -1178,18 +1181,18 @@ void smlDestroyInfo(SSmlHandle *info) {
qDestroyQuery(info->pQuery);
// destroy info->childTables
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) {
smlDestroyTableInfo(info, *oneTable);
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
}
// SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
// while (oneTable) {
// smlDestroyTableInfo(oneTable);
// oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
// }
// destroy info->superTables
SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
while (oneSTable) {
smlDestroySTableMeta(*oneSTable);
oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
}
// SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
// while (oneSTable) {
// smlDestroySTableMeta(*oneSTable);
// oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
// }
// destroy info->pVgHash
taosHashCleanup(info->pVgHash);
......@@ -1248,6 +1251,8 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
info->tableUids = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
taosHashSetFreeFp(info->superTables, smlDestroySTableMeta);
taosHashSetFreeFp(info->childTables, smlDestroyTableInfo);
info->id = smlGenId();
info->pQuery = smlInitHandle();
......@@ -1354,6 +1359,9 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat,
info->lineNum);
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
if(meta == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
terrno = 0;
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
......@@ -1473,18 +1481,18 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
int32_t smlClearForRerun(SSmlHandle *info) {
info->reRun = false;
// clear info->childTables
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) {
smlDestroyTableInfo(info, *oneTable);
oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
}
// SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
// while (oneTable) {
// smlDestroyTableInfo(info, *oneTable);
// oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, oneTable);
// }
// clear info->superTables
SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
while (oneSTable) {
smlDestroySTableMeta(*oneSTable);
oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
}
// SSmlSTableMeta **oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
// while (oneSTable) {
// smlDestroySTableMeta(*oneSTable);
// oneSTable = (SSmlSTableMeta **)taosHashIterate(info->superTables, oneSTable);
// }
taosHashClear(info->childTables);
taosHashClear(info->superTables);
......
......@@ -695,6 +695,10 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
if(sMeta == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
......@@ -784,7 +788,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
smlDestroyTableInfo(info, tinfo);
smlDestroyTableInfo(&tinfo);
return TSDB_CODE_SML_INVALID_DATA;
}
}
......@@ -1048,12 +1052,18 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
return TSDB_CODE_TSC_INVALID_JSON;
}
info->lineNum = payloadNum;
info->dataFormat = true;
if (unlikely(info->lines != NULL)) {
for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv);
if (info->lines[i].measureTagsLen != 0) taosMemoryFree(info->lines[i].measureTag);
}
taosMemoryFree(info->lines);
info->lines = NULL;
}
info->lineNum = payloadNum;
info->dataFormat = true;
ret = smlClearForRerun(info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
......
......@@ -168,6 +168,10 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
if(sMeta == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES);
for (int i = pTableMeta->tableInfo.numOfColumns;
......@@ -326,7 +330,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlDestroyTableInfo(info, tinfo);
smlDestroyTableInfo(&tinfo);
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
......@@ -372,6 +376,10 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
return TSDB_CODE_SUCCESS;
}
*tmp = smlBuildSTableMeta(info->dataFormat);
if(*tmp == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
(*tmp)->tableMeta = pTableMeta;
taosHashPut(info->superTables, currElement->measure, currElement->measureLen, tmp, POINTER_BYTES);
......
......@@ -91,6 +91,10 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
return TSDB_CODE_SUCCESS;
}
sMeta = smlBuildSTableMeta(info->dataFormat);
if(sMeta == NULL){
taosMemoryFreeClear(pTableMeta);
return TSDB_CODE_OUT_OF_MEMORY;
}
sMeta->tableMeta = pTableMeta;
taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES);
for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){
......@@ -212,7 +216,7 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
smlDestroyTableInfo(info, tinfo);
smlDestroyTableInfo(&tinfo);
return TSDB_CODE_SML_INVALID_DATA;
}
}
......
......@@ -1100,7 +1100,7 @@ TEST(clientCase, sub_tb_test) {
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t1");
tmq_list_append(topicList, "t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
......@@ -1118,7 +1118,7 @@ TEST(clientCase, sub_tb_test) {
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign);
int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_consumer_close(tmq);
......@@ -1127,7 +1127,16 @@ TEST(clientCase, sub_tb_test) {
return;
}
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0);
tmq_offset_seek(tmq, "t1", pAssign[0].vgId, 4);
code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
......
......@@ -208,6 +208,7 @@ char tsUdfdLdLibPath[512] = "";
bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000;
bool tsFilterScalarMode = false;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
......@@ -522,6 +523,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
}
......@@ -898,6 +901,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64;
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
GRANT_CFG_GET;
return 0;
}
......
......@@ -986,8 +986,15 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
}
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
if ((numOfVnodes > 0 || pMObj != NULL || pSObj != NULL || pQObj != NULL) && !force) {
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
if (numOfVnodes > 0 || pMObj != NULL || pSObj != NULL || pQObj != NULL) {
bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
if (isonline && force) {
terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
if (!isonline && !force) {
terrno = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
......
......@@ -510,8 +510,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
......@@ -537,7 +535,12 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = currentVer; // return current consume offset value
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
if (currentVer == -1) { // not start to read data from wal yet, return req offset directly
dataRsp.rspOffset.version = reqOffset.version;
} else {
dataRsp.rspOffset.version = currentVer; // return current consume offset value
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
......@@ -1085,6 +1088,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
walReaderSeekVer(pTask->exec.pWalReader, sversion);
pTask->chkInfo.currentVer = sversion;
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
......@@ -1300,19 +1304,22 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
int32_t vgId = pTq->pStreamMeta->vgId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version
if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended.
pTask->chkInfo.currentVer = sversion;
walReaderSeekVer(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
......@@ -1321,6 +1328,8 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamSchedExec(pTask);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} else {
tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId);
}
return 0;
......@@ -1429,7 +1438,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
......@@ -1437,7 +1446,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
......@@ -1445,7 +1454,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return -1;
}
......@@ -1456,7 +1465,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
......@@ -87,6 +87,16 @@ static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
}
}
int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
if (skipToVer != 0 && skipToVer > pTask->chkInfo.currentVer) {
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
return code;
}
tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -703,6 +703,7 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;
reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
......@@ -789,7 +790,9 @@ static int32_t tsdbCacheLoadFromRocks(STsdb *pTsdb, tb_uid_t uid, SArray *pLastA
code = -1;
}
taosArraySet(pLastArray, idxKey->idx, pLastCol);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
taosArrayRemove(remainCols, j);
taosMemoryFree(values_list[i]);
......@@ -825,7 +828,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
// reallocVarData(&lastCol.colVal);
reallocVarData(&lastCol.colVal);
taosArrayPush(pLastArray, &lastCol);
if (h) {
......@@ -853,8 +856,8 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
taosArraySet(pLastArray, idxKey->idx, &lastCol);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
......@@ -937,14 +940,14 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
}
// build keys & multi get from rocks
int num_keys = pTSchema->numOfCols;
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
int num_keys = pTSchema->numOfCols;
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
const size_t klen = ROCKS_KEY_LEN;
for (int i = 0; i < num_keys; ++i) {
int16_t cid = pTSchema->columns[i].colId;
size_t klen = ROCKS_KEY_LEN;
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};
......@@ -960,39 +963,35 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
}
for (int i = 0; i < num_keys * 2; ++i) {
rocksdb_free(errs[i]);
if (errs[i]) {
rocksdb_free(errs[i]);
}
}
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs);
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
SLastKey *key = &(SLastKey){.ltype = 1, .uid = uid, .cid = pLastCol->colVal.cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_delete(wb, (char *)key, klen);
taosLRUCacheErase(pTsdb->lruCache, key, klen);
rocksdb_writebatch_delete(wb, keys_list[i], klen);
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[i], klen);
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) {
SLastKey *key = &(SLastKey){.ltype = 0, .uid = uid, .cid = pLastCol->colVal.cid};
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_delete(wb, (char *)key, klen);
taosLRUCacheErase(pTsdb->lruCache, key, klen);
rocksdb_writebatch_delete(wb, keys_list[num_keys + i], klen);
}
taosLRUCacheErase(pTsdb->lruCache, keys_list[num_keys + i], klen);
rocksdb_free(values_list[i]);
rocksdb_free(values_list[i + num_keys]);
}
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
}
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
......@@ -1871,10 +1870,14 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (isLast && (pColData->flag & HAS_VALUE)) {
skipBlock = false;
break;
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
} /*else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
skipBlock = false;
break;
}
}*/
}
if (!isLast) {
skipBlock = false;
}
if (skipBlock) {
......@@ -1908,6 +1911,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (checkRemainingRow) {
bool skipBlock = true;
int inputColIndex = 0;
if (aCols[0] == PRIMARYKEY_TIMESTAMP_COL_ID) {
++inputColIndex;
}
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
SColData *pColData = &state->pBlockData->aColData[colIndex];
int16_t cid = pColData->cid;
......@@ -1916,15 +1922,19 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
if (isLast && (pColData->flag & HAS_VALUE)) {
skipBlock = false;
break;
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
} /*else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
skipBlock = false;
break;
}
}*/
++inputColIndex;
}
}
if (!isLast) {
skipBlock = false;
}
if (skipBlock) {
if (--state->iBlock < 0) {
tsdbDataFReaderClose(state->pDataFReader);
......@@ -2145,9 +2155,14 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
return false;
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
// if (key->version <= pItemFront->version || key->version <= pItemBack->version) {
return true;
} else {
return false;
if (*iSkyline > 1) {
--*iSkyline;
} else {
return false;
}
}
} else {
if (*iSkyline > 1) {
......@@ -2959,7 +2974,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
do {
TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
if (!pRow) {
break;
......
......@@ -315,14 +315,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbCacheGetBatch(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
// tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
if (TARRAY_SIZE(pRow) <= 0) {
// taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue;
}
SLastCol* pColVal = taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
// taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue;
}
......@@ -381,8 +381,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
// taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
}
if (hasRes) {
......@@ -394,20 +394,20 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
if (TARRAY_SIZE(pRow) <= 0) {
// taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
// taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
// taosArrayClearEx(pRow, freeItem);
taosArrayClear(pRow);
taosArrayClearEx(pRow, freeItem);
// taosArrayClear(pRow);
taosArrayPush(pTableUidList, &uid);
......
......@@ -190,9 +190,9 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
}
*/
if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
}
// if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
//}
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" at version %" PRId64,
......
......@@ -1848,7 +1848,7 @@ static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBloc
SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
info.overlapWithDelInfo || info.overlapWithLastBlock);
info.overlapWithDelInfo || info.overlapWithLastBlock || info.partiallyRequired);
return isCleanFileBlock;
}
......
......@@ -573,8 +573,18 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
if (colDataIsNull_var(pDst, j)) {
colDataSetNull_var(pDst, numOfRows);
} else {
// fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first copy it to p2
char* p1 = colDataGetVarData(pDst, j);
colDataSetVal(pDst, numOfRows, p1, false);
int32_t len = 0;
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
len = getJsonValueLen(p1);
} else {
len = varDataTLen(p1);
}
char* p2 = taosMemoryMalloc(len);
memcpy(p2, p1, len);
colDataSetVal(pDst, numOfRows, p2, false);
taosMemoryFree(p2);
}
numOfRows += 1;
j += 1;
......
......@@ -600,9 +600,9 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) {
return udf;
} else {
(*pUdfHash)->expired = true;
taosHashRemove(global.udfsHash, udfName, strlen(udfName));
fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
(*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime);
taosHashRemove(global.udfsHash, udfName, strlen(udfName));
}
}
......
......@@ -8008,6 +8008,9 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
isJson = true;
code = buildJsonTagVal(pCxt, pTagSchema, pVal, pTagArray, ppTag);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pVal);
}
taosArrayPush(tagName, pTagSchema->name);
} else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL && !pVal->isNull) {
char* tmpVal = nodesGetValueFromNode(pVal);
......@@ -8328,13 +8331,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
SArray* pTagVals = taosArrayInit(1, sizeof(STagVal));
int32_t code = TSDB_CODE_SUCCESS;
STag* pTag = NULL;
do {
code = parseJsontoTagData(pStmt->pVal->literal, pTagVals, &pTag, &pCxt->msgBuf);
if (TSDB_CODE_SUCCESS != code) {
break;
}
} while (0);
code = parseJsontoTagData(pStmt->pVal->literal, pTagVals, &pTag, &pCxt->msgBuf);
taosArrayDestroy(pTagVals);
if (code != TSDB_CODE_SUCCESS) {
return code;
......
......@@ -416,7 +416,7 @@ int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, voi
end:
taosHashCleanup(keyHash);
if (retCode == TSDB_CODE_SUCCESS) {
tTagNew(pTagVals, 1, true, ppTag);
retCode = tTagNew(pTagVals, 1, true, ppTag);
}
for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) {
STagVal* p = (STagVal*)taosArrayGet(pTagVals, i);
......
......@@ -227,8 +227,10 @@ typedef struct SFltTreeStat {
SFilterInfo *info;
} SFltTreeStat;
typedef struct SFltScalarCtx {
SNode *node;
SArray* fltSclRange;
} SFltScalarCtx;
typedef struct SFltBuildGroupCtx {
......@@ -237,6 +239,11 @@ typedef struct SFltBuildGroupCtx {
int32_t code;
} SFltBuildGroupCtx;
typedef struct {
SColumnNode *colNode;
SArray *points;
} SFltSclColumnRange;
struct SFilterInfo {
bool scalarMode;
SFltScalarCtx sclCtx;
......
此差异已折叠。
......@@ -1791,7 +1791,11 @@ void vectorNotMatch(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOu
void vectorIsNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
for (int32_t i = 0; i < pLeft->numOfRows; ++i) {
int8_t v = IS_HELPER_NULL(pLeft->columnData, i) ? 1 : 0;
if (v) {
++pOut->numOfQualified;
}
colDataSetInt8(pOut->columnData, i, &v);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
pOut->numOfRows = pLeft->numOfRows;
}
......@@ -1799,7 +1803,11 @@ void vectorIsNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
void vectorNotNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
for (int32_t i = 0; i < pLeft->numOfRows; ++i) {
int8_t v = IS_HELPER_NULL(pLeft->columnData, i) ? 0 : 1;
if (v) {
++pOut->numOfQualified;
}
colDataSetInt8(pOut->columnData, i, &v);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
pOut->numOfRows = pLeft->numOfRows;
}
......@@ -1812,6 +1820,13 @@ void vectorIsTrue(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
colDataSetInt8(pOut->columnData, i, &v);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
{
bool v = false;
GET_TYPED_DATA(v, bool, pOut->columnData->info.type, colDataGetData(pOut->columnData, i));
if (v) {
++pOut->numOfQualified;
}
}
}
pOut->columnData->hasNull = false;
}
......@@ -1851,7 +1866,9 @@ void vectorJsonContains(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam
char *pLeftData = colDataGetVarData(pLeft->columnData, i);
getJsonValue(pLeftData, jsonKey, &isExist);
}
if (isExist) {
++pOut->numOfQualified;
}
colDataSetVal(pOutputCol, i, (const char *)(&isExist), false);
}
taosMemoryFree(jsonKey);
......
......@@ -327,7 +327,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
return 0;
if (batchSize > 1) {
break;
} else {
return 0;
}
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
......
......@@ -108,6 +108,14 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver) { atomic_store_64(&pReader->skipToVersion, ver); }
// this function is NOT multi-thread safe, and no need to be.
int64_t walReaderGetSkipToVersion(SWalReader *pReader) {
int64_t newVersion = pReader->skipToVersion;
pReader->skipToVersion = 0;
return newVersion;
}
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
*sver = walGetFirstVer(pReader->pWal);
......
......@@ -325,6 +325,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NOT_CATCH_UP, "Mnode didn't catch th
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_ALREADY_IS_VOTER, "Mnode already is a leader")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_ONLY_TWO_MNODE, "Only two mnodes exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MNODE_NO_NEED_RESTORE, "No need to restore on this dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE, "Please use this command when the dnode is offline")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")
......
......@@ -571,7 +571,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py
......@@ -1089,7 +1089,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 4
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 4
......@@ -1274,6 +1274,7 @@
,,y,script,./test.sh -f tsim/parser/columnValue_tinyint.sim
,,y,script,./test.sh -f tsim/parser/columnValue_unsign.sim
,,y,script,./test.sh -f tsim/parser/condition.sim
,,y,script,./test.sh -f tsim/parser/condition_scl.sim
,,y,script,./test.sh -f tsim/parser/constCol.sim
,,y,script,./test.sh -f tsim/parser/create_db.sim
,,y,script,./test.sh -f tsim/parser/create_mt.sim
......@@ -1353,6 +1354,7 @@
,,y,script,./test.sh -f tsim/query/event.sim
,,y,script,./test.sh -f tsim/query/forceFill.sim
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/emptyTsRange_scl.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/tag_scan.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c filterScalarMode -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists cdb
sql create database if not exists cdb
sql use cdb
sql create table stb1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb1 using stb1 tags(1,'1',1.0)
sql create table tb2 using stb1 tags(2,'2',2.0)
sql create table tb3 using stb1 tags(3,'3',3.0)
sql create table tb4 using stb1 tags(4,'4',4.0)
sql create table tb5 using stb1 tags(5,'5',5.0)
sql create table tb6 using stb1 tags(6,'6',6.0)
sql insert into tb1 values ('2021-05-05 18:19:00',1,1.0,1,1,1,1.0,true ,'1','1')
sql insert into tb1 values ('2021-05-05 18:19:01',2,2.0,2,2,2,2.0,true ,'2','2')
sql insert into tb1 values ('2021-05-05 18:19:02',3,3.0,3,3,3,3.0,false,'3','3')
sql insert into tb1 values ('2021-05-05 18:19:03',4,4.0,4,4,4,4.0,false,'4','4')
sql insert into tb1 values ('2021-05-05 18:19:04',11,11.0,11,11,11,11.0,true ,'11','11')
sql insert into tb1 values ('2021-05-05 18:19:05',12,12.0,12,12,12,12.0,true ,'12','12')
sql insert into tb1 values ('2021-05-05 18:19:06',13,13.0,13,13,13,13.0,false,'13','13')
sql insert into tb1 values ('2021-05-05 18:19:07',14,14.0,14,14,14,14.0,false,'14','14')
sql insert into tb2 values ('2021-05-05 18:19:08',21,21.0,21,21,21,21.0,true ,'21','21')
sql insert into tb2 values ('2021-05-05 18:19:09',22,22.0,22,22,22,22.0,true ,'22','22')
sql insert into tb2 values ('2021-05-05 18:19:10',23,23.0,23,23,23,23.0,false,'23','23')
sql insert into tb2 values ('2021-05-05 18:19:11',24,24.0,24,24,24,24.0,false,'24','24')
sql insert into tb3 values ('2021-05-05 18:19:12',31,31.0,31,31,31,31.0,true ,'31','31')
sql insert into tb3 values ('2021-05-05 18:19:13',32,32.0,32,32,32,32.0,true ,'32','32')
sql insert into tb3 values ('2021-05-05 18:19:14',33,33.0,33,33,33,33.0,false,'33','33')
sql insert into tb3 values ('2021-05-05 18:19:15',34,34.0,34,34,34,34.0,false,'34','34')
sql insert into tb4 values ('2021-05-05 18:19:16',41,41.0,41,41,41,41.0,true ,'41','41')
sql insert into tb4 values ('2021-05-05 18:19:17',42,42.0,42,42,42,42.0,true ,'42','42')
sql insert into tb4 values ('2021-05-05 18:19:18',43,43.0,43,43,43,43.0,false,'43','43')
sql insert into tb4 values ('2021-05-05 18:19:19',44,44.0,44,44,44,44.0,false,'44','44')
sql insert into tb5 values ('2021-05-05 18:19:20',51,51.0,51,51,51,51.0,true ,'51','51')
sql insert into tb5 values ('2021-05-05 18:19:21',52,52.0,52,52,52,52.0,true ,'52','52')
sql insert into tb5 values ('2021-05-05 18:19:22',53,53.0,53,53,53,53.0,false,'53','53')
sql insert into tb5 values ('2021-05-05 18:19:23',54,54.0,54,54,54,54.0,false,'54','54')
sql insert into tb6 values ('2021-05-05 18:19:24',61,61.0,61,61,61,61.0,true ,'61','61')
sql insert into tb6 values ('2021-05-05 18:19:25',62,62.0,62,62,62,62.0,true ,'62','62')
sql insert into tb6 values ('2021-05-05 18:19:26',63,63.0,63,63,63,63.0,false,'63','63')
sql insert into tb6 values ('2021-05-05 18:19:27',64,64.0,64,64,64,64.0,false,'64','64')
sql insert into tb6 values ('2021-05-05 18:19:28',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
sql create table stb2 (ts timestamp, u1 int unsigned, u2 bigint unsigned, u3 smallint unsigned, u4 tinyint unsigned, ts2 timestamp) TAGS(t1 int unsigned, t2 bigint unsigned, t3 timestamp, t4 int)
sql create table tb2_1 using stb2 tags(1,1,'2021-05-05 18:38:38',1)
sql create table tb2_2 using stb2 tags(2,2,'2021-05-05 18:58:58',2)
sql insert into tb2_1 values ('2021-05-05 18:19:00',1,2,3,4,'2021-05-05 18:28:01')
sql insert into tb2_1 values ('2021-05-05 18:19:01',5,6,7,8,'2021-05-05 18:28:02')
sql insert into tb2_1 values ('2021-05-05 18:19:02',2,2,3,4,'2021-05-05 18:28:03')
sql insert into tb2_1 values ('2021-05-05 18:19:03',5,6,7,8,'2021-05-05 18:28:04')
sql insert into tb2_1 values ('2021-05-05 18:19:04',3,2,3,4,'2021-05-05 18:28:05')
sql insert into tb2_1 values ('2021-05-05 18:19:05',5,6,7,8,'2021-05-05 18:28:06')
sql insert into tb2_1 values ('2021-05-05 18:19:06',4,2,3,4,'2021-05-05 18:28:07')
sql insert into tb2_1 values ('2021-05-05 18:19:07',5,6,7,8,'2021-05-05 18:28:08')
sql insert into tb2_1 values ('2021-05-05 18:19:08',5,2,3,4,'2021-05-05 18:28:09')
sql insert into tb2_1 values ('2021-05-05 18:19:09',5,6,7,8,'2021-05-05 18:28:10')
sql insert into tb2_1 values ('2021-05-05 18:19:10',6,2,3,4,'2021-05-05 18:28:11')
sql insert into tb2_2 values ('2021-05-05 18:19:11',5,6,7,8,'2021-05-05 18:28:12')
sql insert into tb2_2 values ('2021-05-05 18:19:12',7,2,3,4,'2021-05-05 18:28:13')
sql insert into tb2_2 values ('2021-05-05 18:19:13',5,6,7,8,'2021-05-05 18:28:14')
sql insert into tb2_2 values ('2021-05-05 18:19:14',8,2,3,4,'2021-05-05 18:28:15')
sql insert into tb2_2 values ('2021-05-05 18:19:15',5,6,7,8,'2021-05-05 18:28:16')
sql create table stb3 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb3_1 using stb3 tags(1,'1',1.0)
sql create table tb3_2 using stb3 tags(2,'2',2.0)
sql insert into tb3_1 values ('2021-01-05 18:19:00',1,1.0,1,1,1,1.0,true ,'1','1')
sql insert into tb3_1 values ('2021-02-05 18:19:01',2,2.0,2,2,2,2.0,true ,'2','2')
sql insert into tb3_1 values ('2021-03-05 18:19:02',3,3.0,3,3,3,3.0,false,'3','3')
sql insert into tb3_1 values ('2021-04-05 18:19:03',4,4.0,4,4,4,4.0,false,'4','4')
sql insert into tb3_1 values ('2021-05-05 18:19:28',5,NULL,5,NULL,5,NULL,true,NULL,'5')
sql insert into tb3_1 values ('2021-06-05 18:19:28',NULL,6.0,NULL,6,NULL,6.0,NULL,'6',NULL)
sql insert into tb3_1 values ('2021-07-05 18:19:28',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
sql insert into tb3_2 values ('2021-01-06 18:19:00',11,11.0,11,11,11,11.0,true ,'11','11')
sql insert into tb3_2 values ('2021-02-06 18:19:01',12,12.0,12,12,12,12.0,true ,'12','12')
sql insert into tb3_2 values ('2021-03-06 18:19:02',13,13.0,13,13,13,13.0,false,'13','13')
sql insert into tb3_2 values ('2021-04-06 18:19:03',14,14.0,14,14,14,14.0,false,'14','14')
sql insert into tb3_2 values ('2021-05-06 18:19:28',15,NULL,15,NULL,15,NULL,true,NULL,'15')
sql insert into tb3_2 values ('2021-06-06 18:19:28',NULL,16.0,NULL,16,NULL,16.0,NULL,'16',NULL)
sql insert into tb3_2 values ('2021-07-06 18:19:28',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
sql create table stb4 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9),c10 binary(16300)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb4_0 using stb4 tags(0,'0',0.0)
sql create table tb4_1 using stb4 tags(1,'1',1.0)
sql create table tb4_2 using stb4 tags(2,'2',2.0)
sql create table tb4_3 using stb4 tags(3,'3',3.0)
sql create table tb4_4 using stb4 tags(4,'4',4.0)
$i = 0
$ts0 = 1625850000000
$blockNum = 5
$delta = 0
$tbname0 = tb4_
$a = 0
$b = 200
$c = 400
while $i < $blockNum
$x = 0
$rowNum = 1200
while $x < $rowNum
$ts = $ts0 + $x
$a = $a + 1
$b = $b + 1
$c = $c + 1
$d = $x / 10
$tin = $rowNum
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$tbname = 'tb4_ . $i
$tbname = $tbname . '
sql insert into $tbname values ( $ts , $a , $b , $c , $d , $d , $c , true, $binary , $nchar , $binary )
$x = $x + 1
endw
$i = $i + 1
$ts0 = $ts0 + 259200000
endw
run tsim/parser/condition_query.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
run tsim/parser/condition_query.sim
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c filterScalarMode -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists db1;
sql create database if not exists db1;
sql use db1;
sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int);
sql create table tba1 using sta tags(1);
sql insert into tba1 values ('2022-04-26 15:15:01', 1.0, "a");
sql insert into tba1 values ('2022-04-26 15:15:02', 2.0, "b");
sql insert into tba1 values ('2022-04-26 15:15:04', 4.0, "b");
sql insert into tba1 values ('2022-04-26 15:15:05', 5.0, "b");
sql select last_row(*) from sta where ts >= 1678901803783 and ts <= 1678901803783 and _c0 <= 1678901803782 interval(10d,8d) fill(linear) order by _wstart desc;
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册