提交 628ac58b 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into fix/TD-18287

......@@ -60,7 +60,7 @@ def check_docs() {
def file_changed = sh (
script: '''
cd ${WKC}
git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/"
git --no-pager diff --name-only FETCH_HEAD `git merge-base FETCH_HEAD ${CHANGE_TARGET}`|grep -v "^docs/en/"|grep -v "^docs/zh/" || :
''',
returnStdout: true
).trim()
......
"""
Python asynchronous subscribe demo.
run on Linux system with: python3 subscribe_demo.py
"""
from ctypes import c_void_p
import taos
import time
def query_callback(p_sub, p_result, p_param, code):
"""
:param p_sub: pointer returned by native API -- taos_subscribe
:param p_result: pointer to native TAOS_RES
:param p_param: None
:param code: error code
:return: None
"""
print("in callback")
result = taos.TaosResult(c_void_p(p_result))
# raise exception if error occur
result.check_error(code)
for row in result.rows_iter():
print(row)
print(f"{result.row_count} rows consumed.")
if __name__ == '__main__':
conn = taos.connect()
restart = True
topic = "topic-meter-current-bg"
sql = "select * from power.meters where current > 10" # Error sql
interval = 2000 # consumption interval in microseconds.
_ = conn.subscribe(restart, topic, sql, interval, query_callback)
# Note: we received the return value as _ above, to avoid the TaosSubscription object to be deleted by gc.
while True:
time.sleep(10) # use Ctrl + C to interrupt
......@@ -115,6 +115,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo);
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId);
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo);
size_t getTableTagsBufLen(const SNodeList* pGroups);
SArray* createSortInfo(SNodeList* pNodeList);
......
......@@ -428,10 +428,6 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
// int64_t st = taosGetTimestampUs();
for (int32_t i = 0; i < rows; i++) {
int64_t* uid = taosArrayGet(uidList, i);
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
if (suid != 0) {
ASSERT(tag);
}
for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
......@@ -443,6 +439,8 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2);
#endif
}else{
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
ASSERT(tag);
STagVal tagVal = {0};
tagVal.cid = pColInfo->info.colId;
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
......@@ -503,6 +501,241 @@ end:
return output.columnData;
}
static void releaseColInfoData(void* pCol) {
if(pCol){
SColumnInfoData* col = (SColumnInfoData*) pCol;
colDataDestroy(col);
taosMemoryFree(col);
}
}
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo){
int32_t code = TSDB_CODE_SUCCESS;
SArray *pBlockList = NULL;
SSDataBlock *pResBlock = NULL;
SHashObj *tags = NULL;
SArray *uidList = NULL;
void *keyBuf = NULL;
SArray *groupData = NULL;
int32_t rows = taosArrayGetSize(pTableListInfo->pTableList);
if(rows == 0){
return TDB_CODE_SUCCESS;
}
tagFilterAssist ctx = {0};
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
if(ctx.colHash == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
ctx.index = 0;
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if(ctx.cInfoList == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
SNode* pNode = NULL;
FOREACH(pNode, group) {
nodesRewriteExprPostOrder(&pNode, getColumn, (void *)&ctx);
REPLACE_NODE(pNode);
}
pResBlock = createDataBlock();
if (pResBlock == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for (int32_t i = 0; i < taosArrayGetSize(ctx.cInfoList); ++i) {
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = *(SColumnInfo*)taosArrayGet(ctx.cInfoList, i);
blockDataAppendColInfo(pResBlock, &colInfo);
}
uidList = taosArrayInit(rows, sizeof(uint64_t));
for (int32_t i = 0; i < rows; ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
taosArrayPush(uidList, &pkeyInfo->uid);
}
// int64_t stt = taosGetTimestampUs();
tags = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
code = metaGetTableTags(metaHandle, pTableListInfo->suid, uidList, tags);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
// int64_t stt1 = taosGetTimestampUs();
// qDebug("generate tag meta rows:%d, cost:%ld us", rows, stt1-stt);
code = blockDataEnsureCapacity(pResBlock, rows);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
// int64_t st = taosGetTimestampUs();
for (int32_t i = 0; i < rows; i++) {
int64_t* uid = taosArrayGet(uidList, i);
for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
if(pColInfo->info.colId == -1){ // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
metaGetTableNameByUid(metaHandle, *uid, str);
colDataAppend(pColInfo, i, str, false);
#if TAG_FILTER_DEBUG
qDebug("tagfilter uid:%ld, tbname:%s", *uid, str+2);
#endif
}else{
void* tag = taosHashGet(tags, uid, sizeof(int64_t));
ASSERT(tag);
STagVal tagVal = {0};
tagVal.cid = pColInfo->info.colId;
const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal);
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)){
colDataAppend(pColInfo, i, p, true);
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
colDataAppend(pColInfo, i, p, false);
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
char *tmp = taosMemoryCalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1, 1);
varDataSetLen(tmp, tagVal.nData);
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
colDataAppend(pColInfo, i, tmp, false);
#if TAG_FILTER_DEBUG
qDebug("tagfilter varch:%s", tmp+2);
#endif
taosMemoryFree(tmp);
} else {
colDataAppend(pColInfo, i, (const char*)&tagVal.i64, false);
#if TAG_FILTER_DEBUG
if(pColInfo->info.type == TSDB_DATA_TYPE_INT){
qDebug("tagfilter int:%d", *(int*)(&tagVal.i64));
}else if(pColInfo->info.type == TSDB_DATA_TYPE_DOUBLE){
qDebug("tagfilter double:%f", *(double *)(&tagVal.i64));
}
#endif
}
}
}
}
pResBlock->info.rows = rows;
// int64_t st1 = taosGetTimestampUs();
// qDebug("generate tag block rows:%d, cost:%ld us", rows, st1-st);
pBlockList = taosArrayInit(2, POINTER_BYTES);
taosArrayPush(pBlockList, &pResBlock);
groupData = taosArrayInit(2, POINTER_BYTES);
FOREACH(pNode, group) {
SScalarParam output = {0};
switch (nodeType(pNode)) {
case QUERY_NODE_VALUE:
break;
case QUERY_NODE_COLUMN:
case QUERY_NODE_OPERATOR:
case QUERY_NODE_FUNCTION:{
SExprNode* expNode = (SExprNode*)pNode;
code = createResultData(&expNode->resType, rows, &output);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
break;
}
default:
code = TSDB_CODE_OPS_NOT_SUPPORT;
goto end;
}
if(nodeType(pNode) == QUERY_NODE_COLUMN){
SColumnNode* pSColumnNode = (SColumnNode*)pNode;
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId);
code = colDataAssign(output.columnData, pColInfo, rows, NULL);
}else if(nodeType(pNode) == QUERY_NODE_VALUE){
continue;
}else{
code = scalarCalculate(pNode, pBlockList, &output);
}
if(code != TSDB_CODE_SUCCESS){
releaseColInfoData(output.columnData);
goto end;
}
taosArrayPush(groupData, &output.columnData);
}
int32_t keyLen = 0;
SNode* node;
FOREACH(node, group) {
SExprNode* pExpr = (SExprNode*)node;
keyLen += pExpr->resType.bytes;
}
int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
keyLen += nullFlagSize;
keyBuf = taosMemoryCalloc(1, keyLen);
if (keyBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
for(int i = 0; i < rows; i++){
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
char* isNull = (char*)keyBuf;
char* pStart = (char*)keyBuf + sizeof(int8_t) * LIST_LENGTH(group);
for(int j = 0; j < taosArrayGetSize(groupData); j++){
SColumnInfoData* pValue = (SColumnInfoData*)taosArrayGetP(groupData, j);
if (colDataIsNull_s(pValue, i)) {
isNull[j] = 1;
} else {
isNull[j] = 0;
char* data = colDataGetData(pValue, i);
if (pValue->info.type == TSDB_DATA_TYPE_JSON) {
if (tTagIsJson(data)) {
code = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
goto end;
}
if(tTagIsJsonNull(data)){
isNull[j] = 1;
continue;
}
int32_t len = getJsonValueLen(data);
memcpy(pStart, data, len);
pStart += len;
} else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
memcpy(pStart, data, varDataTLen(data));
pStart += varDataTLen(data);
} else {
memcpy(pStart, data, pValue->info.bytes);
pStart += pValue->info.bytes;
}
}
}
int32_t len = (int32_t)(pStart - (char*)keyBuf);
info->groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
}
// int64_t st2 = taosGetTimestampUs();
// qDebug("calculate tag block rows:%d, cost:%ld us", rows, st2-st1);
end:
taosMemoryFreeClear(keyBuf);
taosHashCleanup(tags);
taosHashCleanup(ctx.colHash);
taosArrayDestroy(ctx.cInfoList);
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
taosArrayDestroy(uidList);
taosArrayDestroyP(groupData, releaseColInfoData);
return code;
}
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
......
......@@ -3894,9 +3894,9 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
taosArrayClear(pTableListInfo->pGroupList);
SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
......@@ -3974,48 +3974,26 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
if (pTableListInfo->map == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t keyLen = 0;
void* keyBuf = NULL;
SNode* node;
FOREACH(node, group) {
SExprNode* pExpr = (SExprNode*)node;
keyLen += pExpr->resType.bytes;
}
int32_t nullFlagSize = sizeof(int8_t) * LIST_LENGTH(group);
keyLen += nullFlagSize;
keyBuf = taosMemoryCalloc(1, keyLen);
if (keyBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
bool assignUid = groupbyTbname(group);
int32_t groupNum = 0;
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
if (assignUid) {
if(assignUid){
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid;
} else {
int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
}
}else{
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
groupNum++;
}
taosMemoryFree(keyBuf);
if (pTableListInfo->needSortTableByGroupId) {
return sortTableGroup(pTableListInfo, groupNum);
return sortTableGroup(pTableListInfo);
}
return TDB_CODE_SUCCESS;
......
......@@ -95,6 +95,7 @@ typedef struct {
TAOS* conn;
TdThread pid;
tsem_t cancelSem;
bool exit;
#ifdef WEBSOCKET
WS_TAOS* ws_conn;
bool stop_query;
......
......@@ -948,6 +948,10 @@ void shellCleanup(void *arg) { taosResetTerminalMode(); }
void *shellCancelHandler(void *arg) {
setThreadName("shellCancelHandler");
while (1) {
if (shell.exit == true) {
break;
}
if (tsem_wait(&shell.cancelSem) != 0) {
taosMsleep(10);
continue;
......@@ -961,7 +965,7 @@ void *shellCancelHandler(void *arg) {
taos_kill_query(shell.conn);
#ifdef WEBSOCKET
}
#endif
#endif
#ifdef WINDOWS
printf("\n%s", shell.info.promptHeader);
#endif
......@@ -1009,7 +1013,7 @@ int32_t shellExecute() {
if (shell.args.restful || shell.args.cloud) {
if (shell_conn_ws_server(1)) {
return -1;
}
}
} else {
#endif
if (shell.args.auth == NULL) {
......@@ -1043,7 +1047,7 @@ int32_t shellExecute() {
if (shell.args.restful || shell.args.cloud) {
ws_close(shell.ws_conn);
} else {
#endif
#endif
taos_close(shell.conn);
#ifdef WEBSOCKET
}
......@@ -1079,7 +1083,12 @@ int32_t shellExecute() {
taosThreadCreate(&shell.pid, NULL, shellThreadLoop, NULL);
taosThreadJoin(shell.pid, NULL);
taosThreadClear(&shell.pid);
if (shell.exit) {
tsem_post(&shell.cancelSem);
break;
}
}
taosThreadJoin(spid, NULL);
shellCleanupHistory();
return 0;
......
......@@ -19,6 +19,7 @@
SShellObj shell = {0};
int main(int argc, char *argv[]) {
shell.exit = false;
#ifdef WEBSOCKET
shell.args.timeout = 10;
shell.args.cloud = true;
......@@ -46,7 +47,7 @@ int main(int argc, char *argv[]) {
shellPrintHelp();
return 0;
}
#ifdef WEBSOCKET
#ifdef WEBSOCKET
shellCheckConnectMode();
#endif
taos_init();
......
......@@ -157,6 +157,6 @@ void shellExit() {
taos_close(shell.conn);
shell.conn = NULL;
}
shell.exit = true;
taos_cleanup();
exit(EXIT_FAILURE);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册