提交 f9bcea8d 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format

......@@ -71,7 +71,6 @@ typedef struct SStmtBindInfo {
typedef struct SStmtExecInfo {
int32_t affectedRows;
bool emptyRes;
SRequestObj* pRequest;
SHashObj* pVgHash;
SHashObj* pBlockHash;
......@@ -87,7 +86,6 @@ typedef struct SStmtSQLInfo {
char* sqlStr;
int32_t sqlLen;
SArray* nodeList;
SQueryPlan* pQueryPlan;
SStmtQueryResInfo queryRes;
bool autoCreateTbl;
} SStmtSQLInfo;
......
......@@ -279,7 +279,6 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
}
pStmt->exec.autoCreateTbl = false;
pStmt->exec.emptyRes = false;
if (keepTable) {
return TSDB_CODE_SUCCESS;
......@@ -298,7 +297,6 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
taosMemoryFree(pStmt->sql.queryRes.userFields);
taosMemoryFree(pStmt->sql.sqlStr);
qDestroyQuery(pStmt->sql.pQuery);
qDestroyQueryPlan(pStmt->sql.pQueryPlan);
taosArrayDestroy(pStmt->sql.nodeList);
void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
......@@ -599,6 +597,8 @@ int32_t stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
......@@ -617,19 +617,40 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (NULL == pStmt->sql.pQueryPlan) {
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
pStmt->exec.pRequest->body.pDag = NULL;
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
} else {
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, bind, colIdx, pStmt->exec.pRequest->requestId));
SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
.acctId = pStmt->taos->acctId,
.db = pStmt->exec.pRequest->pDb,
.topicQuery = false,
.pSql = pStmt->sql.sqlStr,
.sqlLen = pStmt->sql.sqlLen,
.pMsg = pStmt->exec.pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pStmt->taos->pAppInfo->pTransporter,
.pStmtCb = NULL,
.pUser = pStmt->taos->user};
ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));
STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));
if (pStmt->sql.pQuery->haveResultSet) {
setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema, pStmt->sql.pQuery->numOfResCols);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
}
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId, &pStmt->exec.emptyRes));
TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
//if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
// STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
//}
//STMT_ERR_RET(stmtBackupQueryFields(pStmt));
return TSDB_CODE_SUCCESS;
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
......@@ -736,11 +757,7 @@ int stmtExec(TAOS_STMT *stmt) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (pStmt->exec.emptyRes) {
pStmt->exec.pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
} else {
scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList, NULL);
}
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, NULL);
} else {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true, (autoCreateTbl ? (void**)&pRsp : NULL));
......@@ -839,16 +856,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
}
if (STMT_TYPE_QUERY == pStmt->sql.type) {
if (NULL == pStmt->sql.pQueryPlan) {
STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
pStmt->exec.pRequest->body.pDag = NULL;
STMT_ERR_RET(stmtBackupQueryFields(pStmt));
} else {
STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
}
*nums = taosArrayGetSize(pStmt->sql.pQueryPlan->pPlaceholderValues);
*nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
} else {
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
}
......
......@@ -279,6 +279,7 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "cDebugFlag", cDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "uDebugFlag", uDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcDebugFlag", rpcDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "tmrDebugFlag", tmrDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1;
......@@ -456,6 +457,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) {
tsLogKeepDays = cfgGetItem(pCfg, "logKeepDays")->i32;
cDebugFlag = cfgGetItem(pCfg, "cDebugFlag")->i32;
uDebugFlag = cfgGetItem(pCfg, "uDebugFlag")->i32;
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32;
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32;
......
......@@ -842,6 +842,8 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
return TSDB_CODE_SUCCESS;
}
ctgDebug("Got subtable meta from cache, type:%d, dbFName:%s, tbName:%s, suid:%" PRIx64, tbMeta->tableType, dbFName, pTableName->tname, tbMeta->suid);
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
......@@ -1656,6 +1658,11 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
if (orig) {
origType = orig->tableType;
if (origType == meta->tableType && orig->uid == meta->uid && orig->sversion >= meta->sversion && orig->tversion >= meta->tversion) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
return TSDB_CODE_SUCCESS;
}
if (origType == TSDB_SUPER_TABLE) {
if ((!isStb) || orig->suid != meta->suid) {
CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
......@@ -1693,7 +1700,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
CTG_CACHE_STAT_ADD(tblNum, 1);
}
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d, suid:%" PRIx64, dbFName, tbName, meta->tableType, meta->suid);
ctgdShowTableMeta(pCtg, tbName, meta);
if (!isStb) {
......@@ -1701,12 +1708,6 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
return TSDB_CODE_SUCCESS;
}
if (origType == TSDB_SUPER_TABLE && origSuid == meta->suid) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
return TSDB_CODE_SUCCESS;
}
STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
......@@ -1721,7 +1722,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d, suid:%" PRIx64 ",ma:%p", dbFName, tbName, meta->tableType, meta->suid, tbMeta);
SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
strcpy(metaRent.dbFName, dbFName);
......
......@@ -310,28 +310,28 @@ enum {
};
int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy {
typedef struct SUdfcProxy {
char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_barrier_t gUdfInitBarrier;
uv_barrier_t initBarrier;
uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;
uv_loop_t uvLoop;
uv_thread_t loopThread;
uv_async_t loopTaskAync;
uv_async_t gUdfLoopStopAsync;
uv_async_t loopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int8_t gUdfcState;
QUEUE gUdfTaskQueue;
QUEUE gUvProcTaskQueue;
uv_mutex_t taskQueueMutex;
int8_t udfcState;
QUEUE taskQueue;
QUEUE uvProcTaskQueue;
int8_t initialized;
} SUdfdProxy;
} SUdfcProxy;
SUdfdProxy gUdfdProxy = {0};
SUdfcProxy gUdfdProxy = {0};
typedef struct SClientUdfUvSession {
SUdfdProxy *udfc;
SUdfcProxy *udfc;
int64_t severHandle;
uv_pipe_t *udfUvPipe;
......@@ -341,7 +341,7 @@ typedef struct SClientUdfUvSession {
} SClientUdfUvSession;
typedef struct SClientUvTaskNode {
SUdfdProxy *udfc;
SUdfcProxy *udfc;
int8_t type;
int errCode;
......@@ -1055,11 +1055,11 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
SUdfdProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_async_send(&udfc->gUdfLoopTaskAync);
SUdfcProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&udfc->taskQueueMutex);
uv_async_send(&udfc->loopTaskAync);
uv_sem_wait(&uvTask->taskSem);
fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
......@@ -1073,7 +1073,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
switch (uvTask->type) {
case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0);
uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
......@@ -1113,46 +1113,46 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
}
void udfClientAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data;
SUdfcProxy *udfc = async->data;
QUEUE wq;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_MOVE(&udfc->taskQueue, &wq);
uv_mutex_unlock(&udfc->taskQueueMutex);
while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
udfcStartUvTask(task);
QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
}
}
void cleanUpUvTasks(SUdfdProxy *udfc) {
void cleanUpUvTasks(SUdfcProxy *udfc) {
fnDebug("clean up uv tasks")
QUEUE wq;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_MOVE(&udfc->taskQueue, &wq);
uv_mutex_unlock(&udfc->taskQueueMutex);
while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
if (udfc->udfcState == UDFC_STATE_STOPPING) {
task->errCode = TSDB_CODE_UDF_STOPPING;
}
uv_sem_post(&task->taskSem);
}
while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue);
while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&udfc->uvProcTaskQueue);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
if (udfc->udfcState == UDFC_STATE_STOPPING) {
task->errCode = TSDB_CODE_UDF_STOPPING;
}
uv_sem_post(&task->taskSem);
......@@ -1160,28 +1160,28 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
}
void udfStopAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data;
SUdfcProxy *udfc = async->data;
cleanUpUvTasks(udfc);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
uv_stop(&udfc->gUdfdLoop);
if (udfc->udfcState == UDFC_STATE_STOPPING) {
uv_stop(&udfc->uvLoop);
}
}
void constructUdfService(void *argsThread) {
SUdfdProxy *udfc = (SUdfdProxy*)argsThread;
uv_loop_init(&udfc->gUdfdLoop);
uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb);
udfc->gUdfLoopTaskAync.data = udfc;
uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb);
udfc->gUdfLoopStopAsync.data = udfc;
uv_mutex_init(&udfc->gUdfTaskQueueMutex);
QUEUE_INIT(&udfc->gUdfTaskQueue);
QUEUE_INIT(&udfc->gUvProcTaskQueue);
uv_barrier_wait(&udfc->gUdfInitBarrier);
SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
uv_loop_init(&udfc->uvLoop);
uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfClientAsyncCb);
udfc->loopTaskAync.data = udfc;
uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
udfc->loopStopAsync.data = udfc;
uv_mutex_init(&udfc->taskQueueMutex);
QUEUE_INIT(&udfc->taskQueue);
QUEUE_INIT(&udfc->uvProcTaskQueue);
uv_barrier_wait(&udfc->initBarrier);
//TODO return value of uv_run
uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT);
uv_loop_close(&udfc->gUdfdLoop);
uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
uv_loop_close(&udfc->uvLoop);
}
int32_t udfcOpen() {
......@@ -1189,14 +1189,14 @@ int32_t udfcOpen() {
if (old == 1) {
return 0;
}
SUdfdProxy *proxy = &gUdfdProxy;
SUdfcProxy *proxy = &gUdfdProxy;
getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->gUdfInitBarrier, 2);
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
atomic_store_8(&proxy->gUdfcState, UDFC_STATE_READY);
proxy->gUdfcState = UDFC_STATE_READY;
uv_barrier_wait(&proxy->gUdfInitBarrier);
proxy->udfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->initBarrier, 2);
uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
proxy->udfcState = UDFC_STATE_READY;
uv_barrier_wait(&proxy->initBarrier);
fnInfo("udfc initialized")
return 0;
}
......@@ -1207,13 +1207,13 @@ int32_t udfcClose() {
return 0;
}
SUdfdProxy *udfc = &gUdfdProxy;
udfc->gUdfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->gUdfLoopStopAsync);
uv_thread_join(&udfc->gUdfLoopThread);
uv_mutex_destroy(&udfc->gUdfTaskQueueMutex);
uv_barrier_destroy(&udfc->gUdfInitBarrier);
udfc->gUdfcState = UDFC_STATE_INITAL;
SUdfcProxy *udfc = &gUdfdProxy;
udfc->udfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->loopStopAsync);
uv_thread_join(&udfc->loopThread);
uv_mutex_destroy(&udfc->taskQueueMutex);
uv_barrier_destroy(&udfc->initBarrier);
udfc->udfcState = UDFC_STATE_INITAL;
fnInfo("udfc cleaned up");
return 0;
}
......@@ -1236,7 +1236,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE;
}
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
......@@ -1484,7 +1484,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = numOfRows;
tempBlock.info.rows = pInput->totalRows;
tempBlock.info.uid = pInput->uid;
bool hasVarCol = false;
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
......
......@@ -1227,16 +1227,20 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
if (NULL == (*pQuery)->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
(*pQuery)->pRoot = (SNode*)context.pOutput;
}
if (NULL == (*pQuery)->pTableList) {
(*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
if (NULL == (*pQuery)->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = skipInsertInto(&context);
......
......@@ -184,5 +184,6 @@ int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) {
code = calculateConstant(pCxt, pQuery);
}
return code;
}
......@@ -290,6 +290,10 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isSuperTable, STabl
pTableMeta->sversion = msg->sversion;
pTableMeta->tversion = msg->tversion;
if (isSuperTable) {
qDebug("stable %s meta returned, suid:%" PRIx64, msg->stbName, pTableMeta->suid);
}
pTableMeta->tableInfo.numOfTags = msg->numOfTags;
pTableMeta->tableInfo.precision = msg->precision;
pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
......
......@@ -37,7 +37,8 @@ typedef struct SScalarCtx {
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
#define SCL_IS_NULL_VALUE_NODE(_node) ((QUERY_NODE_VALUE == nodeType(_node)) && (TSDB_DATA_TYPE_NULL == ((SValueNode *)_node)->node.resType.type) && (((SValueNode *)_node)->placeholderNo <= 0))
//#define SCL_IS_NULL_VALUE_NODE(_node) ((QUERY_NODE_VALUE == nodeType(_node)) && (TSDB_DATA_TYPE_NULL == ((SValueNode *)_node)->node.resType.type) && (((SValueNode *)_node)->placeholderNo <= 0))
#define SCL_IS_NULL_VALUE_NODE(_node) ((QUERY_NODE_VALUE == nodeType(_node)) && (TSDB_DATA_TYPE_NULL == ((SValueNode *)_node)->node.resType.type))
#define sclFatal(...) qFatal(__VA_ARGS__)
#define sclError(...) qError(__VA_ARGS__)
......
......@@ -9,10 +9,12 @@
#include <unistd.h>
#include "../../../include/client/taos.h"
#define FUNCTION_TEST_IDX 1
int32_t shortColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT};
int32_t fullColList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_BOOL, TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_UINT, TSDB_DATA_TYPE_BIGINT, TSDB_DATA_TYPE_UBIGINT, TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_BINARY, TSDB_DATA_TYPE_NCHAR};
int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_NCHAR};
int32_t optrIdxList[] = {0, 9};
int32_t bindColTypeList[] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT};
int32_t optrIdxList[] = {0, 7};
typedef struct {
char* oper;
......@@ -53,7 +55,6 @@ FuncInfo funcInfo[] = {
{"count", 1},
{"sum", 1},
{"min", 1},
{"sin", 1},
};
char *bpStbPrefix = "st";
......@@ -66,6 +67,10 @@ int32_t bpDefaultStbId = 1;
//char *varoperatorList[] = {">", ">=", "<", "<=", "=", "<>", "in", "not in", "like", "not like", "match", "nmatch"};
#define tListLen(x) (sizeof(x) / sizeof((x)[0]))
#define IS_SIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_TINYINT && (_t) <= TSDB_DATA_TYPE_BIGINT)
#define IS_UNSIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_UTINYINT && (_t) <= TSDB_DATA_TYPE_UBIGINT)
#define IS_FLOAT_TYPE(_t) ((_t) == TSDB_DATA_TYPE_FLOAT || (_t) == TSDB_DATA_TYPE_DOUBLE)
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
typedef struct {
int64_t* tsData;
......@@ -165,8 +170,11 @@ CaseCfg gCase[] = {
// 22
{"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, true, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1},
{"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2},
{"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 3, 0, 0, 1, 2},
// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2},
// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2},
{"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2},
{"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2},
};
......@@ -181,6 +189,7 @@ typedef struct {
bool printQuerySql;
bool printStmtSql;
bool autoCreateTbl;
bool numericParam;
int32_t rowNum; //row num for one table
int32_t bindColNum;
int32_t bindTagNum;
......@@ -207,6 +216,7 @@ CaseCtrl gCaseCtrl = { // default
.printQuerySql = true,
.printStmtSql = true,
.autoCreateTbl = false,
.numericParam = false,
.rowNum = 0,
.bindColNum = 0,
.bindTagNum = 0,
......@@ -259,26 +269,22 @@ CaseCtrl gCaseCtrl = {
#if 1
CaseCtrl gCaseCtrl = { // query case with specified col&oper
.bindNullNum = 0,
.bindNullNum = 1,
.printCreateTblSql = false,
.printQuerySql = true,
.printStmtSql = true,
.rowNum = 0,
.bindColNum = 0,
.bindRowNum = 0,
.bindColTypeNum = 0,
.bindColTypeList = NULL,
.optrIdxListNum = 0,
.optrIdxList = NULL,
.optrIdxListNum = tListLen(optrIdxList),
.optrIdxList = optrIdxList,
.bindColTypeNum = tListLen(bindColTypeList),
.bindColTypeList = bindColTypeList,
.checkParamNum = false,
.printRes = true,
.runTimes = 0,
.caseRunIdx = -1,
.optrIdxListNum = 0,
.optrIdxList = NULL,
.bindColTypeNum = 0,
.bindColTypeList = NULL,
.caseIdx = 24,
.caseIdx = 23,
.caseNum = 1,
.caseRunNum = 1,
};
......@@ -303,11 +309,11 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
.printRes = true,
.runTimes = 0,
.caseRunIdx = -1,
//.optrIdxListNum = tListLen(optrIdxList),
//.optrIdxList = optrIdxList,
//.bindColTypeNum = tListLen(bindColTypeList),
//.bindColTypeList = bindColTypeList,
.caseIdx = 22,
.optrIdxListNum = tListLen(optrIdxList),
.optrIdxList = optrIdxList,
.bindColTypeNum = tListLen(bindColTypeList),
.bindColTypeList = bindColTypeList,
.caseIdx = 24,
.caseNum = 1,
.caseRunNum = 1,
};
......@@ -661,11 +667,11 @@ void bpGenerateConstInFuncSQL(BindData *data, int32_t tblIdx) {
void generateQueryMiscSQL(BindData *data, int32_t tblIdx) {
switch(tblIdx) {
case 0:
bpGenerateConstInOpSQL(data, tblIdx);
break;
case 1:
//TODO FILL TEST
default:
bpGenerateConstInOpSQL(data, tblIdx);
break;
case FUNCTION_TEST_IDX:
bpGenerateConstInFuncSQL(data, tblIdx);
break;
}
......@@ -709,6 +715,16 @@ void generateColDataType(BindData *data, int32_t bindIdx, int32_t colIdx, int32_
} else if (gCurCase->fullCol) {
*dataType = gCurCase->colList[bindIdx];
return;
} else if (gCaseCtrl.numericParam) {
while (true) {
*dataType = rand() % (TSDB_DATA_TYPE_MAX - 1) + 1;
if (!IS_NUMERIC_TYPE(*dataType)) {
continue;
}
break;
}
return;
} else if (0 == colIdx) {
*dataType = TSDB_DATA_TYPE_TIMESTAMP;
return;
......@@ -1046,6 +1062,10 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
data->binaryLen[i] = gVarCharLen;
}
if (tblIdx == FUNCTION_TEST_IDX) {
gCaseCtrl.numericParam = true;
}
for (int b = 0; b < bindNum; b++) {
for (int c = 0; c < gCurCase->bindColNum; ++c) {
prepareColData(BP_BIND_COL, data, b*gCurCase->bindColNum+c, b*gCurCase->bindRowNum, c);
......
......@@ -63,7 +63,7 @@
# ---- tstream
./test.sh -f tsim/tstream/basic0.sim
#./test.sh -f tsim/tstream/basic1.sim
./test.sh -f tsim/tstream/basic1.sim
# ---- transaction
./test.sh -f tsim/trans/create_db.sim
......
......@@ -109,6 +109,7 @@ if $data01 != 18.547236991 then
endi
sql select udf2(udf1(f2-f1)), udf2(udf1(f2/f1)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then
return -1
endi
......@@ -118,7 +119,19 @@ endi
if $data01 != 152.420471066 then
return -1
endi
print $rows , $data00 , $data01
sql select udf2(f2) from udf.t2 group by 1-udf1(f1);
print $rows , $data00 , $data10
if $rows != 2 then
return -1
endi
if $data00 != 2.000000000 then
return -1
endi
if $data10 != 12.083045974 then
return -1
endi
sql drop function udf1;
sql show functions;
if $rows != 1 then
......
......@@ -14,10 +14,10 @@ from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
clientCfgDict = {'qdebugflag':'143'}
updatecfgDict = {'clientCfg': {}, 'qdebugflag':'143'}
updatecfgDict["clientCfg"] = clientCfgDict
print ("===================: ", updatecfgDict)
#clientCfgDict = {'qdebugflag':'143'}
#updatecfgDict = {'clientCfg': {}, 'qdebugflag':'143'}
#updatecfgDict["clientCfg"] = clientCfgDict
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
......
......@@ -54,4 +54,4 @@ python3 ./test.py -f 2-query/arccos.py
python3 ./test.py -f 2-query/arctan.py
# python3 ./test.py -f 2-query/query_cols_tags_and_or.py
#python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/basic5.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册