提交 c0907379 编写于 作者: “happyguoxy”

Merge branch '3.0' of github.com:taosdata/TDengine into test/TD-21161

cmake_minimum_required(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE ON) set(CMAKE_VERBOSE_MAKEFILE OFF)
set(TD_BUILD_TAOSA_INTERNAL FALSE) set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory #set output directory
......
# rocksdb # rocksdb
ExternalProject_Add(rocksdb ExternalProject_Add(rocksdb
GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git GIT_REPOSITORY https://github.com/facebook/rocksdb.git
GIT_TAG v6.23.3 GIT_TAG v8.1.1
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taosadapter # taosadapter
ExternalProject_Add(taosadapter ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG ae8d51c GIT_TAG 565ca21
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
此差异已折叠。
...@@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm ...@@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
/* ------------------------------ TAOSX -----------------------------------*/ /* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable // note: following apis are unstable
......
...@@ -20,13 +20,13 @@ ...@@ -20,13 +20,13 @@
#include "tsimplehash.h" #include "tsimplehash.h"
#include "tstreamFileState.h" #include "tstreamFileState.h"
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
// void* streamBackendInit(const char* path); // void* streamBackendInit(const char* path);
// void streamBackendCleanup(void* arg); // void streamBackendCleanup(void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg); // SListNode* streamBackendAddCompare(void* backend, void* arg);
......
...@@ -13,16 +13,13 @@ ...@@ -13,16 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "executor.h"
#include "os.h" #include "os.h"
#include "query.h"
#include "streamState.h" #include "streamState.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdbInt.h" #include "tdbInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -340,7 +337,7 @@ typedef struct SStreamMeta { ...@@ -340,7 +337,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb; TTB* pTaskDb;
TTB* pCheckpointDb; TTB* pCheckpointDb;
SHashObj* pTasks; SHashObj* pTasks;
SArray* pTaskList; // SArray<task_id*> SArray* pTaskList; // SArray<task_id*>
void* ahandle; void* ahandle;
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
...@@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask); ...@@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); // int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
void streamMetaInit();
void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
......
...@@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) { ...@@ -449,6 +449,7 @@ static void *tscCrashReportThreadFp(void *param) {
tscError("failed to send crash report"); tscError("failed to send crash report");
if (pFile) { if (pFile) {
taosReleaseCrashLogFile(pFile, false); taosReleaseCrashLogFile(pFile, false);
pFile = NULL;
continue; continue;
} }
} else { } else {
...@@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) { ...@@ -468,6 +469,7 @@ static void *tscCrashReportThreadFp(void *param) {
if (pFile) { if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile); taosReleaseCrashLogFile(pFile, truncateFile);
pFile = NULL;
truncateFile = false; truncateFile = false;
} }
......
...@@ -24,6 +24,8 @@ typedef struct { ...@@ -24,6 +24,8 @@ typedef struct {
struct { struct {
int64_t clusterId; int64_t clusterId;
int32_t passKeyCnt; int32_t passKeyCnt;
int32_t passVer;
int32_t reqCnt;
}; };
}; };
} SHbParam; } SHbParam;
...@@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) { if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
int32_t code = 0; int32_t code = 0;
if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
goto _return;
}
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
if (!user) { if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -570,6 +578,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -570,6 +578,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
goto _return; goto _return;
} }
// assign the passVer
if (param) {
param->passVer = pTscObj->passInfo.ver;
}
_return: _return:
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
if (code) { if (code) {
...@@ -714,13 +727,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { ...@@ -714,13 +727,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
} }
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
SHbParam *hbParam = (SHbParam *)param; int32_t code = 0;
struct SCatalog *pCatalog = NULL; SHbParam *hbParam = (SHbParam *)param;
SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (hbParam->reqCnt == 0) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); code = catalogGetHandle(hbParam->clusterId, &pCatalog);
return code; if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
} }
hbGetAppInfo(hbParam->clusterId, req); hbGetAppInfo(hbParam->clusterId, req);
...@@ -728,24 +744,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -728,24 +744,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req); hbGetQueryBasicInfo(connKey, req);
if (hbParam->passKeyCnt > 0) { if (hbParam->passKeyCnt > 0) {
hbGetUserBasicInfo(connKey, req); hbGetUserBasicInfo(connKey, hbParam, req);
} }
code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (hbParam->reqCnt == 0) {
if (TSDB_CODE_SUCCESS != code) { code = hbGetExpiredUserInfo(connKey, pCatalog, req);
return code; if (TSDB_CODE_SUCCESS != code) {
} return code;
}
code = hbGetExpiredDBInfo(connKey, pCatalog, req); code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
code = hbGetExpiredStbInfo(connKey, pCatalog, req); code = hbGetExpiredStbInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
}
} }
++hbParam->reqCnt; // success to get catalog info
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -766,55 +786,47 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -766,55 +786,47 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
} }
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
if (!pBatchReq->reqs) {
int64_t rid = -1;
int32_t code = 0;
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
SClientHbReq *pOneReq = pIter;
SClientHbKey *connKey = pOneReq ? &pOneReq->connKey : NULL;
if (connKey != NULL) rid = connKey->tscRid;
STscObj *pTscObj = (STscObj *)acquireTscObj(rid);
if (pTscObj == NULL) {
tFreeClientHbBatchReq(pBatchReq); tFreeClientHbBatchReq(pBatchReq);
return NULL; return NULL;
} }
while (pIter != NULL) { void *pIter = NULL;
SHbParam param = {0};
while ((pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter))) {
SClientHbReq *pOneReq = pIter;
SClientHbKey *connKey = &pOneReq->connKey;
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
continue;
}
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
SHbParam param;
switch (pOneReq->connKey.connType) { switch (connKey->connType) {
case CONN_TYPE__QUERY: { case CONN_TYPE__QUERY: {
param.clusterId = pOneReq->clusterId; if (param.clusterId == 0) {
// init
param.clusterId = pOneReq->clusterId;
param.passVer = INT32_MIN;
}
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
break; break;
} }
default: default:
break; break;
} }
if (clientHbMgr.reqHandle[pOneReq->connKey.connType]) { if (clientHbMgr.reqHandle[connKey->connType]) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &param, pOneReq); int32_t code = (*clientHbMgr.reqHandle[connKey->connType])(connKey, &param, pOneReq);
if (code) { if (code) {
tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code), tscWarn("hbGatherAllInfo failed since %s, tscRid:%" PRIi64 ", connType:%" PRIi8, tstrerror(code),
pOneReq->connKey.tscRid, pOneReq->connKey.connType); connKey->tscRid, connKey->connType);
} }
} }
break;
#if 0 releaseTscObj(connKey->tscRid);
if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter;
continue;
}
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter;
#endif
} }
releaseTscObj(rid);
return pBatchReq; return pBatchReq;
} }
...@@ -885,7 +897,6 @@ static void *hbThreadFunc(void *param) { ...@@ -885,7 +897,6 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo(); hbGatherAppInfo();
} }
SArray *mgr = taosArrayInit(sz, sizeof(void *));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
...@@ -894,7 +905,6 @@ static void *hbThreadFunc(void *param) { ...@@ -894,7 +905,6 @@ static void *hbThreadFunc(void *param) {
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) { if (connCnt == 0) {
taosArrayPush(mgr, &pAppHbMgr);
continue; continue;
} }
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
...@@ -908,7 +918,6 @@ static void *hbThreadFunc(void *param) { ...@@ -908,7 +918,6 @@ static void *hbThreadFunc(void *param) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
taosArrayPush(mgr, &pAppHbMgr);
break; break;
} }
...@@ -920,7 +929,6 @@ static void *hbThreadFunc(void *param) { ...@@ -920,7 +929,6 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf); taosMemoryFree(buf);
taosArrayPush(mgr, &pAppHbMgr);
break; break;
} }
pInfo->fp = hbAsyncCallBack; pInfo->fp = hbAsyncCallBack;
...@@ -941,12 +949,8 @@ static void *hbThreadFunc(void *param) { ...@@ -941,12 +949,8 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
taosArrayPush(mgr, &pAppHbMgr);
} }
taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = mgr;
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL); taosMsleep(HEARTBEAT_INTERVAL);
......
...@@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { ...@@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
} }
} }
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*) res;
STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset;
if (pOffset->type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
}
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->metaRsp.rspOffset.version;
}
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res;
if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
}
}
// data from tsdb, no valid offset info
return -1;
}
const char* tmq_get_table_name(TAOS_RES* res) { const char* tmq_get_table_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res; SMqRspObj* pRspObj = (SMqRspObj*)res;
......
...@@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) { ...@@ -91,6 +91,7 @@ static void *dmCrashReportThreadFp(void *param) {
dError("failed to send crash report"); dError("failed to send crash report");
if (pFile) { if (pFile) {
taosReleaseCrashLogFile(pFile, false); taosReleaseCrashLogFile(pFile, false);
pFile = NULL;
continue; continue;
} }
} else { } else {
...@@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) { ...@@ -110,6 +111,7 @@ static void *dmCrashReportThreadFp(void *param) {
if (pFile) { if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile); taosReleaseCrashLogFile(pFile, truncateFile);
pFile = NULL;
truncateFile = false; truncateFile = false;
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "dmNodes.h" #include "dmNodes.h"
#include "index.h" #include "index.h"
#include "qworker.h" #include "qworker.h"
#include "tstream.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper); SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
...@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) { ...@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
} }
indexInit(tsNumOfCommitThreads); indexInit(tsNumOfCommitThreads);
streamMetaInit();
dmReportStartup("dnode-transport", "initialized"); dmReportStartup("dnode-transport", "initialized");
dDebug("dnode is created, ptr:%p", pDnode); dDebug("dnode is created, ptr:%p", pDnode);
...@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) { ...@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer(pDnode); dmCleanupServer(pDnode);
dmClearVars(pDnode); dmClearVars(pDnode);
rpcCleanup(); rpcCleanup();
streamMetaCleanup();
indexCleanup(); indexCleanup();
taosConvDestroy(); taosConvDestroy();
dDebug("dnode is closed, ptr:%p", pDnode); dDebug("dnode is closed, ptr:%p", pDnode);
......
...@@ -751,7 +751,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { ...@@ -751,7 +751,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0}; SCreateDnodeReq createReq = {0};
if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0) { if ((terrno = grantCheck(TSDB_GRANT_DNODE)) != 0 || (terrno = grantCheck(TSDB_GRANT_CPU_CORES)) != 0) {
code = terrno; code = terrno;
goto _OVER; goto _OVER;
} }
......
...@@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { ...@@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks); int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) { for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j); SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndPauseStreamTask(pTrans, pTask) < 0) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1; return -1;
} }
} }
...@@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn ...@@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks); int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) { for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j); SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1; return -1;
} }
} }
......
...@@ -894,8 +894,10 @@ void destroyTimeSliceOperatorInfo(void* param) { ...@@ -894,8 +894,10 @@ void destroyTimeSliceOperatorInfo(void* param) {
} }
taosArrayDestroy(pInfo->pLinearInfo); taosArrayDestroy(pInfo->pLinearInfo);
taosMemoryFree(pInfo->pPrevGroupKey->pData); if (pInfo->pPrevGroupKey) {
taosMemoryFree(pInfo->pPrevGroupKey); taosMemoryFree(pInfo->pPrevGroupKey->pData);
taosMemoryFree(pInfo->pPrevGroupKey);
}
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
#ifndef _STREAM_BACKEDN_ROCKSDB_H_ #ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_ #define _STREAM_BACKEDN_ROCKSDB_H_
#include "executor.h"
#include "rocksdb/c.h" #include "rocksdb/c.h"
// #include "streamInc.h" // #include "streamInc.h"
#include "streamState.h" #include "streamState.h"
...@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi ...@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
void* streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// default cf // default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
...@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch); ...@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch); void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch); void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen); void* val, int32_t vlen, int64_t ttl);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif #endif
\ No newline at end of file
...@@ -16,9 +16,12 @@ ...@@ -16,9 +16,12 @@
#ifndef _STREAM_INC_H_ #ifndef _STREAM_INC_H_
#define _STREAM_INC_H_ #define _STREAM_INC_H_
//#include "executor.h" #include "executor.h"
#include "query.h"
#include "tstream.h" #include "tstream.h"
#include "trpc.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
......
...@@ -308,6 +308,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { ...@@ -308,6 +308,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
numOfBlocks, size); numOfBlocks, size);
streamDataSubmitDestroy(pSubmitBlock); streamDataSubmitDestroy(pSubmitBlock);
taosFreeQitem(pSubmitBlock);
return -1; return -1;
} }
......
...@@ -13,8 +13,9 @@ ...@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
#include "tcommon.h" #include "tcommon.h"
typedef struct SCompactFilteFactory { typedef struct SCompactFilteFactory {
...@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) { ...@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
} }
} else { } else {
/*
list all cf and get prefix
*/
int64_t streamId; int64_t streamId;
int32_t taskId, dummpy = 0; int32_t taskId, dummpy = 0;
SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
...@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) { ...@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
void destroyCompactFilte(void* arg) { (void)arg; } void destroyCompactFilte(void* arg) { (void)arg; }
unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen,
char** newval, size_t* newvlen, unsigned char* value_changed) { char** newval, size_t* newvlen, unsigned char* value_changed) {
// int64_t unixTime = taosGetTimestampMs(); return streamStateValueIsStale((char*)val) ? 1 : 0;
if (streamStateValueIsStale((char*)val)) {
return 1;
}
// SStreamValue value;
// memset(&value, 0, sizeof(value));
// streamValueDecode(&value, (char*)val);
// taosMemoryFree(value.data);
// if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
// return 1;
// }
return 0;
} }
const char* compactFilteName(void* arg) { return "stream_filte"; } const char* compactFilteName(void* arg) { return "stream_filte"; }
...@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { ...@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
memcpy(cfNames[0], "default", strlen("default")); memcpy(cfNames[0], "default", strlen("default"));
continue; continue;
} }
qError("cf name %s", idstr);
GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key);
if (i % cfLen == 0) { if (i % cfLen == 0) {
...@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { ...@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
} }
} }
for (int i = 0; i < nSize * cfLen + 1; i++) {
qError("cf name %s", cfNames[i]);
}
rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*)); rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*));
RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*)); RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*));
for (int i = 0; i < nSize * cfLen + 1; i++) { for (int i = 0; i < nSize * cfLen + 1; i++) {
...@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { ...@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
if (err != NULL) { if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err); taosMemoryFreeClear(err);
// return -1;
} }
} }
pState->pTdbState->rocksdb = handle->db; pState->pTdbState->rocksdb = handle->db;
...@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \ taosMemoryFree(ttlV); \
} while (0); } while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \ do { \
code = 0; \ code = 0; \
char buf[128] = {0}; \ char buf[128] = {0}; \
char* err = NULL; \ char* err = NULL; \
int i = streamGetInit(funcname); \ int i = streamGetInit(funcname); \
if (i < 0) { \ if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \ qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \ code = -1; \
break; \ break; \
} \ } \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \ size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \ if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ if (err == NULL) { \
if (err != NULL) taosMemoryFree(err); \ qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
code = -1; \ funcname); \
} else { \ } else { \
char * p = NULL, *end = NULL; \ qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \ err); \
if (len < 0) { \ taosMemoryFreeClear(err); \
qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \ } \
code = -1; \ code = -1; \
} else { \ } else { \
qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \ char* p = NULL; \
} \ int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (pVal != NULL) { \ if (len < 0) { \
*pVal = p; \ qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
} else { \ funcname); \
taosMemoryFree(p); \ code = -1; \
} \ } else { \
taosMemoryFree(val); \ qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
if (vLen != NULL) *vLen = len; \ len); \
} \ } \
if (err != NULL) { \ taosMemoryFree(val); \
taosMemoryFree(err); \ if (vLen != NULL) *vLen = len; \
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ } \
code = -1; \ if (code == 0) \
} else { \ qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \
} while (0); } while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
...@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { ...@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen); // eLen);
if (err != NULL) { if (err != NULL) {
qWarn( qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
"failed to delete range cf(state) err: %s, "
"start: %s, end:%s",
err, toStringStart, toStringEnd);
taosMemoryFree(err); taosMemoryFree(err);
} }
...@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, ...@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1; return -1;
} }
size_t tlen; size_t klen, vlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &klen);
winKeyDecode(&winKey, keyStr); winKeyDecode(&winKey, keyStr);
size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
char* dst = NULL; // char* dst = NULL;
int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, (char**)pVal);
if (len < 0) { if (len < 0) {
return -1; return -1;
} }
if (pVLen != NULL) *pVLen = len;
if (pVal != NULL) *pVal = (char*)dst;
if (pVLen != NULL) *pVLen = vlen;
*pKey = winKey; *pKey = winKey;
return 0; return 0;
...@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) { ...@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen) { void* val, int32_t vlen, int64_t ttl) {
int i = streamGetInit(cfName); int i = streamGetInit(cfName);
if (i < 0) { if (i < 0) {
...@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr ...@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
int32_t klen = ginitDict[i].enFunc((void*)key, buf); int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL; char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV); taosMemoryFree(ttlV);
......
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
#define MIN_STREAM_EXEC_BATCH_NUM 16 #define MIN_STREAM_EXEC_BATCH_NUM 16
bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldStop(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
} }
bool streamTaskShouldPause(const SStreamStatus* pStatus) { bool streamTaskShouldPause(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE); return (status == TASK_STATUS__PAUSE);
} }
...@@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
while (pTask->taskLevel == TASK_LEVEL__SOURCE) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus); int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL) { if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->status.taskStatus)); atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2); taosMsleep(2);
...@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data;
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
pSubmit->submit.msgLen, pSubmit->submit.ver); pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data;
SArray* pBlockList = pBlock->blocks; SArray* pBlockList = pBlock->blocks;
int32_t numOfBlocks = taosArrayGetSize(pBlockList); int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
...@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes->blocks = pRes; qRes->blocks = pRes;
code = streamTaskOutput(pTask, qRes); code = streamTaskOutput(pTask, qRes);
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
taosFreeQitem(pRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
return code; return code;
} }
...@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t ckId = 0; int64_t ckId = 0;
int64_t dataVer = 0; int64_t dataVer = 0;
qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
", checkPoint id:%" PRId64 " -> %" PRId64, ", checkPoint id:%" PRId64 " -> %" PRId64,
pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId);
pTask->chkInfo = (SCheckpointInfo) {.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer};
taosWLockLatch(&pTask->pMeta->lock); taosWLockLatch(&pTask->pMeta->lock);
...@@ -407,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -407,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed", pTask->id.idStr); qDebug("s-task:%s exec completed", pTask->id.idStr);
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) { if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask); streamSchedExec(pTask);
} }
} }
......
...@@ -19,6 +19,13 @@ ...@@ -19,6 +19,13 @@
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
static int32_t streamBackendId = 0;
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { taosCloseRef(streamBackendId); }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1; int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
...@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(streamPath, "%s/%s", path, "stream"); sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = taosStrdup(streamPath); pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
taosMemoryFree(streamPath);
goto _err; goto _err;
} }
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755); code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(streamPath);
goto _err; goto _err;
} }
taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err; goto _err;
...@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->vgId = vgId; pMeta->vgId = vgId;
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
pMeta->streamBackendId = streamBackendId;
char* statePath = taosMemoryCalloc(1, len); memset(streamPath, 0, len);
sprintf(statePath, "%s/%s", pMeta->path, "state"); sprintf(streamPath, "%s/%s", pMeta->path, "state");
code = taosMulModeMkDir(statePath, 0755); code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(streamPath);
goto _err; goto _err;
} }
pMeta->streamBackend = streamBackendInit(statePath); pMeta->streamBackend = streamBackendInit(streamPath);
pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend);
taosMemoryFree(statePath); taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
return pMeta; return pMeta;
_err: _err:
taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
...@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
} }
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
// streamBackendCleanup(pMeta->streamBackend);
taosCloseRef(pMeta->streamBackendId);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
...@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
// taosWLockLatch(&pMeta->lock);
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
//
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
......
...@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState->taskId = pTask->id.taskId; pState->taskId = pTask->id.taskId;
pState->streamId = pTask->id.streamId; pState->streamId = pTask->id.streamId;
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
qWarn("open stream state1"); // qWarn("open stream state1");
taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
if (code == -1) { if (code == -1) {
...@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) { ...@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
// streamStateCloseBackend(pState); // streamStateCloseBackend(pState);
streamStateDestroy(pState, remove); streamStateDestroy(pState, remove);
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else #else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
...@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) { ...@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose(pState->pTdbState->pParTagDb); tdbTbClose(pState->pTdbState->pParTagDb);
tdbClose(pState->pTdbState->db); tdbClose(pState->pTdbState->db);
#endif #endif
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
} }
int32_t streamStateBegin(SStreamState* pState) { int32_t streamStateBegin(SStreamState* pState) {
...@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo ...@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
int32_t code = 0; int32_t code = 0;
void* batch = streamStateCreateBatch(); void* batch = streamStateCreateBatch();
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen); code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
if (code != 0) { if (code != 0) {
return code; return code;
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "tstreamFileState.h" #include "tstreamFileState.h"
#include "query.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "taos.h" #include "taos.h"
#include "tcommon.h" #include "tcommon.h"
...@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) { ...@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, 0, true); clearExpiredRowBuff(pFileState, 0, true);
} }
bool needClearDiskBuff(SStreamFileState* pFileState) { bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushMark > 0; }
return pFileState->flushMark > 0;
}
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0; uint64_t i = 0;
...@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { ...@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; } void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false); int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
clearExpiredRowBuff(pFileState, mark, false);
return pFileState->usedBuffs; return pFileState->usedBuffs;
} }
...@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
} }
if (streamStateGetBatchSize(batch) > 0) { if (streamStateGetBatchSize(batch) > 0) {
...@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0; int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf); taosMemoryFree(valBuf);
} }
{ {
...@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0; int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey)); memcpy(keyBuf, taskKey, strlen(taskKey));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
} }
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
...@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { ...@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark);
void* pStVal = NULL; void* pStVal = NULL;
int32_t len = 0; int32_t len = 0;
......
...@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") ...@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
streamUpdateTest streamUpdateTest
PUBLIC os util common gtest stream PUBLIC os util common gtest gtest_main stream
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
......
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include "streamBackendRocksdb.h"
#include "tstream.h"
#include "tstreamUpdate.h" #include "tstreamUpdate.h"
#include "ttime.h" #include "ttime.h"
using namespace std; using namespace std;
#define MAX_NUM_SCALABLE_BF 100000 #define MAX_NUM_SCALABLE_BF 100000
class StreamStateEnv : public ::testing::Test {
protected:
virtual void SetUp() {
streamMetaInit();
backend = streamBackendInit(path);
}
virtual void TearDown() {
streamMetaCleanup();
// indexClose(index);
}
const char *path = TD_TMP_DIR_PATH "stream";
void *backend;
};
bool equalSBF(SScalableBf *left, SScalableBf *right) { bool equalSBF(SScalableBf *left, SScalableBf *right) {
if (left->growth != right->growth) return false; if (left->growth != right->growth) return false;
if (left->numBits != right->numBits) return false; if (left->numBits != right->numBits) return false;
...@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) { ...@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
// updateInfoDestroy(pSU6); // updateInfoDestroy(pSU6);
// updateInfoDestroy(pSU7); // updateInfoDestroy(pSU7);
} }
// TEST()
int main(int argc, char *argv[]) { TEST(StreamStateEnv, test1) {}
testing::InitGoogleTest(&argc, argv); // int main(int argc, char *argv[]) {
return RUN_ALL_TESTS(); // testing::InitGoogleTest(&argc, argv);
} // return RUN_ALL_TESTS();
\ No newline at end of file // }
\ No newline at end of file
...@@ -657,36 +657,33 @@ if $data20 != null then ...@@ -657,36 +657,33 @@ if $data20 != null then
return -1 return -1
endi endi
#print =============== error for normal table print =============== error for normal table
#sql create table tb2023(ts timestamp, f int); sql create table tb2023(ts timestamp, f int);
#sql_error alter table tb2023 add column v varchar(65535); sql_error alter table tb2023 add column v varchar(65518);
#sql_error alter table tb2023 add column v varchar(65535); sql_error alter table tb2023 add column v varchar(65531);
#sql_error alter table tb2023 add column v varchar(65530); sql_error alter table tb2023 add column v varchar(65535);
#sql alter table tb2023 add column v varchar(16374); sql alter table tb2023 add column v varchar(65517);
#sql_error alter table tb2023 modify column v varchar(65536); sql_error alter table tb2023 modify column v varchar(65518);
#sql desc tb2023 sql desc tb2023
#sql alter table tb2023 drop column v sql alter table tb2023 drop column v
#sql_error alter table tb2023 add column v nchar(16384); sql_error alter table tb2023 add column v nchar(16380);
#sql alter table tb2023 add column v nchar(4093); sql alter table tb2023 add column v nchar(16379);
#sql_error alter table tb2023 modify column v nchar(16384); sql_error alter table tb2023 modify column v nchar(16380);
#sql_error alter table tb2023 add column v nchar(16384); sql desc tb2023
#sql alter table tb2023 drop column v
#sql alter table tb2023 add column v nchar(16374); print =============== error for super table
#sql desc tb2023 sql create table stb2023(ts timestamp, f int) tags(t1 int);
# sql_error alter table stb2023 add column v varchar(65518);
#print =============== error for super table sql_error alter table stb2023 add column v varchar(65531);
#sql create table stb2023(ts timestamp, f int) tags(t1 int); sql_error alter table stb2023 add column v varchar(65535);
#sql_error alter table stb2023 add column v varchar(65535); sql alter table stb2023 add column v varchar(65517);
#sql_error alter table stb2023 add column v varchar(65536); sql_error alter table stb2023 modify column v varchar(65518);
#sql_error alter table stb2023 add column v varchar(33100); sql desc stb2023
#sql alter table stb2023 add column v varchar(16374); sql alter table stb2023 drop column v
#sql_error alter table stb2023 modify column v varchar(16375); sql_error alter table stb2023 add column v nchar(16380);
#sql desc stb2023 sql alter table stb2023 add column v nchar(16379);
#sql alter table stb2023 drop column v sql_error alter table stb2023 modify column v nchar(16380);
#sql_error alter table stb2023 add column v nchar(4094); sql desc stb2023
#sql alter table stb2023 add column v nchar(4093);
#sql_error alter table stb2023 modify column v nchar(4094);
#sql desc stb2023
print ======= over print ======= over
sql drop database d1 sql drop database d1
......
...@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); ...@@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0); sql_error alter table tb modify column c2 binary(0);
sql_error alter table tb modify column c2 binary(65600); sql_error alter table tb modify column c2 binary(65436);
sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10); sql_error alter table tb modify column c3 nchar(10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册