提交 b18fa5a3 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/dnode

......@@ -29,12 +29,13 @@ extern "C" {
typedef enum {
JOB_TASK_STATUS_NULL = 0,
JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING,
JOB_TASK_STATUS_PARTIAL_SUCCEED,
JOB_TASK_STATUS_SUCCEED,
JOB_TASK_STATUS_FAILED,
JOB_TASK_STATUS_DROPPING,
JOB_TASK_STATUS_INIT,
JOB_TASK_STATUS_EXEC,
JOB_TASK_STATUS_PART_SUCC,
JOB_TASK_STATUS_SUCC,
JOB_TASK_STATUS_FAIL,
JOB_TASK_STATUS_DROP,
JOB_TASK_STATUS_MAX,
} EJobTaskType;
typedef enum {
......@@ -59,10 +60,6 @@ typedef struct STableComInfo {
int32_t rowSize; // row size of the schema
} STableComInfo;
typedef struct SQueryExecRes {
int32_t msgType;
void* res;
} SQueryExecRes;
typedef struct SIndexMeta {
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
......@@ -71,6 +68,13 @@ typedef struct SIndexMeta {
} SIndexMeta;
typedef struct SExecResult {
int32_t code;
uint64_t numOfRows;
int32_t msgType;
void* res;
} SExecResult;
typedef struct STbVerInfo {
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion;
......@@ -210,7 +214,7 @@ char* jobTaskStatusStr(int32_t status);
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
void destroyQueryExecRes(SQueryExecRes* pRes);
void destroyQueryExecRes(SExecResult* pRes);
int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t* len);
char* parseTagDatatoJson(void* p);
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
......
......@@ -53,12 +53,6 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary;
typedef struct SQueryResult {
int32_t code;
uint64_t numOfRows;
SQueryExecRes res;
} SQueryResult;
typedef struct STaskInfo {
SQueryNodeAddr addr;
SSubQueryMsg *msg;
......@@ -69,50 +63,34 @@ typedef struct SSchdFetchParam {
int32_t* code;
} SSchdFetchParam;
typedef void (*schedulerExecFp)(SQueryResult* pResult, void* param, int32_t code);
typedef void (*schedulerExecFp)(SExecResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code);
typedef bool (*schedulerChkKillFp)(void* param);
typedef struct SSchedulerReq {
bool syncReq;
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;
const char *sql;
int64_t startTs;
schedulerExecFp execFp;
void* execParam;
schedulerFetchFp fetchFp;
void* cbParam;
schedulerChkKillFp chkKillFp;
void* chkKillParam;
SExecResult* pExecRes;
void** pFetchRes;
} SSchedulerReq;
int32_t schedulerInit(SSchedulerCfg *cfg);
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob);
/**
* Fetch query result from the remote query executor
* @param pJob
* @param data
* @return
*/
int32_t schedulerFetchRows(int64_t job, void **data);
int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq);
void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param);
void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param);
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
......@@ -134,7 +112,7 @@ void schedulerFreeJob(int64_t* job, int32_t errCode);
void schedulerDestroy(void);
void schdExecCallback(SQueryResult* pResult, void* param, int32_t code);
void schdExecCallback(SExecResult* pResult, void* param, int32_t code);
#ifdef __cplusplus
}
......
......@@ -32,7 +32,7 @@ extern "C" {
void *taosMemoryMalloc(int32_t size);
void *taosMemoryCalloc(int32_t num, int32_t size);
void *taosMemoryRealloc(void *ptr, int32_t size);
void *taosMemoryStrDup(void *ptr);
void *taosMemoryStrDup(const char *ptr);
void taosMemoryFree(void *ptr);
int32_t taosMemorySize(void *ptr);
void taosPrintBackTrace();
......
......@@ -389,10 +389,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719)
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A)
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B)
//json
#define TSDB_CODE_QRY_JSON_IN_ERROR TAOS_DEF_ERROR_CODE(0, 0x071C)
#define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x071D)
#define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x071E)
#define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x071F)
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
......
......@@ -156,7 +156,7 @@ typedef struct SResultColumn {
} SResultColumn;
typedef struct SReqResultInfo {
SQueryExecRes execRes;
SExecResult execRes;
const char* pRspMsg;
const char* pData;
TAOS_FIELD* fields; // todo, column names are not needed.
......
......@@ -627,22 +627,26 @@ _return:
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {0};
SExecResult res = {0};
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = NULL,
.execParam = NULL,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self};
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
pRequest->body.resInfo.execRes = res.res;
SSchedulerReq req = {
.syncReq = true,
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = NULL,
.cbParam = NULL,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self,
.pExecRes = &res,
};
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
if (code != TSDB_CODE_SUCCESS) {
schedulerFreeJob(&pRequest->body.queryJob, 0);
......@@ -753,7 +757,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
}
SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
SExecResult* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) {
case TDMT_VND_ALTER_TABLE:
......@@ -779,10 +783,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return code;
}
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code;
pRequest->body.resInfo.execRes = pResult->res;
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) {
......@@ -939,16 +943,20 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
SRequestConnInfo conn = {
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = schedulerExecCb,
.execParam = pRequest,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self};
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
SSchedulerReq req = {
.syncReq = false,
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = schedulerExecCb,
.cbParam = pRequest,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self,
.pExecRes = NULL,
};
code = schedulerExecJob(&req, &pRequest->body.queryJob);
taosArrayDestroy(pNodeList);
} else {
tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
......@@ -1387,7 +1395,11 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
SSchedulerReq req = {
.syncReq = true,
.pFetchRes = (void**)&pResInfo->pData,
};
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL;
......
......@@ -858,7 +858,12 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}
}
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
SSchedulerReq req = {
.syncReq = false,
.fetchFp = fetchCallback,
.cbParam = pRequest,
};
schedulerFetchRows(pRequest->body.queryJob, &req);
}
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
......
......@@ -266,7 +266,7 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
}
if (pRequest->body.queryFp != NULL) {
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
SExecResult* pRes = &pRequest->body.resInfo.execRes;
if (code == TSDB_CODE_SUCCESS) {
SCatalog* pCatalog = NULL;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"
extern SDataSinkStat gDataSinkStat;
typedef struct SDataInserterBuf {
int32_t useSize;
int32_t allocSize;
char* pData;
} SDataInserterBuf;
typedef struct SDataCacheEntry {
int32_t dataLen;
int32_t numOfRows;
int32_t numOfCols;
int8_t compressed;
char data[];
} SDataCacheEntry;
typedef struct SDataInserterHandle {
SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataBlockDescNode* pSchema;
SDataDeleterNode* pDeleter;
SDeleterParam* pParam;
STaosQueue* pDataBlocks;
SDataInserterBuf nextOutput;
int32_t status;
bool queryEnd;
uint64_t useconds;
uint64_t cachedSize;
TdThreadMutex mutex;
} SDataInserterHandle;
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
if (tsCompressColData < 0 || 0 == pData->info.rows) {
return false;
}
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * pData->info.rows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false;
}
static void toDataCacheEntry(SDataInserterHandle* pHandle, const SInputData* pInput, SDataInserterBuf* pBuf) {
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = 0;
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
pEntry->dataLen = sizeof(SDeleterRes);
ASSERT(1 == pEntry->numOfRows);
ASSERT(1 == pEntry->numOfCols);
pBuf->useSize = sizeof(SDataCacheEntry);
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
pRes->suid = pHandle->pParam->suid;
pRes->uidList = pHandle->pParam->pUidList;
pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
pRes->affectedRows = *(int64_t*)pColRes->pData;
pBuf->useSize += pEntry->dataLen;
atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
}
static bool allocBuf(SDataInserterHandle* pDeleter, const SInputData* pInput, SDataInserterBuf* pBuf) {
uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
taosQueueItemSize(pDeleter->pDataBlocks));
return false;
}
pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
}
return NULL != pBuf->pData;
}
static int32_t updateStatus(SDataInserterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex);
int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks);
int32_t status =
(0 == blockNums ? DS_BUF_EMPTY
: (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDeleter->status = status;
taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t getStatus(SDataInserterHandle* pDeleter) {
taosThreadMutexLock(&pDeleter->mutex);
int32_t status = pDeleter->status;
taosThreadMutexUnlock(&pDeleter->mutex);
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle;
SDataInserterBuf* pBuf = taosAllocateQitem(sizeof(SDataInserterBuf), DEF_QITEM);
if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
toDataCacheEntry(pDeleter, pInput, pBuf);
taosWriteQitem(pDeleter->pDataBlocks, pBuf);
*pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
return TSDB_CODE_SUCCESS;
}
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle;
taosThreadMutexLock(&pDeleter->mutex);
pDeleter->queryEnd = true;
pDeleter->useconds = useconds;
taosThreadMutexUnlock(&pDeleter->mutex);
}
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle;
if (taosQueueEmpty(pDeleter->pDataBlocks)) {
*pQueryEnd = pDeleter->queryEnd;
*pLen = 0;
return;
}
SDataInserterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataInserterBuf));
taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
*pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle;
if (NULL == pDeleter->nextOutput.pData) {
assert(pDeleter->queryEnd);
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
pOutput->bufStatus = DS_BUF_EMPTY;
pOutput->queryEnd = pDeleter->queryEnd;
return TSDB_CODE_SUCCESS;
}
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->numOfCols = pEntry->numOfCols;
pOutput->compressed = pEntry->compressed;
atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent
pOutput->bufStatus = updateStatus(pDeleter);
taosThreadMutexLock(&pDeleter->mutex);
pOutput->queryEnd = pDeleter->queryEnd;
pOutput->useconds = pDeleter->useconds;
pOutput->precision = pDeleter->pSchema->precision;
taosThreadMutexUnlock(&pDeleter->mutex);
return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
taosMemoryFreeClear(pDeleter->nextOutput.pData);
while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
SDataInserterBuf* pBuf = NULL;
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
taosMemoryFreeClear(pBuf->pData);
taosFreeQitem(pBuf);
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);
return TSDB_CODE_SUCCESS;
}
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
*size = atomic_load_64(&pDispatcher->cachedSize);
return TSDB_CODE_SUCCESS;
}
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
if (NULL == inserter) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink;
inserter->sink.fPut = putDataBlock;
inserter->sink.fEndPut = endPut;
inserter->sink.fGetLen = getDataLength;
inserter->sink.fGetData = getDataBlock;
inserter->sink.fDestroy = destroyDataSinker;
inserter->sink.fGetCacheSize = getCacheSize;
inserter->pManager = pManager;
inserter->pDeleter = pDeleterNode;
inserter->pSchema = pDataSink->pInputDataBlockDesc;
inserter->pParam = pParam;
inserter->status = DS_BUF_EMPTY;
inserter->queryEnd = false;
inserter->pDataBlocks = taosOpenQueue();
taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
*pHandle = inserter;
return TSDB_CODE_SUCCESS;
}
......@@ -90,12 +90,14 @@ typedef struct SStddevRes {
double result;
int64_t count;
union {
double quadraticDSum;
int64_t quadraticISum;
double quadraticDSum;
int64_t quadraticISum;
uint64_t quadraticUSum;
};
union {
double dsum;
int64_t isum;
double dsum;
int64_t isum;
uint64_t usum;
};
int16_t type;
} SStddevRes;
......@@ -1729,6 +1731,68 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t* plist = (uint8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t* plist = (uint16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t* plist = (uint32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t* plist = (uint64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->usum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
......@@ -1771,9 +1835,12 @@ _stddev_over:
static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) {
pOutput->type = pInput->type;
if (IS_INTEGER_TYPE(pOutput->type)) {
if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->quadraticISum += pInput->quadraticISum;
pOutput->isum += pInput->isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) {
pOutput->quadraticUSum += pInput->quadraticUSum;
pOutput->usum += pInput->usum;
} else {
pOutput->quadraticDSum += pInput->quadraticDSum;
pOutput->dsum += pInput->dsum;
......@@ -1848,6 +1915,22 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx) {
LIST_STDDEV_SUB_N(pStddevRes->isum, int64_t);
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint8_t);
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint16_t);
break;
}
case TSDB_DATA_TYPE_UINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint32_t);
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
LIST_STDDEV_SUB_N(pStddevRes->isum, uint64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
LIST_STDDEV_SUB_N(pStddevRes->dsum, float);
break;
......@@ -1871,9 +1954,12 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t type = pStddevRes->type;
double avg;
if (IS_INTEGER_TYPE(type)) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
avg = pStddevRes->isum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(fabs(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg));
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
avg = pStddevRes->usum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(fabs(pStddevRes->quadraticUSum / ((double)pStddevRes->count) - avg * avg));
} else {
avg = pStddevRes->dsum / ((double)pStddevRes->count);
pStddevRes->result = sqrt(fabs(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg));
......@@ -1913,9 +1999,12 @@ int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SStddevRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
if (IS_INTEGER_TYPE(type)) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
pDBuf->isum += pSBuf->isum;
pDBuf->quadraticISum += pSBuf->quadraticISum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
pDBuf->usum += pSBuf->usum;
pDBuf->quadraticUSum += pSBuf->quadraticUSum;
} else {
pDBuf->dsum += pSBuf->dsum;
pDBuf->quadraticDSum += pSBuf->quadraticDSum;
......
......@@ -171,17 +171,17 @@ char* jobTaskStatusStr(int32_t status) {
switch (status) {
case JOB_TASK_STATUS_NULL:
return "NULL";
case JOB_TASK_STATUS_NOT_START:
return "NOT_START";
case JOB_TASK_STATUS_EXECUTING:
case JOB_TASK_STATUS_INIT:
return "INIT";
case JOB_TASK_STATUS_EXEC:
return "EXECUTING";
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
case JOB_TASK_STATUS_PART_SUCC:
return "PARTIAL_SUCCEED";
case JOB_TASK_STATUS_SUCCEED:
case JOB_TASK_STATUS_SUCC:
return "SUCCEED";
case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_FAIL:
return "FAILED";
case JOB_TASK_STATUS_DROPPING:
case JOB_TASK_STATUS_DROP:
return "DROPPING";
default:
break;
......@@ -200,7 +200,7 @@ SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* nam
return s;
}
void destroyQueryExecRes(SQueryExecRes* pRes) {
void destroyQueryExecRes(SExecResult* pRes) {
if (NULL == pRes || NULL == pRes->res) {
return;
}
......
......@@ -226,8 +226,8 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
(status == JOB_TASK_STATUS_SUCC || status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PART_SUCC)
#define QW_SET_QTID(id, qId, tId, eId) \
do { \
*(uint64_t *)(id) = (qId); \
......
......@@ -19,7 +19,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
int32_t code = 0;
if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
if (newStatus == JOB_TASK_STATUS_EXEC || newStatus == JOB_TASK_STATUS_FAIL) {
*ignore = true;
return TSDB_CODE_SUCCESS;
}
......@@ -29,47 +29,47 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
switch (oriStatus) {
case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED &&
newStatus != JOB_TASK_STATUS_NOT_START) {
if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_FAIL &&
newStatus != JOB_TASK_STATUS_INIT) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_FAILED) {
case JOB_TASK_STATUS_INIT:
if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC
&& newStatus != JOB_TASK_STATUS_FAIL) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_SUCCEED &&
newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_DROPPING) {
case JOB_TASK_STATUS_EXEC:
if (newStatus != JOB_TASK_STATUS_PART_SUCC && newStatus != JOB_TASK_STATUS_SUCC &&
newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_DROP) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_SUCCEED &&
newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_DROPPING) {
case JOB_TASK_STATUS_PART_SUCC:
if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_SUCC &&
newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_DROP) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_SUCCEED:
if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_FAILED) {
case JOB_TASK_STATUS_SUCC:
if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_FAIL) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_FAILED:
if (newStatus != JOB_TASK_STATUS_DROPPING) {
case JOB_TASK_STATUS_FAIL:
if (newStatus != JOB_TASK_STATUS_DROP) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_DROPPING:
if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
case JOB_TASK_STATUS_DROP:
if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_PART_SUCC) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
......
......@@ -206,7 +206,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_TASK_DLOG_E("no data in sink and query end");
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp;
......@@ -236,7 +236,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG_E("task all data fetched, done");
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
}
return TSDB_CODE_SUCCESS;
......@@ -330,7 +330,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break;
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
break;
}
case QW_PHASE_PRE_FETCH: {
......@@ -447,7 +447,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
_return:
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
}
if (rspConnection) {
......@@ -467,7 +467,7 @@ _return:
}
if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
}
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
......@@ -499,7 +499,7 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->ctrlConnInfo = qwMsg->connInfo;
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
_return:
......@@ -698,7 +698,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_IS_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
......@@ -749,7 +749,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_IS_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true;
......@@ -770,7 +770,7 @@ _return:
QW_UPDATE_RSP_CODE(ctx, code);
}
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
}
if (locked) {
......
......@@ -52,6 +52,7 @@ typedef enum {
SCH_OP_NULL = 0,
SCH_OP_EXEC,
SCH_OP_FETCH,
SCH_OP_GET_STATUS,
} SCH_OP_TYPE;
typedef struct SSchTrans {
......@@ -97,13 +98,30 @@ typedef struct SSchStat {
} SSchStat;
typedef struct SSchResInfo {
SQueryResult* queryRes;
SExecResult* execRes;
void** fetchRes;
schedulerExecFp execFp;
schedulerFetchFp fetchFp;
void* userParam;
void* cbParam;
} SSchResInfo;
typedef struct SSchOpEvent {
SCH_OP_TYPE type;
bool begin;
SSchedulerReq *pReq;
} SSchOpEvent;
typedef int32_t (*schStatusEnterFp)(void* pHandle, void* pParam);
typedef int32_t (*schStatusLeaveFp)(void* pHandle, void* pParam);
typedef int32_t (*schStatusEventFp)(void* pHandle, void* pParam, void* pEvent);
typedef struct SSchStatusFps {
EJobTaskType status;
schStatusEnterFp enterFp;
schStatusLeaveFp leaveFp;
schStatusEventFp eventFp;
} SSchStatusFps;
typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
......@@ -157,7 +175,7 @@ typedef struct SSchLevel {
int32_t taskNum;
int32_t taskLaunchedNum;
int32_t taskDoneNum;
SArray *subTasks; // Element is SQueryTask
SArray *subTasks; // Element is SSchTask
} SSchLevel;
typedef struct SSchTaskProfile {
......@@ -195,12 +213,13 @@ typedef struct SSchTask {
typedef struct SSchJobAttr {
EExplainMode explainMode;
bool queryJob;
bool needFetch;
bool needFlowCtrl;
} SSchJobAttr;
typedef struct {
int32_t op;
bool sync;
bool syncReq;
} SSchOpStatus;
typedef struct SSchJob {
......@@ -231,7 +250,7 @@ typedef struct SSchJob {
SSchTask *fetchTask;
int32_t errCode;
SRWLatch resLock;
SQueryExecRes execRes;
SExecResult execRes;
void *resData; //TODO free it or not
int32_t resNumOfRows;
SSchResInfo userRes;
......@@ -292,19 +311,19 @@ extern SSchedulerMgmt schMgmt;
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.sync)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) (((job)->opStatus.op == SCH_OP_EXEC) && (!(job)->opStatus.sync))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) (((job)->opStatus.op == SCH_OP_FETCH) && (!(job)->opStatus.sync))
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && (!(job)->opStatus.syncReq))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && (!(job)->opStatus.syncReq))
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
......@@ -329,9 +348,10 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_WLOG(param, ...) \
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
......@@ -349,7 +369,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schLaunchFetchTask(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
int32_t schCloneSMsgSendInfo(void *src, void **dst);
......@@ -371,25 +391,45 @@ void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync);
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
void schCloseJobRef(void);
int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode);
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq);
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId);
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask);
bool schJobDone(SSchJob *pJob);
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask);
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param);
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq);
int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask);
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode);
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry);
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob);
void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
#ifdef __cplusplus
......
......@@ -14,16 +14,16 @@
*/
#include "query.h"
#include "schedulerInt.h"
#include "schInt.h"
tsem_t schdRspSem;
void schdExecCallback(SQueryResult* pResult, void* param, int32_t code) {
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
if (code) {
pResult->code = code;
}
*(SQueryResult*)param = *pResult;
*(SExecResult*)param = *pResult;
taosMemoryFree(pResult);
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "schedulerInt.h"
#include "schInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
......
此差异已折叠。
......@@ -16,7 +16,7 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
......@@ -37,7 +37,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
TMSG_INFO(msgType));
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
}
......@@ -51,7 +51,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -76,7 +76,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -88,9 +88,21 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode) {
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
char *msg = pMsg->pData;
int32_t msgSize = pMsg->len;
int32_t msgType = pMsg->msgType;
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) {
SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode));
}
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
......@@ -308,7 +320,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schFetchFromRemote(pJob));
SCH_ERR_JRET(schLaunchFetchTask(pJob));
taosMemoryFreeClear(msg);
......@@ -325,7 +337,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
......@@ -362,65 +374,24 @@ _return:
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
int32_t msgType = pMsg->msgType;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
SSchJob *pJob = schAcquireJob(pParam->refId);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "taosAcquireRef job failed, may be dropped, refId:0x%" PRIx64,
pParam->queryId, pParam->taskId, pParam->refId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask));
SCH_LOCK_TASK(pTask);
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
if (pParam->execId != pTask->execId) {
SCH_TASK_DLOG("execId %d mis-match current execId %d", pParam->execId, pTask->execId);
goto _return;
}
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execId));
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp will not be processed cause of job status %s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
code = atomic_load_32(&pJob->errCode);
goto _return;
}
SCH_ERR_JRET(schValidateReceivedMsgType(pJob, pTask, msgType));
qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
if (SCH_NEED_REDIRECT(reqType, rspCode, pMsg->len)) {
code = schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode);
goto _return;
}
SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode);
code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode);
pMsg->pData = NULL;
_return:
if (pTask) {
if (code) {
schProcessOnTaskFailure(pJob, pTask, code);
}
SCH_UNLOCK_TASK(pTask);
}
if (pJob) {
schReleaseJob(pParam->refId);
}
schProcessOnCbEnd(pJob, pTask, code);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(param);
qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode));
SCH_RET(code);
}
......@@ -451,6 +422,37 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
}
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
if (code) {
qError("hb rsp error:%s", tstrerror(code));
SCH_ERR_JRET(code);
}
if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
qError("invalid hb rsp msg, size:%d", pMsg->len);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTrans trans = {0};
trans.pTrans = pParam->pTrans;
trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return:
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
SCH_RET(code);
}
int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bool isHb, SSchTrans *trans, void **pParam) {
if (!isHb) {
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
......@@ -692,36 +694,6 @@ _return:
SCH_RET(code);
}
int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
if (code) {
qError("hb rsp error:%s", tstrerror(code));
SCH_ERR_JRET(code);
}
if (tDeserializeSSchedulerHbRsp(pMsg->pData, pMsg->len, &rsp)) {
qError("invalid hb rsp msg, size:%d", pMsg->len);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTrans trans = {0};
trans.pTrans = pParam->pTrans;
trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return:
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
SCH_RET(code);
}
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb) {
int32_t code = 0;
int32_t msgType = TDMT_SCH_LINK_BROKEN;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
int32_t code = 0;
SCH_ERR_JRET(schUpdateJobStatus(pJob, status));
switch (status) {
case JOB_TASK_STATUS_INIT:
break;
case JOB_TASK_STATUS_EXEC:
SCH_ERR_JRET(schExecJob(pJob, (SSchedulerReq*)param));
break;
case JOB_TASK_STATUS_PART_SUCC:
SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob));
break;
case JOB_TASK_STATUS_SUCC:
break;
case JOB_TASK_STATUS_FAIL:
SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
break;
case JOB_TASK_STATUS_DROP:
SCH_ERR_JRET(schProcessOnJobDropped(pJob, *(int32_t*)param));
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
} else {
SCH_JOB_DLOG("job removed from jobRef list, refId:0x%" PRIx64, pJob->refId);
}
break;
default: {
SCH_JOB_ELOG("unknown job status %d", status);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
}
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
SSchJob *pJob = schAcquireJob(jobId);
if (NULL == pJob) {
qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
*job = pJob;
SCH_RET(schProcessOnOpBegin(pJob, type, pReq));
}
int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t code = errCode;
if (NULL == pJob) {
SCH_RET(code);
}
schProcessOnOpEnd(pJob, type, pReq, errCode);
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
code = pJob->errCode;
}
schReleaseJob(pJob->refId);
return code;
}
此差异已折叠。
......@@ -16,11 +16,25 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
qDebug("sch acquire jobId:0x%"PRIx64, refId);
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
}
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
if (0 == refId) {
return TSDB_CODE_SUCCESS;
}
qDebug("sch release jobId:0x%"PRIx64, refId);
return taosReleaseRef(schMgmt.jobRef, refId);
}
char* schGetOpStr(SCH_OP_TYPE type) {
switch (type) {
case SCH_OP_NULL:
......@@ -29,6 +43,8 @@ char* schGetOpStr(SCH_OP_TYPE type) {
return "EXEC";
case SCH_OP_FETCH:
return "FETCH";
case SCH_OP_GET_STATUS:
return "GET STATUS";
default:
return "UNKNOWN";
}
......@@ -283,3 +299,20 @@ void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo) {
taosMemoryFree(msgSendInfo);
}
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
*pTask = *task;
return TSDB_CODE_SUCCESS;
}
......@@ -16,7 +16,7 @@
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
......@@ -67,121 +67,45 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId, SQueryResult *pRes) {
qDebug("scheduler sync exec job start");
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
qDebug("scheduler %s exec job start", pReq->syncReq ? "SYNC" : "ASYNC");
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pReq, &pJob));
*pJobId = pJob->refId;
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, true));
SCH_ERR_JRET(schInitJob(pJobId, pReq));
_return:
SCH_ERR_JRET(schHandleOpBeginEvent(*pJobId, &pJob, SCH_OP_EXEC, pReq));
if (code && NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
}
if (pJob) {
schSetJobQueryRes(pJob, pRes);
schReleaseJob(pJob->refId);
}
return code;
}
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_INIT, pReq));
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
qDebug("scheduler async exec job start");
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pReq, &pJob));
*pJobId = pJob->refId;
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, false));
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_EXEC, pReq));
_return:
if (code && NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
}
if (pJob) {
schReleaseJob(pJob->refId);
}
return code;
SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_EXEC, pReq, code));
}
int32_t schedulerFetchRows(int64_t job, void **pData) {
qDebug("scheduler sync fetch rows start");
if (NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) {
qDebug("scheduler %s fetch rows start", pReq->syncReq ? "SYNC" : "ASYNC");
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_ERR_RET(schBeginOperation(pJob, SCH_OP_FETCH, true));
pJob->userRes.fetchRes = pData;
code = schFetchRows(pJob);
schReleaseJob(job);
SCH_RET(code);
}
void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param) {
qDebug("scheduler async fetch rows start");
SSchJob *pJob = NULL;
int32_t code = 0;
if (NULL == fp || NULL == param) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_FETCH, pReq));
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire sch job from job list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_ERR_JRET(schBeginOperation(pJob, SCH_OP_FETCH, false));
pJob->userRes.fetchFp = fp;
pJob->userRes.userParam = param;
SCH_ERR_JRET(schAsyncFetchRows(pJob));
SCH_ERR_JRET(schJobFetchRows(pJob));
_return:
if (code) {
fp(NULL, param, code);
}
schReleaseJob(job);
SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_FETCH, pReq, code));
}
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) {
int32_t code = 0;
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qDebug("acquire job from jobRef list failed, may not started or dropped, refId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SSchJob *pJob = NULL;
if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
qDebug("job not initialized or not executable job, refId:0x%" PRIx64, job);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_GET_STATUS, NULL));
for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
......@@ -198,23 +122,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
_return:
schReleaseJob(job);
SCH_RET(code);
}
int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
int32_t code = schCancelJob(pJob);
schReleaseJob(job);
SCH_RET(code);
SCH_RET(schHandleOpEndEvent(pJob, SCH_OP_GET_STATUS, NULL, code));
}
void schedulerStopQueryHb(void *pTrans) {
......@@ -225,33 +133,23 @@ void schedulerStopQueryHb(void *pTrans) {
schCleanClusterHb(pTrans);
}
void schedulerFreeJob(int64_t* job, int32_t errCode) {
if (0 == *job) {
void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
if (0 == *jobId) {
return;
}
SSchJob *pJob = schAcquireJob(*job);
SSchJob *pJob = schAcquireJob(*jobId);
if (NULL == pJob) {
qError("acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *job);
*job = 0;
qError("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
return;
}
int32_t code = schProcessOnJobDropped(pJob, errCode);
if (TSDB_CODE_SCH_JOB_IS_DROPPING == code) {
SCH_JOB_DLOG("sch job is already dropping, refId:0x%" PRIx64, *job);
*job = 0;
if (schJobDone(pJob)) {
return;
}
SCH_JOB_DLOG("start to remove job from jobRef list, refId:0x%" PRIx64, *job);
if (taosRemoveRef(schMgmt.jobRef, *job)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, *job);
}
schReleaseJob(*job);
*job = 0;
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)&errCode);
*jobId = 0;
}
void schedulerDestroy(void) {
......
......@@ -50,7 +50,7 @@
#pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat"
#include "schedulerInt.h"
#include "schInt.h"
#include "stub.h"
#include "tref.h"
......@@ -87,7 +87,7 @@ void schtInitLogFile() {
}
void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) {
void schtQueryCb(SExecResult* pResult, void* param, int32_t code) {
assert(TSDB_CODE_SUCCESS == code);
*(int32_t*)param = 1;
}
......@@ -507,14 +507,15 @@ void* schtRunJobThread(void *aa) {
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
SSchedulerReq req = {0};
req.syncReq = false;
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
req.cbParam = &queryDone;
code = schedulerAsyncExecJob(&req, &queryJobRefId);
code = schedulerExecJob(&req, &queryJobRefId);
assert(code == 0);
pJob = schAcquireJob(queryJobRefId);
......@@ -584,7 +585,10 @@ void* schtRunJobThread(void *aa) {
atomic_store_32(&schtStartFetch, 1);
void *data = NULL;
code = schedulerFetchRows(queryJobRefId, &data);
req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(queryJobRefId, &req);
assert(code == 0 || code);
if (0 == code) {
......@@ -594,7 +598,7 @@ void* schtRunJobThread(void *aa) {
}
data = NULL;
code = schedulerFetchRows(queryJobRefId, &data);
code = schedulerFetchRows(queryJobRefId, &req);
assert(code == 0 || code);
schtFreeQueryJob(0);
......@@ -658,15 +662,15 @@ TEST(queryTest, normalCase) {
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
SSchedulerReq req = {0};
SSchedulerReq req = {0};
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
req.cbParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -709,7 +713,10 @@ TEST(queryTest, normalCase) {
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
......@@ -718,7 +725,7 @@ TEST(queryTest, normalCase) {
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &data);
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
......@@ -768,8 +775,8 @@ TEST(queryTest, readyFirstCase) {
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
req.cbParam = &queryDone;
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -813,7 +820,9 @@ TEST(queryTest, readyFirstCase) {
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
......@@ -822,7 +831,7 @@ TEST(queryTest, readyFirstCase) {
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &data);
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
......@@ -875,9 +884,9 @@ TEST(queryTest, flowCtrlCase) {
req.pDag = &dag;
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
req.cbParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -925,7 +934,9 @@ TEST(queryTest, flowCtrlCase) {
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
......@@ -934,7 +945,7 @@ TEST(queryTest, flowCtrlCase) {
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &data);
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
......@@ -978,7 +989,7 @@ TEST(insertTest, normalCase) {
TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0};
SExecResult res = {0};
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
......@@ -988,9 +999,9 @@ TEST(insertTest, normalCase) {
req.pDag = &dag;
req.sql = "insert into tb values(now,1)";
req.execFp = schtQueryCb;
req.execParam = NULL;
req.cbParam = NULL;
code = schedulerExecJob(&req, &insertJobRefId, &res);
code = schedulerExecJob(&req, &insertJobRefId);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
......
......@@ -58,6 +58,15 @@ typedef struct TdFile {
#define FILE_WITH_LOCK 1
typedef struct AutoDelFile * AutoDelFilePtr;
typedef struct AutoDelFile {
char *name;
AutoDelFilePtr lastAutoDelFilePtr;
} AutoDelFile;
static TdThreadMutex autoDelFileLock;
static AutoDelFilePtr nowAutoDelFilePtr = NULL;
static TdThreadOnce autoDelFileInit = PTHREAD_ONCE_INIT;
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) {
#ifdef WINDOWS
const char *tdengineTmpFileNamePrefix = "tdengine-";
......@@ -238,7 +247,33 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
return 0;
}
void autoDelFileListAdd(const char *path) { return; }
void autoDelFileList() {
taosThreadMutexLock(&autoDelFileLock);
while (nowAutoDelFilePtr != NULL) {
taosRemoveFile(nowAutoDelFilePtr->name);
AutoDelFilePtr tmp = nowAutoDelFilePtr->lastAutoDelFilePtr;
taosMemoryFree(nowAutoDelFilePtr->name);
taosMemoryFree(nowAutoDelFilePtr);
nowAutoDelFilePtr = tmp;
}
taosThreadMutexUnlock(&autoDelFileLock);
taosThreadMutexDestroy(&autoDelFileLock);
}
void autoDelFileListInit() {
taosThreadMutexInit(&autoDelFileLock, NULL);
atexit(autoDelFileList);
}
void autoDelFileListAdd(const char *path) {
taosThreadOnce(&autoDelFileInit, autoDelFileListInit);
taosThreadMutexLock(&autoDelFileLock);
AutoDelFilePtr tmp = taosMemoryMalloc(sizeof(AutoDelFile));
tmp->lastAutoDelFilePtr = nowAutoDelFilePtr;
tmp->name = taosMemoryStrDup(path);
nowAutoDelFilePtr = tmp;
taosThreadMutexUnlock(&autoDelFileLock);
}
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
int fd = -1;
......@@ -283,10 +318,6 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
}
}
if (tdFileOptions & TD_FILE_AUTO_DEL) {
autoDelFileListAdd(path);
}
TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile));
if (pFile == NULL) {
if (fd >= 0) close(fd);
......@@ -299,6 +330,9 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
pFile->fd = fd;
pFile->fp = fp;
pFile->refId = 0;
if (tdFileOptions & TD_FILE_AUTO_DEL) {
autoDelFileListAdd(path);
}
return pFile;
}
......
......@@ -282,14 +282,14 @@ void *taosMemoryRealloc(void *ptr, int32_t size) {
#endif
}
void *taosMemoryStrDup(void *ptr) {
void *taosMemoryStrDup(const char *ptr) {
#ifdef USE_TD_MEMORY
if (ptr == NULL) return NULL;
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
assert(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL);
void *tmp = tstrdup((const char *)pTdMemoryInfo);
void *tmp = tstrdup(pTdMemoryInfo);
if (tmp == NULL) return NULL;
memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo));
......@@ -297,7 +297,7 @@ void *taosMemoryStrDup(void *ptr) {
return (char *)tmp + sizeof(TdMemoryInfo);
#else
return tstrdup((const char *)ptr);
return tstrdup(ptr);
#endif
}
......
......@@ -394,6 +394,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in in/notin operator")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place")
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import time
from datetime import datetime
class GetTime:
def get_ms_timestamp(self,ts_str):
_ts_str = ts_str
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15 :
_ts_str = ts_str[:-3]
if ':' in _ts_str and '.' in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S.%f")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
elif ':' in _ts_str and '.' not in _ts_str:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d %H:%M:%S")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
else:
timestamp = datetime.strptime(_ts_str, "%Y-%m-%d")
date_time = int(int(time.mktime(timestamp.timetuple()))*1000 + timestamp.microsecond/1000)
return date_time
def get_us_timestamp(self,ts_str):
_ts = self.get_ms_timestamp(ts_str) * 1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 12:
us_ts = p[12:15]
_ts += int(us_ts)
return _ts
def get_ns_timestamp(self,ts_str):
_ts = self.get_us_timestamp(ts_str) *1000
if " " in ts_str:
p = ts_str.split(" ")[1]
if len(p) > 15:
us_ts = p[15:]
_ts += int(us_ts)
return _ts
def time_transform(self,ts_str,precision):
date_time = []
if precision == 'ms':
for i in ts_str:
date_time.append(self.get_ms_timestamp(i))
elif precision == 'us':
for i in ts_str:
date_time.append(self.get_us_timestamp(i))
elif precision == 'ns':
for i in ts_str:
date_time.append(self.get_ns_timestamp(i))
return date_time
\ No newline at end of file
......@@ -108,8 +108,8 @@ class TDTestCase:
tdSql.error(f'alter stable {self.stbname}_{i} add column {key} {values}')
tdSql.error(f'alter stable {self.stbname}_{i} drop column {key}')
#! bug TD-16921
#tdSql.error(f'alter stable {self.ntbname} add column {key} {values}')
#tdSql.error(f'alter stable {self.ntbname} drop column {key}')
tdSql.error(f'alter stable {self.ntbname} add column {key} {values}')
tdSql.error(f'alter stable {self.ntbname} drop column {key}')
tdSql.execute(f'alter stable {self.stbname} drop column {key}')
tdSql.query(f'describe {self.stbname}')
tdSql.checkRows(len(self.column_dict)+len(self.tag_dict))
......@@ -132,7 +132,7 @@ class TDTestCase:
tdSql.checkEqual(result[0][2],self.binary_length+1)
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
#! bug TD-16921
# tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
elif 'nchar' in values.lower():
v = f'nchar({self.binary_length+1})'
v_error = f'nchar({self.binary_length-1})'
......@@ -147,11 +147,11 @@ class TDTestCase:
tdSql.checkEqual(result[0][2],self.binary_length+1)
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
#! bug TD-16921
#tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
else:
for v in self.column_dict.values():
tdSql.error(f'alter stable {self.stbname} modify column {key} {v}')
# tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
tdSql.error(f'alter stable {self.ntbname} modify column {key} {v}')
for i in range(self.tbnum):
tdSql.error(f'alter stable {self.stbname}_{i} modify column {key} {v}')
def run(self):
......
from util.log import *
from util.sql import *
from util.cases import *
from util.gettime import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.get_time = GetTime()
self.ts_str = [
'2020-1-1',
'2020-2-1 00:00:01',
'2020-3-1 00:00:00.001',
'2020-4-1 00:00:00.001002',
'2020-5-1 00:00:00.001002001'
]
self.db_param_precision = ['ms','us','ns']
self.time_unit = ['1w','1d','1h','1m','1s','1a','1u','1b']
self.error_unit = ['2w','2d','2h','2m','2s','2a','2u','1c','#1']
self.ntbname = 'ntb'
self.stbname = 'stb'
self.ctbname = 'ctb'
self.subtractor = 1 # unit:s
def check_tbtype(self,tb_type):
if tb_type.lower() == 'ntb':
tdSql.query(f'select timediff(ts,{self.subtractor}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timediff(ts,{self.subtractor}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timediff(ts,{self.subtractor}) from {self.stbname}')
def check_tb_type(self,unit,tb_type):
if tb_type.lower() == 'ntb':
tdSql.query(f'select timediff(ts,{self.subtractor},{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.query(f'select timediff(ts,{self.subtractor},{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.query(f'select timediff(ts,{self.subtractor},{unit}) from {self.stbname}')
def data_check(self,date_time,precision,tb_type):
for unit in self.time_unit:
if (unit.lower() == '1u' and precision.lower() == 'ms') or (unit.lower() == '1b' and precision.lower() == 'us') or (unit.lower() == '1b' and precision.lower() == 'ms'):
if tb_type.lower() == 'ntb':
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.ctbname}')
elif tb_type.lower() == 'stb':
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.stbname}')
elif precision.lower() == 'ms':
self.check_tb_type(unit,tb_type)
tdSql.checkRows(len(self.ts_str))
if unit.lower() == '1a':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i])-self.subtractor*1000)
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]/1000)-self.subtractor)
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000)-self.subtractor)/60))
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000)-self.subtractor)/60/60))
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000)-self.subtractor)/60/60/24))
elif unit.lower() == '1w':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000)-self.subtractor)/60/60/24/7))
self.check_tbtype(tb_type)
tdSql.checkRows(len(self.ts_str))
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i])-self.subtractor*1000)
elif precision.lower() == 'us':
self.check_tb_type(unit,tb_type)
tdSql.checkRows(len(self.ts_str))
if unit.lower() == '1w':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor)/60/60/24/7))
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor)/60/60/24))
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor)/60/60))
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor)/60))
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor)))
elif unit.lower() == '1a':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000)-self.subtractor*1000)))
elif unit.lower() == '1u':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i])-self.subtractor*1000000)))
self.check_tbtype(tb_type)
tdSql.checkRows(len(self.ts_str))
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i])-self.subtractor*1000000)))
elif precision.lower() == 'ns':
self.check_tb_type(unit,tb_type)
tdSql.checkRows(len(self.ts_str))
if unit.lower() == '1w':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000000)-self.subtractor)/60/60/24/7))
elif unit.lower() == '1d':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000000)-self.subtractor)/60/60/24))
elif unit.lower() == '1h':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000000)-self.subtractor)/60/60))
elif unit.lower() == '1m':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000000)-self.subtractor)/60))
elif unit.lower() == '1s':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000000)-self.subtractor)))
elif unit.lower() == '1a':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor*1000)))
elif unit.lower() == '1u':
for i in range(len(self.ts_str)):
tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000)-self.subtractor*1000000)))
# self.check_tbtype(tb_type)
# tdSql.checkRows(len(self.ts_str))
# for i in range(len(self.ts_str)):
# tdSql.checkEqual(tdSql.queryResult[i][0],int(((date_time[i]/1000000)-self.subtractor*1000000000)))
for unit in self.error_unit:
if tb_type.lower() == 'ntb':
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.ntbname}')
tdSql.error(f'select timediff(c0,{self.subtractor},{unit}) from {self.ntbname}')
elif tb_type.lower() == 'ctb':
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.ctbname}')
tdSql.error(f'select timediff(c0,{self.subtractor},{unit}) from {self.ntbname}')
elif tb_type.lower() == 'stb':
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.stbname}')
tdSql.error(f'select timediff(c0,{self.subtractor},{unit}) from {self.ntbname}')
def function_check_ntb(self):
for precision in self.db_param_precision:
tdSql.execute('drop database if exists db')
tdSql.execute(f'create database db precision "{precision}"')
tdSql.execute('use db')
tdSql.execute(f'create table {self.ntbname} (ts timestamp,c0 int)')
for ts in self.ts_str:
tdSql.execute(f'insert into {self.ntbname} values("{ts}",1)')
for unit in self.error_unit:
tdSql.error(f'select timediff(ts,{self.subtractor},{unit}) from {self.ntbname}')
date_time = self.get_time.time_transform(self.ts_str,precision)
self.data_check(date_time,precision,'ntb')
def function_check_stb(self):
for precision in self.db_param_precision:
tdSql.execute('drop database if exists db')
tdSql.execute(f'create database db precision "{precision}"')
tdSql.execute('use db')
tdSql.execute(f'create table {self.stbname} (ts timestamp,c0 int) tags(t0 int)')
tdSql.execute(f'create table {self.ctbname} using {self.stbname} tags(1)')
for ts in self.ts_str:
tdSql.execute(f'insert into {self.ctbname} values("{ts}",1)')
date_time = self.get_time.time_transform(self.ts_str,precision)
self.data_check(date_time,precision,'ctb')
self.data_check(date_time,precision,'stb')
def run(self): # sourcery skip: extract-duplicate-method
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create tables==========")
tdSql.execute(
'''create table if not exists ntb
(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp)
'''
)
tdSql.execute(
'''create table if not exists stb
(ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int)
'''
)
tdSql.execute(
'''create table if not exists stb_1 using stb tags(100)
'''
)
tdLog.printNoPrefix("==========step2:insert data into ntb==========")
# RFC3339:2020-01-01T00:00:00+8:00
# ISO8601:2020-01-01T00:00:00.000+0800
tdSql.execute(
'insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())')
tdSql.execute(
'insert into stb_1 values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())')
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00') from ntb")
tdSql.checkRows(3)
tdSql.query("select timediff(1,0,1d) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1d) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1s) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1)
tdSql.query("select timediff(1,0,1s) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1)
tdSql.query("select timediff(1,0,1w) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1w) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1h) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1h) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1m) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1m) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff(1,0,1a) from ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000)
tdSql.query("select timediff(1,0,1a) from db.ntb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1000)
tdSql.error("select timediff(1,0,1u) from ntb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,1000000)
tdSql.error("select timediff(1,0,1u) from db.ntb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,1000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00') from stb")
tdSql.checkRows(3)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00') from db.stb")
tdSql.checkRows(3)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1d) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1d) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1h) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,24)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1h) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,24)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1w) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1m) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1440)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1m) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,1440)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1s) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1s) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1a) from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1a) from db.stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,86400000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from stb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,86400000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-2 00:00:00',1u) from db.stb")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,86400000000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00') from stb_1")
tdSql.checkRows(3)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00') from db.stb_1")
tdSql.checkRows(3)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1w) from stb_1 ")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1w) from db.stb_1 ")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1d) from stb_1 ")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1d) from db.stb_1 ")
tdSql.checkRows(3)
tdSql.checkData(0,0,0)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1h) from stb_1 ")
tdSql.checkRows(3)
tdSql.checkData(0,0,12)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1h) from db.stb_1 ")
tdSql.checkRows(3)
tdSql.checkData(0,0,12)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1m) from stb_1" )
tdSql.checkRows(3)
tdSql.checkData(0,0,720)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1m) from db.stb_1" )
tdSql.checkRows(3)
tdSql.checkData(0,0,720)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1s) from stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1s) from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1a) from stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000)
tdSql.query("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1a) from db.stb_1")
tdSql.checkRows(3)
tdSql.checkData(0,0,43200000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from stb_1")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,43200000000)
tdSql.error("select timediff('2020-1-1 00:00:00','2020-1-1 12:00:00',1u) from db.stb_1")
#tdSql.checkRows(3)
#tdSql.checkData(0,0,43200000000)
tdSql.query("select timediff('a','b') from stb")
tdSql.checkRows(3)
tdSql.checkData(0,0,None)
tdSql.checkData(1,0,None)
tdSql.checkData(2,0,None)
tdSql.error("select timediff(1.5,1.5) from stb")
tdSql.error("select timediff(1) from stb")
tdSql.error("select timediff(10,1,1.5) from stb")
# tdSql.error("select timediff(10,1,2s) from stb")
# tdSql.error("select timedifff(10,1,c1) from stb")
tdSql.error("select timediff(1.5,1.5) from stb_1")
tdSql.error("select timediff(1) from stb_1")
tdSql.error("select timediff(10,1,1.5) from stb_1")
# tdSql.error("select timediff(10,1,2s) from stb_1")
# tdSql.error("select timedifff(10,1,c1) from stb_1")
tdSql.error("select timediff(1.5,1.5) from ntb")
tdSql.error("select timediff(1) from ntb")
tdSql.error("select timediff(10,1,1.5) from ntb")
# tdSql.error("select timediff(10,1,2s) from ntb")
# tdSql.error("select timedifff(10,1,c1) from ntb")
self.function_check_ntb()
self.function_check_stb()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册