提交 f2693313 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy

......@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX };
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX };
enum {
HEARTBEAT_KEY_USER_AUTHINFO = 1,
......@@ -2097,6 +2097,18 @@ enum {
TOPIC_SUB_TYPE__TABLE,
};
typedef struct {
SMsgHead head;
int64_t leftForVer;
int32_t vgId;
int64_t consumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
} SMqVDeleteReq;
typedef struct {
int8_t reserved;
} SMqVDeleteRsp;
typedef struct {
int64_t leftForVer;
int32_t vgId;
......@@ -2532,11 +2544,15 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
}
typedef struct {
void* data;
int64_t streamId;
int32_t taskId;
int32_t sourceVg;
int64_t sourceVer;
SArray* data; // SArray<SSDataBlock>
} SStreamDispatchReq;
typedef struct {
int8_t status;
int8_t inputStatus;
} SStreamDispatchRsp;
#define TD_AUTO_CREATE_TABLE 0x1
......
......@@ -178,6 +178,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
......
......@@ -281,6 +281,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E8)
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
// mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
......
......@@ -216,6 +216,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
......
......@@ -308,6 +308,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SUBMIT_RSMA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
......
......@@ -459,6 +459,7 @@ typedef struct {
char* ast;
char* physicalPlan;
SSchemaWrapper schema;
int32_t refConsumerCnt;
} SMqTopicObj;
typedef struct {
......
......@@ -38,6 +38,9 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
}
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic);
#ifdef __cplusplus
}
......
......@@ -32,6 +32,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
#ifdef __cplusplus
}
......
......@@ -35,6 +35,8 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
#ifdef __cplusplus
}
#endif
......
......@@ -399,6 +399,9 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
for (int32_t i = 0; i < newTopicNum; i++) {
char *topic = taosArrayGetP(newSub, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
......@@ -406,7 +409,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
goto SUBSCRIBE_OVER;
}
// TODO lock topic to prevent drop
// ref topic to prevent drop
// TODO make topic complete
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
mndReleaseTopic(pMnode, pTopic);
}
......@@ -422,8 +432,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
......@@ -494,8 +502,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
goto SUBSCRIBE_OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
}
......@@ -503,6 +509,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
SUBSCRIBE_OVER:
mndTransDrop(pTrans);
if (pConsumerOld) {
/*taosRUnLockLatch(&pConsumerOld->lock);*/
mndReleaseConsumer(pMnode, pConsumerOld);
......
......@@ -50,6 +50,14 @@ int32_t mndInitOffset(SMnode *pMnode) {
void mndCleanupOffset(SMnode *pMnode) {}
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) {
int32_t i = 0;
while (pOffset->key[i] != ':') i++;
while (pOffset->key[i] != ':') i++;
if (strcmp(&pOffset->key[i + 1], topic) == 0) return true;
return false;
}
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
......@@ -134,10 +142,11 @@ int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicNa
int32_t sz = taosArrayGetSize(vgs);
for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(vgs, i);
SMqOffsetObj offsetObj;
SMqOffsetObj offsetObj = {0};
if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) {
return -1;
}
// TODO assign db
offsetObj.offset = -1;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj);
if (pOffsetRaw == NULL) {
......@@ -240,6 +249,14 @@ static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOff
return 0;
}
static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
SSdbRaw *pRedoRaw = mndOffsetActionEncode(pOffset);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
......@@ -247,7 +264,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset);
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (pOffset->dbUid != pDb->uid) {
......@@ -256,8 +273,39 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
END:
return code;
}
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (!mndOffsetFromTopic(pOffset, topic)) {
sdbRelease(pSdb, pOffset);
continue;
}
if (mndSetDropOffsetRedoLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
......
......@@ -73,6 +73,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
......@@ -389,7 +390,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
if (pTrans == NULL) {
return -1;
}
......@@ -458,6 +459,20 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
goto REB_FAIL;
}
}
if (consumerNum) {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic) {
// TODO make topic complete
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
}
}
// 4. TODO commit log: modification log
// 5. set cb
......@@ -688,6 +703,14 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
return 0;
}
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
if (pCommitRaw == NULL) return -1;
......@@ -712,6 +735,57 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);
goto END;
}
}
code = 0;
END:
return code;
}
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqSubscribeObj *pSub = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
if (pIter == NULL) break;
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
if (strcmp(topic, topicName) != 0) {
sdbRelease(pSdb, pSub);
continue;
}
// iter all vnode to delete handle
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
pReq->head.vgId = htonl(pVgEp->vgId);
pReq->vgId = pVgEp->vgId;
pReq->consumerId = -1;
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
STransAction action = {0};
action.epSet = pVgEp->epSet;
action.pCont = pReq;
action.contLen = sizeof(SMqVDeleteReq);
action.msgType = TDMT_VND_MQ_VG_DELETE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);
goto END;
}
}
......
......@@ -18,8 +18,10 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
......@@ -106,6 +108,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
......@@ -190,6 +193,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
goto TOPIC_DECODE_OVER;
}
SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
terrno = TSDB_CODE_SUCCESS;
......@@ -220,11 +225,13 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
taosWLockLatch(&pOldTopic->lock);
atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);
/*taosWLockLatch(&pOldTopic->lock);*/
// TODO handle update
taosWUnLockLatch(&pOldTopic->lock);
/*taosWUnLockLatch(&pOldTopic->lock);*/
return 0;
}
......@@ -292,6 +299,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.refConsumerCnt = 0;
if (pCreate->ast && pCreate->ast[0]) {
topicObj.ast = strdup(pCreate->ast);
......@@ -436,15 +444,7 @@ CREATE_TOPIC_OVER:
return code;
}
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) {
// TODO: cannot drop when subscribed
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SNodeMsg *pReq, SMqTopicObj *pTopic) {
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
......@@ -465,6 +465,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic)
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
SMDropTopicReq dropReq = {0};
if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
......@@ -485,10 +486,35 @@ static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
return -1;
}
}
// TODO: check ref
int32_t code = mndDropTopic(pMnode, pReq, pTopic);
// TODO: iterate and drop related subscriptions and offsets
// check ref
if (pTopic->refConsumerCnt != 0) {
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 1
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
return -1;
}
#endif
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
return -1;
}
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic);
if (code != 0) {
......@@ -577,6 +603,15 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
return numOfRows;
}
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
if (pCommitRaw == NULL) return -1;
......
......@@ -196,6 +196,8 @@ struct SMetaEntry {
STSma *tsma;
} smaEntry;
};
uint8_t *pBuf;
};
struct SMetaReader {
......
......@@ -116,6 +116,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
......
......@@ -498,6 +498,18 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
entry.version = version;
// do actual write
metaWLock(pMeta);
// save to table db
metaSaveToTbDb(pMeta, &entry);
tdbDbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0);
metaSaveToSkmDb(pMeta, &entry);
metaULock(pMeta);
tDecoderClear(&dc);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
......@@ -511,8 +523,97 @@ _err:
}
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
// TODO
SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0};
void *pVal = NULL;
int nVal = 0;
int ret;
int c;
tb_uid_t uid;
int64_t oversion;
const void *pData = NULL;
int nData = 0;
// search name index
ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
}
uid = *(tb_uid_t *)pVal;
tdbFree(pVal);
pVal = NULL;
// search uid index
TDBC *pUidIdxc = NULL;
tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c);
ASSERT(c == 0);
tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData);
oversion = *(int64_t *)pData;
// search table.db
TDBC *pTbDbc = NULL;
SDecoder dc = {0};
/* get ctbEntry */
tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn);
tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c);
ASSERT(c == 0);
tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData);
ctbEntry.pBuf = taosMemoryMalloc(nData);
memcpy(ctbEntry.pBuf, pData, nData);
tDecoderInit(&dc, ctbEntry.pBuf, nData);
metaDecodeEntry(&dc, &ctbEntry);
tDecoderClear(&dc);
/* get stbEntry*/
tdbDbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbDbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbFree(pVal);
tDecoderInit(&dc, stbEntry.pBuf, nVal);
metaDecodeEntry(&dc, &stbEntry);
tDecoderClear(&dc);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema *pColumn = NULL;
int32_t iCol = 0;
for (;;) {
pColumn = NULL;
if (iCol >= pTagSchema->nCols) break;
pColumn = &pTagSchema->pSchema[iCol];
if (strcmp(pColumn->name, pAlterTbReq->tagName) == 0) break;
iCol++;
}
if (pColumn == NULL) {
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
goto _err;
}
{
// TODO:
}
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
return 0;
_err:
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbDbcClose(pTbDbc);
tdbDbcClose(pUidIdxc);
return -1;
}
static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
......
......@@ -861,6 +861,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
#endif
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0);
return 0;
}
// TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0};
......@@ -1087,35 +1095,6 @@ int32_t tqProcessStreamTrigger2(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
}
int32_t tqProcessTaskExec2(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTaskExecReq req = {0};
tDecodeSStreamTaskExecReq(msg, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask);
ASSERT(pTask->inputType == TASK_INPUT_TYPE__DATA_BLOCK);
// enqueue
int32_t inputStatus = streamEnqueueDataBlk(pTask, (SStreamDataBlock*)req.data);
if (inputStatus == TASK_INPUT_STATUS__BLOCKED) {
// TODO rsp blocked
return 0;
}
// try exec
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (execStatus == TASK_STATUS__IDLE) {
if (streamTaskRun(pTask) < 0) {
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING);
goto FAIL;
}
} else if (execStatus == TASK_STATUS__EXECUTING) {
return 0;
}
// TODO rsp success
//
return 0;
FAIL:
return -1;
}
......@@ -101,6 +101,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
// TODO: handle error
}
break;
case TDMT_VND_MQ_VG_DELETE:
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
// TODO: handle error
}
break;
case TDMT_VND_TASK_DEPLOY: {
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
......@@ -199,7 +204,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL);
......@@ -462,7 +467,11 @@ _exit:
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVAlterTbReq vAlterTbReq = {0};
SVAlterTbRsp vAlterTbRsp = {0};
SDecoder dc = {0};
int rcode = 0;
int ret;
SEncoder ec = {0};
pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
pRsp->pCont = NULL;
......@@ -473,19 +482,27 @@ static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, i
// decode
if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG;
vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
return -1;
rcode = -1;
goto _exit;
}
// process
if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) {
pRsp->code = terrno;
vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
tDecoderClear(&dc);
return -1;
rcode = -1;
goto _exit;
}
tDecoderClear(&dc);
_exit:
tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
tEncoderClear(&ec);
return 0;
}
......
......@@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t sampleFunction(SqlFunctionCtx* pCtx);
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t tailFunction(SqlFunctionCtx* pCtx);
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......
......@@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS;
}
static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams && 3 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The input parameter of TAIL function can only be column");
}
for (int32_t i = 1; i < numOfParams; ++i) {
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
uint8_t colType = pCol->resType.type;
if (IS_VAR_DATA_TYPE(colType)) {
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo
return TSDB_CODE_SUCCESS;
......@@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = sampleFunction,
.finalizeFunc = NULL
},
{
.name = "tail",
.type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv,
.initFunc = tailFunctionSetup,
.processFunc = tailFunction,
.finalizeFunc = tailFinalize
},
{
.name = "abs",
.type = FUNCTION_TYPE_ABS,
......
......@@ -18,12 +18,15 @@
#include "function.h"
#include "querynodes.h"
#include "taggfunction.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tpercentile.h"
#define HISTOGRAM_MAX_BINS_NUM 1000
#define MAVG_MAX_POINTS_NUM 1000
#define SAMPLE_MAX_POINTS_NUM 1000
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
typedef struct SSumRes {
union {
......@@ -161,6 +164,21 @@ typedef struct SSampleInfo {
int64_t *timestamp;
} SSampleInfo;
typedef struct STailItem {
int64_t timestamp;
bool isNull;
char data[];
} STailItem;
typedef struct STailInfo {
int32_t numOfPoints;
int32_t numAdded;
int32_t offset;
uint8_t colType;
int16_t colBytes;
STailItem **pItems;
} STailInfo;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
......@@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (colDataIsNull_s(pInputCol, i)) {
//colDataAppendNULL(pOutput, i);
continue;
}
......@@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
//
// return pResInfo->numOfRes;
//}
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
int32_t numOfPoints = pVal->datum.i;
pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes);
return true;
}
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
pInfo->numAdded = 0;
pInfo->numOfPoints = pCtx->param[1].param.i;
if (pCtx->numOfParams == 4) {
pInfo->offset = pCtx->param[2].param.i;
} else {
pInfo->offset = 0;
}
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) ||
(pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) {
return false;
}
pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo));
char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES;
size_t unitSize = sizeof(STailItem) + pInfo->colBytes;
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize);
pInfo->pItems[i]->isNull = false;
}
return true;
}
static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts, bool isNull) {
pItem->timestamp = ts;
if (isNull) {
pItem->isNull = true;
} else {
memcpy(pItem->data, data, colBytes);
}
}
static int32_t tailCompFn(const void *p1, const void *p2, const void *param) {
STailItem *d1 = *(STailItem **)p1;
STailItem *d2 = *(STailItem **)p2;
return compareInt64Val(&d1->timestamp, &d2->timestamp);
}
static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts, bool isNull) {
STailItem **pList = pInfo->pItems;
if (pInfo->numAdded < pInfo->numOfPoints) {
tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts, isNull);
taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0);
pInfo->numAdded++;
} else if (pList[0]->timestamp < ts) {
tailAssignResult(pList[0], data, pInfo->colBytes, ts, isNull);
taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0);
}
}
int32_t tailFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
if (pInfo->offset >= pInput->numOfRows) {
return 0;
} else {
pInfo->numOfPoints = MIN(pInfo->numOfPoints, pInput->numOfRows - pInfo->offset);
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex - pInfo->offset; i += 1) {
char* data = colDataGetData(pInputCol, i);
doTailAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
}
taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
int32_t pos = startOffset + i;
STailItem *pItem = pInfo->pItems[i];
if (pItem->isNull) {
colDataAppendNULL(pOutput, pos);
} else {
colDataAppend(pOutput, pos, pItem->data, false);
}
}
return pInfo->numOfPoints;
}
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STailItem *pItem = pInfo->pItems[i];
colDataAppend(pCol, currentRow, pItem->data, false);
//setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
currentRow += 1;
}
return pEntryInfo->numOfRes;
}
......@@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN];
UdfcFuncHandle handle;
int32_t refCount;
int64_t lastRefTime;
} SUdfcFuncStub;
typedef struct SUdfcProxy {
......@@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
return 0;
}
*pHandle = NULL;
......@@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
......@@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle);
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
taosArrayPush(udfStubs, stub);
}
++i;
......
......@@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop(global.loop);
}
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
typedef enum EUdfdRpcReqRspType {
UDFD_RPC_MNODE_CONNECT = 0,
UDFD_RPC_RETRIVE_FUNC,
} EUdfdRpcReqRspType;
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void* param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
ASSERT(pMsg->ahandle != NULL);
if (pEpSet) {
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&global.mgmtEp, pEpSet);
}
}
if (pMsg->code != TSDB_CODE_SUCCESS) {
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
msgInfo->code = pMsg->code;
goto _return;
}
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
goto _return;
}
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
}
msgInfo->code = 0;
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
}
_return:
rpcFreeCont(pMsg->pCont);
uv_sem_post(&msgInfo->resultSem);
return;
}
int32_t udfdConnectToMNode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId());
connReq.startTime = htobe64(taosGetTimestampMs());
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_MND_CONNECT;
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
int32_t code = msgInfo->code;
uv_sem_destroy(&msgInfo->resultSem);
taosMemoryFree(msgInfo);
return code;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
uv_sem_destroy(&msgInfo->resultSem);
int32_t code = msgInfo->code;
taosMemoryFree(msgInfo);
return code;
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
pEpSet->version = 0;
......@@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return 0;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
SRpcMsg rpcRsp = {0};
rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp);
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
rpcFreeCont(rpcRsp.pCont);
return 0;
}
int32_t udfdOpenClientRpc() {
char *pass = "taosdata";
char *user = "root";
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit = {0};
rpcInit.label = (char *)"UDFD";
rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1;
rpcInit.cfp = udfdProcessRpcRsp;
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
rpcInit.user = (char *)user;
rpcInit.ckey = (char *)"key";
rpcInit.secret = (char *)secretEncrypt;
rpcInit.spi = 1;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
rpcInit.secret = pass;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0;
}
......@@ -700,12 +800,6 @@ static int32_t udfdRun() {
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex);
// TOOD: client rpc to fetch udf function info from mnode
if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -1;
}
if (udfdUvInit() != 0) {
fnError("uv init failure");
return -2;
......@@ -717,7 +811,6 @@ static int32_t udfdRun() {
int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
removeListeningPipe();
udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0;
......@@ -746,9 +839,22 @@ int main(int argc, char *argv[]) {
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error");
return -1;
return -2;
}
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
return udfdRun();
if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -3;
}
if (udfdConnectToMNode() != 0) {
fnError("failed to start since can not connect to mnode");
return -4;
}
udfdRun();
udfdCloseClientRpc();
}
......@@ -33,11 +33,17 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b);
TExeCond tCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b, int8_t dType);
TExeCond tDoCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b);
_cache_range_compare indexGetCompare(RangeType ty);
int32_t indexConvertData(void* src, int8_t type, void** dst);
int32_t indexConvertDataToStr(void* src, int8_t type, void** dst);
int32_t indexGetDataByteLen(int8_t type);
char* indexInt2str(int64_t val, char* dst, int radix);
#ifdef __cplusplus
}
......
......@@ -109,17 +109,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadMutexInit(&sIdx->mtx, NULL);
sIdx->refId = indexAddRef(sIdx);
taosAcquireRef(indexRefMgt, sIdx->refId);
indexAcquireRef(sIdx->refId);
*index = sIdx;
return 0;
END:
if (sIdx != NULL) {
indexClose(sIdx);
}
*index = NULL;
return -1;
}
......@@ -273,7 +271,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
tm->nColName = nColName;
char* buf = NULL;
int32_t len = indexConvertData((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf);
int32_t len = indexConvertDataToStr((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf);
assert(len != -1);
tm->colVal = buf;
......
......@@ -282,8 +282,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
if (0 != strncmp(c->colVal, pCt->colVal, skip)) {
break;
}
char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1);
memcpy(p, c->colVal, strlen(c->colVal));
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
TExeCond cond = cmpFn(p + skip, term->colVal, dType);
if (cond == MATCH) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
......@@ -297,6 +299,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
} else if (cond == BREAK) {
break;
}
taosMemoryFree(p);
}
taosMemoryFree(pCt);
......@@ -463,7 +466,6 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
// schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t));
// memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t));
schedMsg.msg = NULL;
indexAcquireRef(pCache->index->refId);
taosScheduleTask(indexQhandle, &schedMsg);
......
......@@ -19,10 +19,38 @@
#include "tcoding.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "ttypes.h"
char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&';
char* indexInt2str(int64_t val, char* dst, int radix) {
char buffer[65];
char* p;
int64_t new_val;
uint64_t uval = (uint64_t)val;
if (radix < 0) {
if (val < 0) {
*dst++ = '-';
uval = (uint64_t)0 - uval; /* Avoid integer overflow in (-val) for LLONG_MIN (BUG#31799). */
}
}
p = &buffer[sizeof(buffer) - 1];
*p = '\0';
new_val = (int64_t)(uval / 10);
*--p = '0' + (char)(uval - (uint64_t)new_val * 10);
val = new_val;
while (val != 0) {
new_val = val / 10;
*--p = '0' + (char)(val - new_val * 10);
val = new_val;
}
while ((*dst++ = *p++) != 0)
;
return dst - 1;
}
static __compar_fn_t indexGetCompar(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
return (__compar_fn_t)strcmp;
......@@ -31,25 +59,49 @@ static __compar_fn_t indexGetCompar(int8_t type) {
}
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
return tCompare(func, QUERY_LESS_THAN, a, b, type);
}
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
return tCompare(func, QUERY_LESS_EQUAL, a, b, type);
}
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
return tCompare(func, QUERY_GREATER_THAN, a, b, type);
}
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
return tCompare(func, QUERY_GREATER_EQUAL, a, b, type);
}
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) {
if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR) {
return tDoCompare(func, cmptype, a, b);
}
#if 1
int8_t bytes = tDataTypes[dtype].bytes;
if (bytes == 1) {
int8_t va = taosStr2int64(a);
int8_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
} else if (bytes == 2) {
int16_t va = taosStr2int64(a);
int16_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
} else if (bytes == 4) {
int32_t va = taosStr2int64(a);
int32_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
} else {
int64_t va = taosStr2int64(a);
int64_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
}
#endif
}
TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
// optime later
int32_t ret = func(a, b);
switch (comType) {
switch (comparType) {
case QUERY_LESS_THAN: {
if (ret < 0) return MATCH;
} break;
......@@ -174,9 +226,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) {
tlen = taosEncodeFixedU32(dst, *(uint32_t*)src);
break;
case TSDB_DATA_TYPE_BIGINT:
tlen = taosEncodeFixedI64(NULL, *(uint32_t*)src);
tlen = taosEncodeFixedI64(NULL, *(int64_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedI64(dst, *(uint32_t*)src);
tlen = taosEncodeFixedI64(dst, *(int64_t*)src);
break;
case TSDB_DATA_TYPE_DOUBLE:
tlen = taosEncodeBinary(NULL, src, sizeof(double));
......@@ -184,9 +236,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(dst, src, sizeof(double));
break;
case TSDB_DATA_TYPE_UBIGINT:
tlen = taosEncodeFixedU64(NULL, *(uint32_t*)src);
tlen = taosEncodeFixedU64(NULL, *(uint64_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedU64(dst, *(uint32_t*)src);
tlen = taosEncodeFixedU64(dst, *(uint64_t*)src);
break;
case TSDB_DATA_TYPE_NCHAR: {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
......@@ -215,14 +267,94 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) {
break;
}
*dst = *dst - tlen;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR && type != TSDB_DATA_TYPE_VARBINARY &&
type == TSDB_DATA_TYPE_VARCHAR) {
uint8_t* p = *dst;
for (int i = 0; i < tlen; i++) {
if (p[i] == 0) {
p[i] = (uint8_t)'0';
}
// indexMayFillNumbericData(*dst, tlen);
return tlen;
}
int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) {
int tlen = tDataTypes[type].bytes;
switch (type) {
case TSDB_DATA_TYPE_TIMESTAMP:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int64_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
// tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src);
//*dst = taosMemoryCalloc(1, tlen + 1);
// tlen = taosEncodeFixedU8(dst, *(uint8_t*)src);
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint8_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_TINYINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int8_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_SMALLINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int16_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_USMALLINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint16_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_INT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int32_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_FLOAT:
tlen = taosEncodeBinary(NULL, src, sizeof(float));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, sizeof(float));
*dst = *dst - tlen;
break;
case TSDB_DATA_TYPE_UINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint32_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_BIGINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int64_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_DOUBLE:
tlen = taosEncodeBinary(NULL, src, sizeof(double));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, sizeof(double));
*dst = *dst - tlen;
break;
case TSDB_DATA_TYPE_UBIGINT:
assert(0);
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint64_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_NCHAR: {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
*dst = *dst - tlen;
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
#if 1
tlen = taosEncodeBinary(NULL, src, strlen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, strlen(src));
*dst = *dst - tlen;
break;
#endif
}
case TSDB_DATA_TYPE_VARBINARY:
#if 1
tlen = taosEncodeBinary(NULL, src, strlen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, strlen(src));
*dst = *dst - tlen;
break;
#endif
default:
TASSERT(0);
break;
}
return tlen;
}
......@@ -1324,7 +1324,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
taosArrayPop(sws->inp);
}
streamStateDestroy(p);
// streamStateDestroy(p);
continue;
}
FstTransition trn;
......
......@@ -410,8 +410,9 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult*
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
cost = taosGetTimestampUs() - et;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
tem->colName, tem->colVal, cost);
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
", size: %d, time cost: %" PRIu64 "us",
tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost);
}
fstSliceDestroy(&key);
return 0;
......@@ -941,7 +942,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
// TODO(yihao): opt later
WriterCtx* ctx = reader->ctx;
// add block cache
char block[1024] = {0};
char block[4096] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
assert(nread >= sizeof(uint32_t));
......
......@@ -56,6 +56,29 @@ class JsonEnv : public ::testing::Test {
SIndexJson* index;
};
static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtype, void* data, int dlen, int tableId,
int8_t operType = ADD_VALUE) {
SIndexTerm* term =
indexTermCreate(1, (SIndexOperOnColumn)operType, dtype, colName.c_str(), colName.size(), (const char*)data, dlen);
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, (int64_t)tableId);
indexMultiTermDestroy(terms);
}
static void Search(SIndexJson* index, const std::string& colNam, int8_t dtype, void* data, int dlen, int8_t filterType,
SArray** result) {
std::string colName(colNam);
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, dtype, colName.c_str(), colName.size(), (const char*)data, dlen);
SArray* res = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, (EIndexQueryType)filterType);
tIndexJsonSearch(index, mq, res);
indexMultiTermQueryDestroy(mq);
*result = res;
}
TEST_F(JsonEnv, testWrite) {
{
std::string colName("test");
......@@ -204,9 +227,10 @@ TEST_F(JsonEnv, testWriteMillonData) {
TEST_F(JsonEnv, testWriteJsonNumberData) {
{
std::string colName("test");
std::string colVal("10");
// std::string colVal("10");
int val = 10;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
(const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -217,9 +241,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
}
{
std::string colName("test2");
std::string colVal("20");
int val = 20;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
(const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -229,10 +253,10 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
indexMultiTermDestroy(terms);
}
{
std::string colName("test2");
std::string colVal("15");
std::string colName("test");
int val = 15;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
(const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -243,9 +267,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
}
{
std::string colName("test2");
std::string colVal("15");
const char* val = "test";
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
(const char*)val, strlen(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -255,12 +279,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
indexMultiTermDestroy(terms);
}
{
std::string colName("test");
std::string colVal("10");
std::string colName("test");
int val = 15;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
......@@ -270,11 +293,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
}
{
std::string colName("test");
std::string colVal("10");
int val = 15;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
......@@ -284,11 +307,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
}
{
std::string colName("test");
std::string colVal("10");
int val = 10;
;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(int));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
......@@ -298,11 +322,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
}
{
std::string colName("test");
std::string colVal("10");
int val = 10;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
......@@ -312,11 +337,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
}
{
std::string colName("test");
std::string colVal("10");
int val = 10;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
......@@ -326,12 +352,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
{
std::string colName("test1");
std::string colVal("10");
int val = 10;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
(const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -355,11 +381,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
}
{
std::string colName("test1");
std::string colVal("10");
int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
......@@ -369,11 +395,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
}
{
std::string colName("test1");
std::string colVal("10");
int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(int));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
......@@ -383,11 +409,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
}
{
std::string colName("test1");
std::string colVal("10");
// std::string colVal("10");
int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
......@@ -397,11 +424,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
}
{
std::string colName("test1");
std::string colVal("10");
int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
......@@ -411,11 +438,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
}
{
std::string colName("test1");
std::string colVal("10");
int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
......@@ -425,9 +452,10 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
}
{
std::string colName("other_column");
std::string colVal("100");
int val = 100;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
(const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
......@@ -438,11 +466,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
}
{
std::string colName("test1");
std::string colVal("10");
int val = 10;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(),
colVal.size());
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
......@@ -450,4 +479,102 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
{
std::string colName("test1");
int val = 15;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i + 1000);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test1");
int val = 8;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(2000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT2) {
{
int val = 10;
std::string colName("test1");
for (int i = 0; i < 10000; i++) {
val += 1;
WriteData(index, colName, TSDB_DATA_TYPE_INT, &val, sizeof(val), i);
}
}
{
int val = 10;
std::string colName("test2xxx");
std::string colVal("xxxxxxxxxxxxxxx");
for (int i = 0; i < 100000; i++) {
val += 1;
WriteData(index, colName, TSDB_DATA_TYPE_BINARY, (void*)(colVal.c_str()), colVal.size(), i);
}
}
{
SArray* res = NULL;
std::string colName("test1");
int val = 9;
Search(index, colName, TSDB_DATA_TYPE_INT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(10000, taosArrayGetSize(res));
}
{
SArray* res = NULL;
std::string colName("test2xxx");
std::string colVal("xxxxxxxxxxxxxxx");
Search(index, colName, TSDB_DATA_TYPE_BINARY, (void*)(colVal.c_str()), colVal.size(), QUERY_TERM, &res);
EXPECT_EQ(100000, taosArrayGetSize(res));
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) {
{
float val = 10.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i);
}
}
{
float val = 2.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i);
}
}
{
SArray* res = NULL;
std::string colName("test1");
float val = 1.9;
Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(2000, taosArrayGetSize(res));
}
{
SArray* res = NULL;
std::string colName("test1");
float val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
{
std::string colName("test1");
SArray* res = NULL;
float val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
}
......@@ -6,12 +6,14 @@
#include <vector>
#include "index.h"
#include "indexCache.h"
#include "indexComm.h"
#include "indexFst.h"
#include "indexFstCountingWriter.h"
#include "indexFstUtil.h"
#include "indexInt.h"
#include "indexTfile.h"
#include "indexUtil.h"
#include "tcoding.h"
#include "tglobal.h"
#include "tskiplist.h"
#include "tutil.h"
......@@ -305,3 +307,17 @@ TEST_F(UtilEnv, 01Except) {
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100);
}
TEST_F(UtilEnv, testFill) {
for (int i = 0; i < 10000000; i++) {
int64_t val = i;
char buf[65] = {0};
indexInt2str(val, buf, 1);
EXPECT_EQ(val, taosStr2int64(buf));
}
for (int i = 0; i < 10000000; i++) {
int64_t val = 0 - i;
char buf[65] = {0};
indexInt2str(val, buf, -1);
EXPECT_EQ(val, taosStr2int64(buf));
}
}
......@@ -35,7 +35,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
return (void*)buf;
}
static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
SStreamTaskExecReq req = {
.streamId = pTask->streamId,
.data = data,
......@@ -107,7 +107,7 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
SArray* pData = *(SArray**)pIter;
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet;
if (streamBuildDispatchMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
if (streamBuildExecMsg(pTask, pData, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
......@@ -133,7 +133,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
return inputStatus;
}
int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) {
void* exec = pTask->exec.runners[0].executor;
// set input
......@@ -265,87 +265,42 @@ FAIL:
return -1;
}
int32_t streamTaskDispatchDown(SStreamTask* pTask, SMsgCb* pMsgCb) {
//
return 0;
}
int32_t streamTaskSink(SStreamTask* pTask) {
//
return 0;
}
int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* pBlock, SRpcMsg* pRsp) {
// 1. handle input
// 1.1 enqueue
taosWriteQitem(pTask->inputQ, pBlock);
// 1.2 calc back pressure
// 1.3 rsp by input status
int8_t inputStatus = atomic_load_8(&pTask->inputStatus);
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
pCont->status = inputStatus;
pRsp->pCont = pCont;
pRsp->contLen = sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
// 2. try exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
int32_t streamTaskSink(SStreamTask* pTask, SMsgCb* pMsgCb) {
bool firstRun = 1;
while (1) {
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (execStatus == TASK_STATUS__IDLE) {
void* exec = pTask->exec.runners[0].executor;
SArray* pRes = taosArrayInit(0, sizeof(void*));
const SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
while (1) {
SSDataBlock* output;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) break;
taosArrayPush(pRes, &output);
}
// TODO: wrap destroy block
taosArrayDestroyP(pBlock->blocks, (FDelete)blockDataDestroy);
if (taosArrayGetSize(pRes) != 0) {
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
*resQ = pRes;
taosWriteQitem(pTask->outputQ, resQ);
SStreamDataBlock* pBlock = NULL;
if (!firstRun) {
taosReadAllQitems(pTask->outputQ, pTask->outputQAll);
}
taosGetQitem(pTask->outputQAll, (void**)&pBlock);
if (pBlock == NULL) {
if (firstRun) {
firstRun = 0;
continue;
} else {
break;
}
} else if (execStatus == TASK_STATUS__CLOSING) {
continue;
} else if (execStatus == TASK_STATUS__EXECUTING)
break;
else {
ASSERT(0);
}
}
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(pTask->outputQ, qall);
SArray** ppRes = NULL;
while (1) {
taosGetQitem(qall, (void**)&ppRes);
if (ppRes == NULL) break;
SArray* pRes = *ppRes;
SArray* pRes = pBlock->blocks;
// sink
if (pTask->sinkType == TASK_SINK__TABLE) {
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->sourceVer, pRes);
// blockDebugShowData(pRes);
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes);
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
// dispatch
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
ASSERT(0);
return -1;
}
......@@ -366,7 +321,7 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
......@@ -401,75 +356,58 @@ int32_t streamTaskProcessInputReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDat
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
}
}
//
return 0;
}
int32_t streamTaskProcessDispatchRsp(SStreamTask* pTask, char* msg, int32_t msgLen) {
//
return 0;
}
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status;
// 1.1 update status
// TODO cal backpressure
if (pBlock == NULL) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
status = TASK_INPUT_STATUS__FAILED;
} else {
status = atomic_load_8(&pTask->inputStatus);
}
// 1.2 enqueue
pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pBlock->sourceVg = pReq->sourceVg;
pBlock->sourceVer = pReq->sourceVer;
taosWriteQitem(pTask->inputQ, pBlock);
// 1.3 rsp by input status
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
pCont->inputStatus = status;
pRsp->pCont = pCont;
pRsp->contLen = sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp);
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
//
return 0;
}
int32_t streamTaskRun(SStreamTask* pTask) {
SArray* pRes = NULL;
if (pTask->execType == TASK_EXEC__PIPE || pTask->execType == TASK_EXEC__MERGE) {
// TODO remove multi runner
void* exec = pTask->exec.runners[0].executor;
int8_t status = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING);
if (status == TASK_STATUS__IDLE) {
pRes = taosArrayInit(0, sizeof(void*));
if (pRes == NULL) {
return -1;
}
int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
// 1. handle input
streamTaskEnqueue(pTask, pReq, pRsp);
void* input = NULL;
taosWriteQitem(pTask->inputQ, &input);
if (input == NULL) return 0;
// TODO: fix type
if (pTask->sourceType == TASK_SOURCE__SCAN) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)input;
qSetStreamInput(exec, pSubmit->data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
while (1) {
SSDataBlock* output;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) break;
taosArrayPush(pRes, &output);
}
streamDataSubmitRefDec(pSubmit);
} else {
SStreamDataBlock* pStreamBlock = (SStreamDataBlock*)input;
const SArray* blocks = pStreamBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
while (1) {
SSDataBlock* output;
uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(false);
}
if (output == NULL) break;
taosArrayPush(pRes, &output);
}
// TODO: wrap destroy block
taosArrayDestroyP(pStreamBlock->blocks, (FDelete)blockDataDestroy);
}
// 2. try exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.3. closing: keep trying
streamTaskExec2(pTask, pMsgCb);
if (taosArrayGetSize(pRes) != 0) {
SArray** resQ = taosAllocateQitem(sizeof(void**), DEF_QITEM);
*resQ = pRes;
taosWriteQitem(pTask->outputQ, resQ);
}
}
}
// 3. handle output
// 3.1 check and set status
// 3.2 dispatch / sink
streamTaskSink(pTask, pMsgCb);
return 0;
}
int32_t streamTaskProcessRecoverReq(SStreamTask* pTask, char* msg) {
//
return 0;
}
......@@ -545,7 +483,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
SRpcMsg dispatchMsg = {0};
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, NULL) < 0) {
ASSERT(0);
return -1;
}
......@@ -566,7 +504,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
if (streamBuildExecMsg(pTask, pRes, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
return -1;
}
......
......@@ -285,6 +285,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid qu
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
......
......@@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1
print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start
sleep 2000
sleep 1000
sql connect
print ======== step1 udf
......
......@@ -220,7 +220,6 @@ if $data[0][4] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][3]
elif $data[0][6] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][5]
endi
elif $data[0][8] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][7]
else
......@@ -342,7 +341,6 @@ elif $data[0][6] == LEADER then
goto check_vg_ready_3
endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][7]
endi
elif $data[0][8] == LEADER then
if $data[0][4] == LEADER then
goto check_vg_ready_3
......
......@@ -420,7 +420,6 @@ elif $data[0][6] == LEADER then
goto check_vg_ready_3
endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][7]
endi
elif $data[0][8] == LEADER then
if $data[0][4] == LEADER then
goto check_vg_ready_3
......
此差异已折叠。
import datetime
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
PRIMARY_COL = "ts"
INT_COL = "c1"
BINT_COL = "c2"
SINT_COL = "c3"
TINT_COL = "c4"
FLOAT_COL = "c5"
DOUBLE_COL = "c6"
BOOL_COL = "c7"
BINARY_COL = "c8"
NCHAR_COL = "c9"
TS_COL = "c10"
NUM_COL = [ INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [ BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [ BOOL_COL, ]
TS_TYPE_COL = [ TS_COL, ]
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def __query_condition(self,tbname):
query_condition = []
for char_col in CHAR_COL:
query_condition.extend(
(
f"{tbname}.{char_col}",
f"upper( {tbname}.{char_col} )",
f"char_length( {tbname}.{char_col} )",
f"concat( {tbname}.{char_col}, {tbname}.{char_col} )",
f"concat_ws( '_', {tbname}.{char_col}, {tbname}.{char_col} )",
f"length( {tbname}.{char_col} )",
f"lower( {tbname}.{char_col} )",
f"ltrim( {tbname}.{char_col} )",
f"rtrim( {tbname}.{char_col} )",
f"substr( {tbname}.{char_col}, 1 )",
f"count( {tbname}.{char_col} )",
f"cast( {tbname}.{char_col} as nchar(3) )",
f"cast( {tbname}.{char_col} as nchar(8) )",
)
)
query_condition.extend( f"cast( {tbname}.{un_char_col} as binary(16) ) " for un_char_col in NUM_COL)
query_condition.extend( f"cast( {tbname}.{char_col} + {tbname}.{char_col_2} as binary(32) ) " for char_col_2 in CHAR_COL )
query_condition.extend( f"cast( {tbname}.{char_col} + {tbname}.{un_char_col} as binary(32) ) " for un_char_col in NUM_COL )
for num_col in NUM_COL:
query_condition.extend(
(
f"{tbname}.{num_col}",
f"ceil( {tbname}.{num_col} )",
f"abs( {tbname}.{num_col} )",
f"acos( {tbname}.{num_col} )",
f"asin( {tbname}.{num_col} )",
f"atan( {tbname}.{num_col} )",
f"cos( {tbname}.{num_col} )",
f"floor( {tbname}.{num_col} )",
f"log( {tbname}.{num_col}, {tbname}.{num_col})",
f"sin( {tbname}.{num_col} )",
f"sqrt( {tbname}.{num_col} )",
f"tan( {tbname}.{num_col} )",
f"round( {tbname}.{num_col} )",
f"max( {tbname}.{num_col} )",
f"sum( {tbname}.{num_col} )",
f"count( {tbname}.{num_col} )",
f"min( {tbname}.{num_col} )",
)
)
query_condition.extend( f"{tbname}.{num_col} + {tbname}.{num_col_2}" for num_col_2 in NUM_COL )
query_condition.extend( f"{tbname}.{num_col} + {tbname}.{char_col} " for char_col in CHAR_COL )
query_condition.extend(
(
''' "test1234!@#$%^&*():'><?/.,][}{" ''',
''' "test12" ''',
# 1010,
)
)
return query_condition
def __join_condition(self, tb_list, filter=PRIMARY_COL, INNER=False):
table_reference = tb_list[0]
join_condition = table_reference
join = "inner join" if INNER else "join"
for i in range(len(tb_list[1:])):
join_condition += f" {join} {tb_list[i+1]} on {table_reference}.{filter}={tb_list[i+1]}.{filter}"
return join_condition
def __where_condition(self, col=None, tbname=None, query_conditon=None):
if query_conditon and isinstance(query_conditon, str):
if query_conditon.startswith("count"):
query_conditon = query_conditon[6:-1]
elif query_conditon.startswith("max"):
query_conditon = query_conditon[4:-1]
elif query_conditon.startswith("sum"):
query_conditon = query_conditon[4:-1]
elif query_conditon.startswith("min"):
query_conditon = query_conditon[4:-1]
if query_conditon:
return f" where {query_conditon} is not null"
if col in NUM_COL:
return f" where abs( {tbname}.{col} ) >= 0"
if col in CHAR_COL:
return f" where lower( {tbname}.{col} ) like 'bina%' or lower( {tbname}.{col} ) like '_cha%' "
if col in BOOLEAN_COL:
return f" where {tbname}.{col} in (false, true) "
if col in TS_TYPE_COL or col in PRIMARY_COL:
return f" where cast( {tbname}.{col} as binary(16) ) is not null "
return ""
def __group_condition(self, col, having = None):
if isinstance(col, str):
if col.startswith("count"):
col = col[6:-1]
elif col.startswith("max"):
col = col[4:-1]
elif col.startswith("sum"):
col = col[4:-1]
elif col.startswith("min"):
col = col[4:-1]
return f" group by {col} having {having}" if having else f" group by {col} "
def __single_sql(self, select_clause, from_clause, where_condition="", group_condition=""):
if isinstance(select_clause, str) and "on" not in from_clause and select_clause.split(".")[0] != from_clause.split(".")[0]:
return
return f"select {select_clause} from {from_clause} {where_condition} {group_condition}"
@property
def __join_tblist(self):
return [
["ct1", "ct2"],
["ct1", "ct4"],
["ct1", "t1"],
["ct2", "ct4"],
["ct2", "t1"],
["ct4", "t1"],
# ["ct1", "ct2", "ct4"],
# ["ct1", "ct2", "t1"],
# ["ct1", "ct4", "t1"],
# ["ct2", "ct4", "t1"],
# ["ct1", "ct2", "ct4", "t1"],
]
@property
def __tb_liast(self):
return [
"ct1",
"ct2",
"ct4",
"t1",
]
def sql_list(self):
sqls = []
__join_tblist = self.__join_tblist
for join_tblist in __join_tblist:
for join_tb in join_tblist:
select_claus_list = self.__query_condition(join_tb)
for select_claus in select_claus_list:
group_claus = self.__group_condition( col=select_claus)
where_claus = self.__where_condition(query_conditon=select_claus)
having_claus = self.__group_condition( col=select_claus, having=f"{select_claus} is not null")
sqls.extend(
(
self.__single_sql(select_claus, join_tb, where_claus, group_claus),
self.__single_sql(select_claus, join_tb, where_claus, having_claus),
self.__single_sql(select_claus, self.__join_condition(join_tblist), where_claus, having_claus),
self.__single_sql(select_claus, self.__join_condition(join_tblist, INNER=True), where_claus, having_claus),
self.__single_sql(select_claus, join_tb, where_claus),
self.__single_sql(select_claus, join_tb, having_claus),
self.__single_sql(select_claus, join_tb, group_claus),
self.__single_sql(select_claus, join_tb),
)
)
__no_join_tblist = self.__tb_liast
for tb in __no_join_tblist:
select_claus_list = self.__query_condition(tb)
for select_claus in select_claus_list:
group_claus = self.__group_condition(col=select_claus)
where_claus = self.__where_condition(query_conditon=select_claus)
having_claus = self.__group_condition(col=select_claus, having=f"{select_claus} is not null")
sqls.extend(
(
self.__single_sql(select_claus, join_tb, where_claus, group_claus),
self.__single_sql(select_claus, join_tb, where_claus, having_claus),
self.__single_sql(select_claus, join_tb, where_claus),
self.__single_sql(select_claus, join_tb, group_claus),
self.__single_sql(select_claus, join_tb, having_claus),
self.__single_sql(select_claus, join_tb),
)
)
return filter(None, sqls)
# return list(filter(None, sqls))
def __get_type(self, col):
if tdSql.cursor.istype(col, "BOOL"):
return "BOOL"
if tdSql.cursor.istype(col, "INT"):
return "INT"
if tdSql.cursor.istype(col, "BIGINT"):
return "BIGINT"
if tdSql.cursor.istype(col, "TINYINT"):
return "TINYINT"
if tdSql.cursor.istype(col, "SMALLINT"):
return "SMALLINT"
if tdSql.cursor.istype(col, "FLOAT"):
return "FLOAT"
if tdSql.cursor.istype(col, "DOUBLE"):
return "DOUBLE"
if tdSql.cursor.istype(col, "BINARY"):
return "BINARY"
if tdSql.cursor.istype(col, "NCHAR"):
return "NCHAR"
if tdSql.cursor.istype(col, "TIMESTAMP"):
return "TIMESTAMP"
if tdSql.cursor.istype(col, "JSON"):
return "JSON"
if tdSql.cursor.istype(col, "TINYINT UNSIGNED"):
return "TINYINT UNSIGNED"
if tdSql.cursor.istype(col, "SMALLINT UNSIGNED"):
return "SMALLINT UNSIGNED"
if tdSql.cursor.istype(col, "INT UNSIGNED"):
return "INT UNSIGNED"
if tdSql.cursor.istype(col, "BIGINT UNSIGNED"):
return "BIGINT UNSIGNED"
def union_check(self):
sqls = self.sql_list()
for sql1 in sqls:
tdSql.query(sql1)
res1_type = self.__get_type(0)
for sql2 in sqls:
tdSql.query(sql2)
union_type = False
res2_type = self.__get_type(0)
if res1_type in ( "BIGINT" , "NCHAR" ):
union_type = True
elif res2_type == res1_type:
union_type = True
elif res1_type == "TIMESAMP" and res2_type not in ("BINARY", "NCHAR"):
union_type = True
elif res1_type == "BINARY" and res2_type != "NCHAR":
union_type = True
if union_type:
tdSql.query(f"{sql1} union {sql2}")
tdSql.checkCols(1)
tdSql.query(f"{sql1} union all {sql2}")
tdSql.checkCols(1)
else:
tdSql.error(f"{sql1} union {sql2}")
def __test_error(self):
tdSql.error( "show tables union show tables" )
tdSql.error( "create table errtb1 union all create table errtb2" )
tdSql.error( "drop table ct1 union all drop table ct3" )
tdSql.error( "select c1 from ct1 union all drop table ct3" )
tdSql.error( "select c1 from ct1 union all '' " )
tdSql.error( " '' union all select c1 from ct1 " )
tdSql.error( "select c1 from ct1 union select c1 from ct2 union select c1 from ct4 ")
def all_test(self):
self.__test_error()
self.union_check()
def __create_tb(self):
tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int)
'''
create_ntb_sql = f'''create table t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
)
'''
tdSql.execute(create_stb_sql)
tdSql.execute(create_ntb_sql)
for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )')
{ i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}
def __insert_data(self, rows):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows):
tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
)
tdSql.execute(
f'''insert into ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
'''
)
tdSql.execute(
f'''insert into ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
(
{ now_time + 5184000000}, {pow(2,31)-pow(2,15)}, {pow(2,63)-pow(2,30)}, 32767, 127,
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000}
)
(
{ now_time + 2592000000 }, {pow(2,31)-pow(2,16)}, {pow(2,63)-pow(2,31)}, 32766, 126,
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000}
)
'''
)
tdSql.execute(
f'''insert into ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
(
{ now_time + 5184000000 }, { -1 * pow(2,31) + pow(2,15) }, { -1 * pow(2,63) + pow(2,30) }, -32766, -126,
{ -1 * 3.2 * pow(10,38) }, { -1.2 * pow(10,308) }, { rows % 2 }, "binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000 }
)
(
{ now_time + 2592000000 }, { -1 * pow(2,31) + pow(2,16) }, { -1 * pow(2,63) + pow(2,31) }, -32767, -127,
{ - 3.3 * pow(10,38) }, { -1.3 * pow(10,308) }, { (rows-1) % 2 }, "binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000 }
)
'''
)
for i in range(rows):
insert_data = f'''insert into t1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } )
'''
tdSql.execute(insert_data)
tdSql.execute(
f'''insert into t1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7200000 }, { pow(2,31) - pow(2,15) }, { pow(2,63) - pow(2,30) }, 32767, 127,
{ 3.3 * pow(10,38) }, { 1.3 * pow(10,308) }, { rows % 2 },
"binary_limit-1", "nchar_测试_limit-1", { now_time - 86400000 }
)
(
{ now_time + 3600000 } , { pow(2,31) - pow(2,16) }, { pow(2,63) - pow(2,31) }, 32766, 126,
{ 3.2 * pow(10,38) }, { 1.2 * pow(10,308) }, { (rows-1) % 2 },
"binary_limit-2", "nchar_测试_limit-2", { now_time - 172800000 }
)
'''
)
def run(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
self.__create_tb()
tdLog.printNoPrefix("==========step2:insert data")
self.rows = 10
self.__insert_data(self.rows)
tdLog.printNoPrefix("==========step3:all check")
self.all_test()
tdDnodes.stop(1)
tdDnodes.start(1)
tdSql.execute("use db")
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -360,7 +360,7 @@ class TDTestCase:
# wait db ready
while 1:
tdSql.query("show databases")
if tdSql.getRows() == 4:
if tdSql.getRows() == 5:
print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
break
else:
......
......@@ -54,5 +54,8 @@ python3 ./test.py -f 2-query/arcsin.py
python3 ./test.py -f 2-query/arccos.py
python3 ./test.py -f 2-query/arctan.py
python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 2-query/nestedQuery.py
python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/subscribeDb.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册