提交 6ba66d58 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch 'develop' into hotfix/sangshuduo/TD-3215-bus-error-arm32

...@@ -46,6 +46,7 @@ def pre_test(){ ...@@ -46,6 +46,7 @@ def pre_test(){
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile' || exit 0 git --no-pager diff --name-only FETCH_HEAD $(git merge-base FETCH_HEAD develop)|grep -v -E '.*md|//src//connector|Jenkinsfile' || exit 0
find ${WKC}/tests/pytest -name \'*\'.sql -exec rm -rf {} \\;
cd ${WK} cd ${WK}
git reset --hard HEAD~10 git reset --hard HEAD~10
git checkout develop git checkout develop
...@@ -115,7 +116,6 @@ pipeline { ...@@ -115,7 +116,6 @@ pipeline {
sh ''' sh '''
date date
cd ${WKC}/tests cd ${WKC}/tests
find pytest -name '*'sql|xargs rm -rf
./test-all.sh p1 ./test-all.sh p1
date''' date'''
} }
...@@ -131,7 +131,6 @@ pipeline { ...@@ -131,7 +131,6 @@ pipeline {
sh ''' sh '''
date date
cd ${WKC}/tests cd ${WKC}/tests
find pytest -name '*'sql|xargs rm -rf
./test-all.sh p2 ./test-all.sh p2
date''' date'''
} }
......
...@@ -133,6 +133,7 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo); ...@@ -133,6 +133,7 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo); bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SSqlExpr* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SSqlExpr* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType); SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
......
...@@ -100,6 +100,10 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr ...@@ -100,6 +100,10 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
} else if (functionId == TSDB_FUNC_APERCT) { } else if (functionId == TSDB_FUNC_APERCT) {
pCtx->param[0].i64 = pExpr->param[0].i64; pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType; pCtx->param[0].nType = pExpr->param[0].nType;
} else if (functionId == TSDB_FUNC_BLKINFO) {
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
pCtx->numOfParams = 1;
} }
pCtx->interBufBytes = pExpr->interBytes; pCtx->interBufBytes = pExpr->interBytes;
...@@ -951,10 +955,10 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp ...@@ -951,10 +955,10 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
// todo extract function // todo extract function
int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey; int64_t actualETime = (pQueryInfo->order.order == TSDB_ORDER_ASC)? pQueryInfo->window.ekey: pQueryInfo->window.skey;
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput); void** pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalMerge->resColModel->capacity); pResPages[i] = calloc(1, pField->bytes * pLocalMerge->resColModel->capacity);
} }
while (1) { while (1) {
...@@ -966,7 +970,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp ...@@ -966,7 +970,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
if (pQueryInfo->limit.offset > 0) { if (pQueryInfo->limit.offset > 0) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset, memmove(pResPages[i], ((char*)pResPages[i]) + pField->bytes * pQueryInfo->limit.offset,
(size_t)(newRows * pField->bytes)); (size_t)(newRows * pField->bytes));
} }
} }
...@@ -1010,7 +1014,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp ...@@ -1010,7 +1014,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, (size_t)(pField->bytes * pRes->numOfRows)); memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i], (size_t)(pField->bytes * pRes->numOfRows));
offset += pField->bytes; offset += pField->bytes;
} }
......
此差异已折叠。
...@@ -497,8 +497,6 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -497,8 +497,6 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code; return pSql->res.code;
} }
} else if (pCmd->command >= TSDB_SQL_LOCAL) { } else if (pCmd->command >= TSDB_SQL_LOCAL) {
//pSql->epSet = tscMgmtEpSet;
// } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
} }
...@@ -705,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -705,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList); size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) { if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo) && !tscQueryBlockInfo(pQueryInfo)) {
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols, tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", pSql, (uint64_t)numOfSrcCols,
tscGetNumOfColumns(pTableMeta)); tscGetNumOfColumns(pTableMeta));
...@@ -835,13 +833,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -835,13 +833,31 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag); pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
pSqlFuncExpr->colType = htons(pExpr->resType);
pSqlFuncExpr->colBytes = htons(pExpr->resBytes);
} else if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
SSchema *s = tGetTbnameColumnSchema();
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
} else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pSqlFuncExpr->colType = htons(s.type);
pSqlFuncExpr->colBytes = htons(s.bytes);
} else {
SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->colInfo.colId);
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
}
pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId); pSqlFuncExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlFuncMsg); pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
// todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen); pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);
...@@ -866,6 +882,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -866,6 +882,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
for (int32_t i = 0; i < output; ++i) { for (int32_t i = 0; i < output; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i); SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SSqlExpr *pExpr = pField->pSqlExpr; SSqlExpr *pExpr = pField->pSqlExpr;
// this should be switched to projection query
if (pExpr != NULL) { if (pExpr != NULL) {
// the queried table has been removed and a new table with the same name has already been created already // the queried table has been removed and a new table with the same name has already been created already
// return error msg // return error msg
...@@ -879,27 +897,25 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -879,27 +897,25 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
pSqlFuncExpr1->colInfo.colId = htons(pExpr->colInfo.colId); pSqlFuncExpr1->numOfParams = 0; // no params for projection query
pSqlFuncExpr1->colInfo.colIndex = htons(pExpr->colInfo.colIndex); pSqlFuncExpr1->functionId = htons(TSDB_FUNC_PRJ);
pSqlFuncExpr1->colInfo.flag = htons(pExpr->colInfo.flag); pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId);
pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL);
pSqlFuncExpr1->functionId = htons(pExpr->functionId);
pSqlFuncExpr1->numOfParams = htons(pExpr->numOfParams);
pMsg += sizeof(SSqlFuncMsg);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { bool assign = false;
// todo add log for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) {
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f);
pSqlFuncExpr1->arg[j].argBytes = htons(pExpr->param[j].nLen); if (pe == pExpr) {
pSqlFuncExpr1->colInfo.colIndex = htons(f);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) { pSqlFuncExpr1->colType = htons(pe->resType);
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen); pSqlFuncExpr1->colBytes = htons(pe->resBytes);
pMsg += pExpr->param[j].nLen; assign = true;
} else { break;
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
} }
} }
assert(assign);
pMsg += sizeof(SSqlFuncMsg);
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg; pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
} else { } else {
assert(pField->pArithExprInfo != NULL); assert(pField->pArithExprInfo != NULL);
......
...@@ -503,9 +503,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -503,9 +503,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key;
tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey); size_t size = taosArrayGetSize(pSub->progress);
TSKEY s = INT64_MAX;
for(int32_t i = 0; i < size; ++i) {
TSKEY k = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, i))->key;
if (s > k) {
s = k;
}
}
pQueryInfo->window.skey = s;
tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey);
} }
if (pSub->pTimer == NULL) { if (pSub->pTimer == NULL) {
......
...@@ -74,14 +74,14 @@ static bool allSubqueryDone(SSqlObj *pParentSql) { ...@@ -74,14 +74,14 @@ static bool allSubqueryDone(SSqlObj *pParentSql) {
SSubqueryState *subState = &pParentSql->subState; SSubqueryState *subState = &pParentSql->subState;
//lock in caller //lock in caller
tscDebug("%p total subqueries: %d", pParentSql, subState->numOfSub);
for (int i = 0; i < subState->numOfSub; i++) { for (int i = 0; i < subState->numOfSub; i++) {
if (0 == subState->states[i]) { if (0 == subState->states[i]) {
tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); tscDebug("%p subquery:%p, index: %d NOT finished, abort query completion check", pParentSql, pParentSql->pSubs[i], i);
done = false; done = false;
break; break;
} else { } else {
tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); tscDebug("%p subquery:%p, index: %d finished", pParentSql, pParentSql->pSubs[i], i);
} }
} }
...@@ -453,7 +453,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -453,7 +453,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pSubQueryInfo->tsBuf = NULL; pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj // free result for async object will also free sqlObj
assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns
taos_free_result(pPrevSub); taos_free_result(pPrevSub);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL); SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
...@@ -507,6 +507,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -507,6 +507,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t funcId = pExpr->functionId; int16_t funcId = pExpr->functionId;
// add the invisible timestamp column
if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) || if ((pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) { (funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {
...@@ -847,6 +848,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -847,6 +848,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
...@@ -1880,6 +1883,13 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S ...@@ -1880,6 +1883,13 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S
} }
} }
if (p && taosArrayGetSize(p) > 0) {
SResPair *l = taosArrayGetLast(p);
if (l->key == key && key == INT64_MIN) {
continue;
}
}
//append a new column //append a new column
if (p == NULL) { if (p == NULL) {
SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair)),}; SStddevInterResult t = {.colId = id, .pResult = taosArrayInit(10, sizeof(SResPair)),};
...@@ -2643,6 +2653,11 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo ...@@ -2643,6 +2653,11 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
// clear the limit/offset info, since it should not be sent to vnode to be executed.
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub); assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub);
// launch subquery for each vnode, so the subquery index equals to the vgroupIndex. // launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
...@@ -3102,30 +3117,6 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -3102,30 +3117,6 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
} }
} }
static UNUSED_FUNC void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (pRes->tsrow[columnIndex] != NULL && pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/
memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);
int32_t length = taosUcs4ToMbs(pRes->tsrow[columnIndex], pRes->length[columnIndex], pRes->buffer[columnIndex]);
if ( length >= 0 ) {
pRes->tsrow[columnIndex] = (unsigned char*)pRes->buffer[columnIndex];
pRes->length[columnIndex] = length;
} else {
tscError("%p charset:%s to %s. val:%s convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)pRes->tsrow[columnIndex]);
pRes->tsrow[columnIndex] = NULL;
pRes->length[columnIndex] = 0;
}
}
}
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) { char *getArithmeticInputSrc(void *param, const char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *) param; SArithmeticSupport *pSupport = (SArithmeticSupport *) param;
......
...@@ -97,6 +97,22 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) { ...@@ -97,6 +97,22 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) {
return true; return true;
} }
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo) {
int32_t numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t functId = pExpr->functionId;
// "select count(tbname)" query
if (functId == TSDB_FUNC_BLKINFO) {
return true;
}
}
return false;
}
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
if (pQueryInfo == NULL) { if (pQueryInfo == NULL) {
return false; return false;
...@@ -1725,7 +1741,12 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -1725,7 +1741,12 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId= -1000; pQueryInfo->resColumnId = -1000;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0;
} }
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
......
...@@ -33,7 +33,7 @@ typedef struct SDataStatis { ...@@ -33,7 +33,7 @@ typedef struct SDataStatis {
typedef struct SColumnInfoData { typedef struct SColumnInfoData {
SColumnInfo info; SColumnInfo info;
void* pData; // the corresponding block data in memory char* pData; // the corresponding block data in memory
} SColumnInfoData; } SColumnInfoData;
typedef struct SResPair { typedef struct SResPair {
......
...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh: ...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="taos", name="taos",
version="2.0.6", version="2.0.7",
author="Taosdata Inc.", author="Taosdata Inc.",
author_email="support@taosdata.com", author_email="support@taosdata.com",
description="TDengine python client package", description="TDengine python client package",
......
...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
if num_of_rows > 0: if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
else: else:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
""" """
if num_of_rows > 0: if num_of_rows > 0:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
else: else:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
def _crow_bigint_unsigned_to_python( def _crow_bigint_unsigned_to_python(
...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python( ...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python(
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
else: else:
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
...@@ -600,7 +600,7 @@ class CTaosInterface(object): ...@@ -600,7 +600,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
...@@ -608,7 +608,7 @@ class CTaosInterface(object): ...@@ -608,7 +608,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00') # return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
......
...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh: ...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="taos", name="taos",
version="2.0.5", version="2.0.7",
author="Taosdata Inc.", author="Taosdata Inc.",
author_email="support@taosdata.com", author_email="support@taosdata.com",
description="TDengine python client package", description="TDengine python client package",
......
...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
if num_of_rows > 0: if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
else: else:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
""" """
if num_of_rows > 0: if num_of_rows > 0:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
else: else:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
def _crow_bigint_unsigned_to_python( def _crow_bigint_unsigned_to_python(
...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python( ...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python(
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
else: else:
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
...@@ -600,7 +600,7 @@ class CTaosInterface(object): ...@@ -600,7 +600,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
...@@ -608,7 +608,7 @@ class CTaosInterface(object): ...@@ -608,7 +608,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00') # return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
......
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
from .constants import FieldType from .constants import FieldType
import threading
# querySeqNum = 0 # querySeqNum = 0
...@@ -38,7 +37,6 @@ class TDengineCursor(object): ...@@ -38,7 +37,6 @@ class TDengineCursor(object):
self._block_iter = 0 self._block_iter = 0
self._affected_rows = 0 self._affected_rows = 0
self._logfile = "" self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None: if connection is not None:
self._connection = connection self._connection = connection
...@@ -105,12 +103,6 @@ class TDengineCursor(object): ...@@ -105,12 +103,6 @@ class TDengineCursor(object):
def execute(self, operation, params=None): def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command). """Prepare and execute a database operation (query or command).
""" """
# if threading.get_ident() != self._threadId:
# info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if not operation: if not operation:
return None return None
...@@ -280,12 +272,6 @@ class TDengineCursor(object): ...@@ -280,12 +272,6 @@ class TDengineCursor(object):
def _handle_result(self): def _handle_result(self):
"""Handle the return result from query. """Handle the return result from query.
""" """
# if threading.get_ident() != self._threadId:
# info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
self._description = [] self._description = []
for ele in self._fields: for ele in self._fields:
self._description.append( self._description.append(
......
...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh: ...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="taos", name="taos",
version="2.0.5", version="2.0.7",
author="Taosdata Inc.", author="Taosdata Inc.",
author_email="support@taosdata.com", author_email="support@taosdata.com",
description="TDengine python client package", description="TDengine python client package",
......
...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
if num_of_rows > 0: if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
else: else:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
""" """
if num_of_rows > 0: if num_of_rows > 0:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
else: else:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
def _crow_bigint_unsigned_to_python( def _crow_bigint_unsigned_to_python(
...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python( ...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python(
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
else: else:
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
...@@ -600,7 +600,7 @@ class CTaosInterface(object): ...@@ -600,7 +600,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
...@@ -608,7 +608,7 @@ class CTaosInterface(object): ...@@ -608,7 +608,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00') # return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
......
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
from .constants import FieldType from .constants import FieldType
import threading
# querySeqNum = 0 # querySeqNum = 0
...@@ -38,7 +37,6 @@ class TDengineCursor(object): ...@@ -38,7 +37,6 @@ class TDengineCursor(object):
self._block_iter = 0 self._block_iter = 0
self._affected_rows = 0 self._affected_rows = 0
self._logfile = "" self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None: if connection is not None:
self._connection = connection self._connection = connection
...@@ -105,12 +103,6 @@ class TDengineCursor(object): ...@@ -105,12 +103,6 @@ class TDengineCursor(object):
def execute(self, operation, params=None): def execute(self, operation, params=None):
"""Prepare and execute a database operation (query or command). """Prepare and execute a database operation (query or command).
""" """
# if threading.get_ident() != self._threadId:
# info ="Cursor execute:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
if not operation: if not operation:
return None return None
...@@ -280,12 +272,6 @@ class TDengineCursor(object): ...@@ -280,12 +272,6 @@ class TDengineCursor(object):
def _handle_result(self): def _handle_result(self):
"""Handle the return result from query. """Handle the return result from query.
""" """
# if threading.get_ident() != self._threadId:
# info = "Cursor handleresult:Thread ID not match,creater:"+str(self._threadId)+" caller:"+str(threading.get_ident())
# raise OperationalError(info)
# print(info)
# return None
self._description = [] self._description = []
for ele in self._fields: for ele in self._fields:
self._description.append( self._description.append(
......
...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh: ...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="taos", name="taos",
version="2.0.4", version="2.0.7",
author="Taosdata Inc.", author="Taosdata Inc.",
author_email="support@taosdata.com", author_email="support@taosdata.com",
description="TDengine python client package", description="TDengine python client package",
......
...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
if num_of_rows > 0: if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
else: else:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
""" """
if num_of_rows > 0: if num_of_rows > 0:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
else: else:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
def _crow_bigint_unsigned_to_python( def _crow_bigint_unsigned_to_python(
...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python( ...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python(
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
else: else:
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
...@@ -600,7 +600,7 @@ class CTaosInterface(object): ...@@ -600,7 +600,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
...@@ -608,7 +608,7 @@ class CTaosInterface(object): ...@@ -608,7 +608,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00') # return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
......
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
from .constants import FieldType from .constants import FieldType
import threading
# querySeqNum = 0 # querySeqNum = 0
...@@ -38,7 +37,6 @@ class TDengineCursor(object): ...@@ -38,7 +37,6 @@ class TDengineCursor(object):
self._block_iter = 0 self._block_iter = 0
self._affected_rows = 0 self._affected_rows = 0
self._logfile = "" self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None: if connection is not None:
self._connection = connection self._connection = connection
......
...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh: ...@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup( setuptools.setup(
name="taos", name="taos",
version="2.0.4", version="2.0.7",
author="Taosdata Inc.", author="Taosdata Inc.",
author_email="support@taosdata.com", author_email="support@taosdata.com",
description="TDengine python client package", description="TDengine python client package",
......
...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -22,10 +22,10 @@ def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
if num_of_rows > 0: if num_of_rows > 0:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
else: else:
return list(map(_timestamp_converter, ctypes.cast( return list(map(_timestamp_converter, ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)])) data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]))
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False): def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False): ...@@ -145,10 +145,10 @@ def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
""" """
if num_of_rows > 0: if num_of_rows > 0:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
else: else:
return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast( return [None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER(ctypes.c_long))[:abs(num_of_rows)]] data, ctypes.POINTER(ctypes.c_int64))[:abs(num_of_rows)]]
def _crow_bigint_unsigned_to_python( def _crow_bigint_unsigned_to_python(
...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python( ...@@ -162,13 +162,13 @@ def _crow_bigint_unsigned_to_python(
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
else: else:
return [ return [
None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast( None if ele == FieldType.C_BIGINT_UNSIGNED_NULL else ele for ele in ctypes.cast(
data, ctypes.POINTER( data, ctypes.POINTER(
ctypes.c_ulong))[ ctypes.c_uint64))[
:abs(num_of_rows)]] :abs(num_of_rows)]]
...@@ -600,7 +600,7 @@ class CTaosInterface(object): ...@@ -600,7 +600,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
...@@ -608,7 +608,7 @@ class CTaosInterface(object): ...@@ -608,7 +608,7 @@ class CTaosInterface(object):
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00') # return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0] # return ctypes.cast(data, ctypes.POINTER(ctypes.c_int64))[0]
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR): # elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00') # return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
......
from .cinterface import CTaosInterface from .cinterface import CTaosInterface
from .error import * from .error import *
from .constants import FieldType from .constants import FieldType
import threading
# querySeqNum = 0 # querySeqNum = 0
...@@ -38,7 +37,6 @@ class TDengineCursor(object): ...@@ -38,7 +37,6 @@ class TDengineCursor(object):
self._block_iter = 0 self._block_iter = 0
self._affected_rows = 0 self._affected_rows = 0
self._logfile = "" self._logfile = ""
self._threadId = threading.get_ident()
if connection is not None: if connection is not None:
self._connection = connection self._connection = connection
......
...@@ -252,7 +252,7 @@ int32_t* taosGetErrno(); ...@@ -252,7 +252,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_IN_EXEC TAOS_DEF_ERROR_CODE(0, 0x0709) //"Multiple retrieval of this query") #define TSDB_CODE_QRY_IN_EXEC TAOS_DEF_ERROR_CODE(0, 0x0709) //"Multiple retrieval of this query")
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query") #define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query")
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached") #define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached")
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistance in replica") #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
// grant // grant
......
...@@ -394,7 +394,7 @@ typedef struct SColIndex { ...@@ -394,7 +394,7 @@ typedef struct SColIndex {
int16_t colId; // column id int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
uint16_t flag; // denote if it is a tag or a normal column uint16_t flag; // denote if it is a tag or a normal column
char name[TSDB_COL_NAME_LEN]; char name[TSDB_COL_NAME_LEN]; // TODO remove it
} SColIndex; } SColIndex;
/* sql function msg, to describe the message to vnode about sql function /* sql function msg, to describe the message to vnode about sql function
...@@ -402,7 +402,10 @@ typedef struct SColIndex { ...@@ -402,7 +402,10 @@ typedef struct SColIndex {
typedef struct SSqlFuncMsg { typedef struct SSqlFuncMsg {
int16_t functionId; int16_t functionId;
int16_t numOfParams; int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo; SColIndex colInfo;
struct ArgElem { struct ArgElem {
......
...@@ -158,13 +158,18 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR ...@@ -158,13 +158,18 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR
typedef void *TsdbQueryHandleT; // Use void to hide implementation details typedef void *TsdbQueryHandleT; // Use void to hide implementation details
// query condition to build vnode iterator #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond { typedef struct STsdbQueryCond {
STimeWindow twindow; STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo *colList;
bool loadExternalRows; // load external rows or not bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} STsdbQueryCond; } STsdbQueryCond;
typedef struct SMemRef { typedef struct SMemRef {
...@@ -181,17 +186,31 @@ typedef struct SDataBlockInfo { ...@@ -181,17 +186,31 @@ typedef struct SDataBlockInfo {
int32_t tid; int32_t tid;
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SFileBlockInfo {
int32_t numOfRows;
} SFileBlockInfo;
typedef struct { typedef struct {
void *pTable; void *pTable;
TSKEY lastKey; TSKEY lastKey;
} STableKeyInfo; } STableKeyInfo;
typedef struct { typedef struct {
size_t numOfTables; uint32_t numOfTables;
SArray * pGroupList; SArray * pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo; } STableGroupInfo;
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
SArray *dataBlockInfos;
} STableBlockDist;
/** /**
* Get the data block iterator, starting from position according to the query condition * Get the data block iterator, starting from position according to the query condition
* *
...@@ -252,16 +271,7 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle); ...@@ -252,16 +271,7 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
* @param pQueryHandle * @param pQueryHandle
* @return * @return
*/ */
bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle); bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle);
/**
* move to next block if exists but not merge data in memtable
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT *pQueryHandle);
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type);
/** /**
* Get current data block information * Get current data block information
...@@ -306,7 +316,7 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const ...@@ -306,7 +316,7 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const
SColIndex *pColIndex, int32_t numOfCols); SColIndex *pColIndex, int32_t numOfCols);
/** /**
* destory the created table group list, which is generated by tag query * destroy the created table group list, which is generated by tag query
* @param pGroupList * @param pGroupList
*/ */
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
...@@ -336,6 +346,12 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STabl ...@@ -336,6 +346,12 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STabl
*/ */
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
/** /**
* get the statistics of repo usage * get the statistics of repo usage
* @param repo. point to the tsdbrepo * @param repo. point to the tsdbrepo
......
...@@ -191,52 +191,18 @@ ...@@ -191,52 +191,18 @@
#define TK_STATEMENT 172 #define TK_STATEMENT 172
#define TK_TRIGGER 173 #define TK_TRIGGER 173
#define TK_VIEW 174 #define TK_VIEW 174
#define TK_COUNT 175 #define TK_SEMI 175
#define TK_SUM 176 #define TK_NONE 176
#define TK_AVG 177 #define TK_PREV 177
#define TK_MIN 178 #define TK_LINEAR 178
#define TK_MAX 179 #define TK_IMPORT 179
#define TK_FIRST 180 #define TK_METRIC 180
#define TK_LAST 181 #define TK_TBNAME 181
#define TK_TOP 182 #define TK_JOIN 182
#define TK_BOTTOM 183 #define TK_METRICS 183
#define TK_STDDEV 184 #define TK_INSERT 184
#define TK_PERCENTILE 185 #define TK_INTO 185
#define TK_APERCENTILE 186 #define TK_VALUES 186
#define TK_LEASTSQUARES 187
#define TK_HISTOGRAM 188
#define TK_DIFF 189
#define TK_SPREAD 190
#define TK_TWA 191
#define TK_INTERP 192
#define TK_LAST_ROW 193
#define TK_RATE 194
#define TK_IRATE 195
#define TK_SUM_RATE 196
#define TK_SUM_IRATE 197
#define TK_AVG_RATE 198
#define TK_AVG_IRATE 199
#define TK_TBID 200
#define TK_SEMI 201
#define TK_NONE 202
#define TK_PREV 203
#define TK_LINEAR 204
#define TK_IMPORT 205
#define TK_METRIC 206
#define TK_TBNAME 207
#define TK_JOIN 208
#define TK_METRICS 209
#define TK_INSERT 210
#define TK_INTO 211
#define TK_VALUES 212
#define TK_SPACE 300 #define TK_SPACE 300
......
...@@ -174,7 +174,7 @@ bool isValidDataType(int32_t type); ...@@ -174,7 +174,7 @@ bool isValidDataType(int32_t type);
void setVardataNull(char* val, int32_t type); void setVardataNull(char* val, int32_t type);
void setNull(char *val, int32_t type, int32_t bytes); void setNull(char *val, int32_t type, int32_t bytes);
void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems); void setNullN(char *val, int32_t type, int32_t bytes, int32_t numOfElems);
void* getNullValue(int32_t type); void *getNullValue(int32_t type);
void assignVal(char *val, const char *src, int32_t len, int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type);
void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf); void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf);
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
"confirm_parameter_prompt": "no", "confirm_parameter_prompt": "no",
"insert_interval": 0, "insert_interval": 0,
"num_of_records_per_req": 100, "num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{ "databases": [{
"dbinfo": { "dbinfo": {
"name": "db", "name": "db",
...@@ -38,7 +39,9 @@ ...@@ -38,7 +39,9 @@
"auto_create_table": "no", "auto_create_table": "no",
"data_source": "rand", "data_source": "rand",
"insert_mode": "taosc", "insert_mode": "taosc",
"insert_rows": 100000, "childtable_limit": 33,
"childtable_offset": 33,
"insert_rows": 1000,
"multi_thread_write_one_tbl": "no", "multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0, "number_of_tbl_in_one_sql": 0,
"rows_per_tbl": 100, "rows_per_tbl": 100,
......
此差异已折叠。
...@@ -63,9 +63,11 @@ void httpJsonString(JsonBuf* buf, char* sVal, int32_t len); ...@@ -63,9 +63,11 @@ void httpJsonString(JsonBuf* buf, char* sVal, int32_t len);
void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len); void httpJsonOriginString(JsonBuf* buf, char* sVal, int32_t len);
void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen); void httpJsonStringForTransMean(JsonBuf* buf, char* SVal, int32_t maxLen);
void httpJsonInt64(JsonBuf* buf, int64_t num); void httpJsonInt64(JsonBuf* buf, int64_t num);
void httpJsonUInt64(JsonBuf* buf, uint64_t num);
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us); void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us);
void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us); void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, bool us);
void httpJsonInt(JsonBuf* buf, int32_t num); void httpJsonInt(JsonBuf* buf, int32_t num);
void httpJsonUInt(JsonBuf* buf, uint32_t num);
void httpJsonFloat(JsonBuf* buf, float num); void httpJsonFloat(JsonBuf* buf, float num);
void httpJsonDouble(JsonBuf* buf, double num); void httpJsonDouble(JsonBuf* buf, double num);
void httpJsonNull(JsonBuf* buf); void httpJsonNull(JsonBuf* buf);
......
...@@ -256,6 +256,12 @@ void httpJsonInt64(JsonBuf* buf, int64_t num) { ...@@ -256,6 +256,12 @@ void httpJsonInt64(JsonBuf* buf, int64_t num) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRId64, num); buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRId64, num);
} }
void httpJsonUInt64(JsonBuf* buf, uint64_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%" PRIu64, num);
}
void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) { void httpJsonTimestamp(JsonBuf* buf, int64_t t, bool us) {
char ts[35] = {0}; char ts[35] = {0};
struct tm* ptm; struct tm* ptm;
...@@ -303,6 +309,12 @@ void httpJsonInt(JsonBuf* buf, int32_t num) { ...@@ -303,6 +309,12 @@ void httpJsonInt(JsonBuf* buf, int32_t num) {
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%d", num); buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%d", num);
} }
void httpJsonUInt(JsonBuf* buf, uint32_t num) {
httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
buf->lst += snprintf(buf->lst, MAX_NUM_STR_SZ, "%u", num);
}
void httpJsonFloat(JsonBuf* buf, float num) { void httpJsonFloat(JsonBuf* buf, float num) {
httpJsonItemToken(buf); httpJsonItemToken(buf);
httpJsonTestBuf(buf, MAX_NUM_STR_SZ); httpJsonTestBuf(buf, MAX_NUM_STR_SZ);
......
...@@ -162,6 +162,18 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result, ...@@ -162,6 +162,18 @@ bool restBuildSqlJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
httpJsonInt64(jsonBuf, *((int64_t *)row[i])); httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
break; break;
case TSDB_DATA_TYPE_UTINYINT:
httpJsonUInt(jsonBuf, *((uint8_t *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
httpJsonUInt(jsonBuf, *((uint16_t *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
httpJsonUInt(jsonBuf, *((uint32_t *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
httpJsonUInt64(jsonBuf, *((uint64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i])); httpJsonFloat(jsonBuf, GET_FLOAT_VAL(row[i]));
break; break;
......
...@@ -26,6 +26,7 @@ extern "C" { ...@@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "trpc.h" #include "trpc.h"
#include "tvariant.h" #include "tvariant.h"
#include "tsdb.h"
#define TSDB_FUNC_INVALID_ID -1 #define TSDB_FUNC_INVALID_ID -1
#define TSDB_FUNC_COUNT 0 #define TSDB_FUNC_COUNT 0
...@@ -70,15 +71,17 @@ extern "C" { ...@@ -70,15 +71,17 @@ extern "C" {
#define TSDB_FUNC_AVG_IRATE 34 #define TSDB_FUNC_AVG_IRATE 34
#define TSDB_FUNC_TID_TAG 35 #define TSDB_FUNC_TID_TAG 35
#define TSDB_FUNC_HISTOGRAM 36 #define TSDB_FUNC_BLKINFO 36
#define TSDB_FUNC_HLL 37
#define TSDB_FUNC_MODE 38 #define TSDB_FUNC_HISTOGRAM 37
#define TSDB_FUNC_SAMPLE 39 #define TSDB_FUNC_HLL 38
#define TSDB_FUNC_CEIL 40 #define TSDB_FUNC_MODE 39
#define TSDB_FUNC_FLOOR 41 #define TSDB_FUNC_SAMPLE 40
#define TSDB_FUNC_ROUND 42 #define TSDB_FUNC_CEIL 41
#define TSDB_FUNC_MAVG 43 #define TSDB_FUNC_FLOOR 42
#define TSDB_FUNC_CSUM 44 #define TSDB_FUNC_ROUND 43
#define TSDB_FUNC_MAVG 44
#define TSDB_FUNC_CSUM 45
#define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_SO 0x1u // single output
...@@ -214,13 +217,14 @@ typedef struct SAggFunctionInfo { ...@@ -214,13 +217,14 @@ typedef struct SAggFunctionInfo {
void (*xFinalize)(SQLFunctionCtx *pCtx); void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx); void (*mergeFunc)(SQLFunctionCtx *pCtx);
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId); int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo; } SAggFunctionInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable); int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable);
int32_t isValidFunction(const char* name, int32_t len);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0) #define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0) #define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
...@@ -242,12 +246,16 @@ typedef struct STwaInfo { ...@@ -242,12 +246,16 @@ typedef struct STwaInfo {
STimeWindow win; STimeWindow win;
} STwaInfo; } STwaInfo;
struct SBufferWriter;
void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw);
void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist);
/* global sql function array */ /* global sql function array */
extern struct SAggFunctionInfo aAggs[]; extern struct SAggFunctionInfo aAggs[];
extern int32_t functionCompatList[]; // compatible check array list extern int32_t functionCompatList[]; // compatible check array list
bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const char *minval, const char *maxval); bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const char *maxval);
/** /**
* the numOfRes should be kept, since it may be used later * the numOfRes should be kept, since it may be used later
...@@ -258,14 +266,14 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha ...@@ -258,14 +266,14 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha
(_r)->initialized = false; \ (_r)->initialized = false; \
} while (0) } while (0)
static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) { static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, int32_t bufLen) {
pResInfo->initialized = true; // the this struct has been initialized flag pResInfo->initialized = true; // the this struct has been initialized flag
pResInfo->complete = false; pResInfo->complete = false;
pResInfo->hasResult = false; pResInfo->hasResult = false;
pResInfo->numOfRes = 0; pResInfo->numOfRes = 0;
memset(GET_ROWCELL_INTERBUF(pResInfo), 0, (size_t)bufLen); memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen);
} }
#ifdef __cplusplus #ifdef __cplusplus
......
此差异已折叠。
...@@ -24,6 +24,8 @@ extern "C" { ...@@ -24,6 +24,8 @@ extern "C" {
#include "qExtbuffer.h" #include "qExtbuffer.h"
#include "taosdef.h" #include "taosdef.h"
struct SSDataBlock;
typedef struct { typedef struct {
STColumn col; // column info STColumn col; // column info
int16_t functionId; // sql function id int16_t functionId; // sql function id
...@@ -78,7 +80,7 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo); ...@@ -78,7 +80,7 @@ void* taosDestroyFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosFillSetDataBlockFromFilePage(SFillInfo* pFillInfo, const tFilePage** pInput); void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput); void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* pInput);
...@@ -88,7 +90,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t ...@@ -88,7 +90,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity); int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -55,7 +55,6 @@ typedef struct SResultBufStatis { ...@@ -55,7 +55,6 @@ typedef struct SResultBufStatis {
} SResultBufStatis; } SResultBufStatis;
typedef struct SDiskbasedResultBuf { typedef struct SDiskbasedResultBuf {
int32_t numOfRowsPerPage;
int32_t numOfPages; int32_t numOfPages;
int64_t totalBufSize; int64_t totalBufSize;
int64_t fileSize; // disk file size int64_t fileSize; // disk file size
...@@ -77,7 +76,7 @@ typedef struct SDiskbasedResultBuf { ...@@ -77,7 +76,7 @@ typedef struct SDiskbasedResultBuf {
SResultBufStatis statis; SResultBufStatis statis;
} SDiskbasedResultBuf; } SDiskbasedResultBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (256L) // in bytes #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
/** /**
...@@ -89,8 +88,7 @@ typedef struct SDiskbasedResultBuf { ...@@ -89,8 +88,7 @@ typedef struct SDiskbasedResultBuf {
* @param handle * @param handle
* @return * @return
*/ */
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize, int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, const void* handle);
int32_t inMemBufSize, const void* handle);
/** /**
* *
...@@ -101,13 +99,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro ...@@ -101,13 +99,6 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
*/ */
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId); tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @return
*/
size_t getNumOfRowsPerPage(const SDiskbasedResultBuf* pResultBuf);
/** /**
* *
* @param pResultBuf * @param pResultBuf
......
...@@ -185,19 +185,32 @@ typedef struct SSqlInfo { ...@@ -185,19 +185,32 @@ typedef struct SSqlInfo {
}; };
} SSqlInfo; } SSqlInfo;
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
enum SQL_NODE_TYPE {
SQL_NODE_TABLE_COLUMN= 1,
SQL_NODE_SQLFUNCTION = 2,
SQL_NODE_VALUE = 3,
SQL_NODE_EXPR = 4,
};
typedef struct tSQLExpr { typedef struct tSQLExpr {
uint32_t nSQLOptr; // TK_FUNCTION: sql function, TK_LE: less than(binary expr) uint16_t type; // sql node type
uint32_t tokenId; // TK_FUNCTION: sql function, TK_LE: less than(binary expr)
// the full sql string of function(col, param), which is actually the raw // the whole string of the function(col, param), while the function name is kept in token
// field name, since the function name is kept in nSQLOptr already
SStrToken operand; SStrToken operand;
SStrToken colInfo; // field id uint32_t functionId; // function id
tVariant val; // value only for string, float, int
SStrToken colInfo; // table column info
tVariant value; // the use input value
SStrToken token; // original sql expr string SStrToken token; // original sql expr string
struct tSQLExpr *pLeft; // left child struct tSQLExpr *pLeft; // left child
struct tSQLExpr *pRight; // right child struct tSQLExpr *pRight; // right child
struct tSQLExprList *pParam; // function parameters struct tSQLExprList *pParam; // function parameters list
} tSQLExpr; } tSQLExpr;
// used in select clause. select <tSQLExprList> from xxx // used in select clause. select <tSQLExprList> from xxx
...@@ -294,16 +307,6 @@ void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type); ...@@ -294,16 +307,6 @@ void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type);
void *ParseAlloc(void *(*mallocProc)(size_t)); void *ParseAlloc(void *(*mallocProc)(size_t));
enum {
TSQL_NODE_TYPE_EXPR = 0x1,
TSQL_NODE_TYPE_ID = 0x2,
TSQL_NODE_TYPE_VALUE = 0x4,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
SSqlInfo qSQLParse(const char *str); SSqlInfo qSQLParse(const char *str);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -113,12 +113,10 @@ STSBuf* tsBufClone(STSBuf* pTSBuf); ...@@ -113,12 +113,10 @@ STSBuf* tsBufClone(STSBuf* pTSBuf);
STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id); STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id);
void tsBufFlush(STSBuf* pTSBuf); void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf); void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf); bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag); STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf); STSCursor tsBufGetCursor(STSBuf* pTSBuf);
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1) #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
int32_t getOutputInterResultBufSize(SQuery* pQuery); int32_t getOutputInterResultBufSize(SQuery* pQuery);
...@@ -44,22 +44,18 @@ void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); ...@@ -44,22 +44,18 @@ void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot]; return pResultRowInfo->pResult[slot];
} }
static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) {
tFilePage* page) { assert(rowOffset >= 0 && pQuery != NULL);
assert(pResult != NULL && pRuntimeEnv != NULL);
SQuery *pQuery = pRuntimeEnv->pQuery; int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
int32_t realRowId = (int32_t)(pResult->rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pRuntimeEnv->topBotQuery, pRuntimeEnv->stableQuery));
return ((char *)page->data) + pRuntimeEnv->offset[columnIndex] * pRuntimeEnv->numOfRowsPerPage +
pQuery->pExpr1[columnIndex].bytes * realRowId;
} }
bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
...@@ -74,8 +70,6 @@ void* destroyResultRowPool(SResultRowPool* p); ...@@ -74,8 +70,6 @@ void* destroyResultRowPool(SResultRowPool* p);
int32_t getNumOfAllocatedResultRows(SResultRowPool* p); int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
int32_t getNumOfUsedResultRows(SResultRowPool* p); int32_t getNumOfUsedResultRows(SResultRowPool* p);
bool isPointInterpoQuery(SQuery *pQuery);
typedef struct { typedef struct {
SArray* pResult; // SArray<SResPair> SArray* pResult; // SArray<SResPair>
int32_t colId; int32_t colId;
...@@ -85,12 +79,14 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen); ...@@ -85,12 +79,14 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
SArray* interResFromBinary(const char* data, int32_t len); SArray* interResFromBinary(const char* data, int32_t len);
void freeInterResult(void* param); void freeInterResult(void* param);
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset); void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
bool hasRemainData(SGroupResInfo* pGroupResInfo); bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo); bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQInfo *pQInfo); int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset);
#endif // TDENGINE_QUERYUTIL_H #endif // TDENGINE_QUERYUTIL_H
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册