提交 b2017d9f 编写于 作者: S Shuaiqiang Chang

Merge branch 'feature/query' of github.com:taosdata/TDengine into feature/query

* 'feature/query' of github.com:taosdata/TDengine:
  [TD-314] modify python connector to adapt C API
  [td-225] add query cost statistics.
  [TD-314] modify python connector to adapt the taos API change
......@@ -100,7 +100,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
char *sql = malloc(sqlSize);
if (sql == NULL) {
tscError("%p failed to allocate memory to sent slow to dnode", pSql);
tscError("%p failed to allocate memory to sent slow query to dnode", pSql);
return;
}
......@@ -112,8 +112,8 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
} else {
sqlLen += len;
}
strcpy(sql + sqlLen, "')");
taosTmrStart(tscSaveSlowQueryFp, 200, sql, tscTmr);
}
......
......@@ -142,12 +142,13 @@ class CTaosInterface(object):
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p
#libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
def __init__(self, config=None):
'''
......@@ -249,12 +250,12 @@ class CTaosInterface(object):
raise AttributeError("sql is expected as a string")
# finally:
# CTaosInterface.libtaos.close(connection)
@staticmethod
def affectedRows(connection):
def affectedRows(result):
"""The affected rows after runing query
"""
return CTaosInterface.libtaos.taos_affected_rows(connection)
return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod
def subscribe(connection, restart, topic, sql, interval):
......@@ -292,18 +293,17 @@ class CTaosInterface(object):
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod
def useResult(connection):
def useResult(result):
'''Use result after calling self.query
'''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = []
pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)):
for i in range(CTaosInterface.fieldsCount(result)):
fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)})
return result, fields
return fields
@staticmethod
def fetchBlock(result, fields):
......@@ -337,8 +337,8 @@ class CTaosInterface(object):
result.value = None
@staticmethod
def fieldsCount(connection):
return CTaosInterface.libtaos.taos_field_count(connection)
def fieldsCount(result):
return CTaosInterface.libtaos.taos_field_count(result)
@staticmethod
def fetchFields(result):
......@@ -386,29 +386,30 @@ class CTaosInterface(object):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod
def errno(connection):
def errno(result):
"""Return the error number.
"""
return CTaosInterface.libtaos.taos_errno(connection)
return CTaosInterface.libtaos.taos_errno(result)
@staticmethod
def errStr(connection):
def errStr(result):
"""Return the error styring
"""
return CTaosInterface.libtaos.taos_errstr(connection)
return CTaosInterface.libtaos.taos_errstr(result)
if __name__ == '__main__':
cinter = CTaosInterface()
conn = cinter.connect()
result = cinter.query(conn, 'show databases')
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
print('Query Affected rows: {}'.format(cinter.affectedRows(result)))
result, des = CTaosInterface.useResult(conn)
fields = CTaosInterface.useResult(result)
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
data, num_of_rows = CTaosInterface.fetchBlock(result, fields)
print(data)
cinter.freeresult(result)
cinter.close(conn)
\ No newline at end of file
......@@ -78,9 +78,7 @@ class TDengineConnection(object):
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
result = self._chandle.useResult(self._conn)[0]
if result:
self._chandle.freeResult(result)
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
......
......@@ -28,6 +28,6 @@ class FieldType(object):
C_FLOAT_NULL = float('nan')
C_DOUBLE_NULL = float('nan')
C_BINARY_NULL = bytearray([int('0xff', 16)])
# Time precision definition
# Timestamp precision definition
C_TIMESTAMP_MILLI = 0
C_TIMESTAMP_MICRO = 1
......@@ -116,25 +116,29 @@ class TDengineCursor(object):
if params is not None:
pass
res = CTaosInterface.query(self._connection._conn, stmt)
# global querySeqNum
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
self._result = CTaosInterface.query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile):
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
if res == 0:
if CTaosInterface.fieldsCount(self._connection._conn) == 0:
if self._result is not None:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._connection._conn)
return CTaosInterface.affectedRows(self._connection._conn)
self._result )
return CTaosInterface.affectedRows(self._result )
else:
self._result, self._fields = CTaosInterface.useResult(
self._connection._conn)
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._connection._conn))
self._result ))
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......
......@@ -142,12 +142,13 @@ class CTaosInterface(object):
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p
#libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
def __init__(self, config=None):
'''
......@@ -249,12 +250,12 @@ class CTaosInterface(object):
raise AttributeError("sql is expected as a string")
# finally:
# CTaosInterface.libtaos.close(connection)
@staticmethod
def affectedRows(connection):
def affectedRows(result):
"""The affected rows after runing query
"""
return CTaosInterface.libtaos.taos_affected_rows(connection)
return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod
def subscribe(connection, restart, topic, sql, interval):
......@@ -292,18 +293,17 @@ class CTaosInterface(object):
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod
def useResult(connection):
def useResult(result):
'''Use result after calling self.query
'''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = []
pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)):
for i in range(CTaosInterface.fieldsCount(result)):
fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)})
return result, fields
return fields
@staticmethod
def fetchBlock(result, fields):
......@@ -337,8 +337,8 @@ class CTaosInterface(object):
result.value = None
@staticmethod
def fieldsCount(connection):
return CTaosInterface.libtaos.taos_field_count(connection)
def fieldsCount(result):
return CTaosInterface.libtaos.taos_field_count(result)
@staticmethod
def fetchFields(result):
......@@ -386,29 +386,30 @@ class CTaosInterface(object):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod
def errno(connection):
def errno(result):
"""Return the error number.
"""
return CTaosInterface.libtaos.taos_errno(connection)
return CTaosInterface.libtaos.taos_errno(result)
@staticmethod
def errStr(connection):
def errStr(result):
"""Return the error styring
"""
return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8')
return CTaosInterface.libtaos.taos_errstr(result).decode('utf-8')
if __name__ == '__main__':
cinter = CTaosInterface()
conn = cinter.connect()
result = cinter.query(conn, 'show databases')
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
print('Query Affected rows: {}'.format(cinter.affectedRows(result)))
result, des = CTaosInterface.useResult(conn)
fields = CTaosInterface.useResult(result)
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
data, num_of_rows = CTaosInterface.fetchBlock(result, fields)
print(data)
cinter.freeresult(result)
cinter.close(conn)
\ No newline at end of file
......@@ -78,9 +78,7 @@ class TDengineConnection(object):
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
result = self._chandle.useResult(self._conn)[0]
if result:
self._chandle.freeResult(result)
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
......
......@@ -122,26 +122,26 @@ class TDengineCursor(object):
# querySeqNum += 1
# localSeqNum = querySeqNum # avoid raice condition
# print(" >> Exec Query ({}): {}".format(localSeqNum, str(stmt)))
res = CTaosInterface.query(self._connection._conn, stmt)
self._result = CTaosInterface.query(self._connection._conn, stmt)
# print(" << Query ({}) Exec Done".format(localSeqNum))
if (self._logfile):
with open(self._logfile, "a") as logfile:
logfile.write("%s;\n" % operation)
if res == 0:
if CTaosInterface.fieldsCount(self._connection._conn) == 0:
if self._result is not None:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(
self._connection._conn)
return CTaosInterface.affectedRows(self._connection._conn)
self._result )
return CTaosInterface.affectedRows(self._result )
else:
self._result, self._fields = CTaosInterface.useResult(
self._connection._conn)
self._fields = CTaosInterface.useResult(
self._result)
return self._handle_result()
else:
raise ProgrammingError(
CTaosInterface.errStr(
self._connection._conn))
self._result ))
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......
......@@ -142,12 +142,13 @@ class CTaosInterface(object):
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p
#libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
def __init__(self, config=None):
'''
......@@ -251,10 +252,10 @@ class CTaosInterface(object):
# CTaosInterface.libtaos.close(connection)
@staticmethod
def affectedRows(connection):
def affectedRows(result):
"""The affected rows after runing query
"""
return CTaosInterface.libtaos.taos_affected_rows(connection)
return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod
def subscribe(connection, restart, topic, sql, interval):
......@@ -292,18 +293,17 @@ class CTaosInterface(object):
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod
def useResult(connection):
def useResult(result):
'''Use result after calling self.query
'''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = []
pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)):
for i in range(CTaosInterface.fieldsCount(result)):
fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)})
return result, fields
return fields
@staticmethod
def fetchBlock(result, fields):
......@@ -337,8 +337,8 @@ class CTaosInterface(object):
result.value = None
@staticmethod
def fieldsCount(connection):
return CTaosInterface.libtaos.taos_field_count(connection)
def fieldsCount(result):
return CTaosInterface.libtaos.taos_field_count(result)
@staticmethod
def fetchFields(result):
......@@ -386,29 +386,30 @@ class CTaosInterface(object):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod
def errno(connection):
def errno(result):
"""Return the error number.
"""
return CTaosInterface.libtaos.taos_errno(connection)
return CTaosInterface.libtaos.taos_errno(result)
@staticmethod
def errStr(connection):
def errStr(result):
"""Return the error styring
"""
return CTaosInterface.libtaos.taos_errstr(connection)
return CTaosInterface.libtaos.taos_errstr(result)
if __name__ == '__main__':
cinter = CTaosInterface()
conn = cinter.connect()
result = cinter.query(conn, 'show databases')
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
print('Query Affected rows: {}'.format(cinter.affectedRows(result)))
result, des = CTaosInterface.useResult(conn)
fields = CTaosInterface.useResult(result)
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
data, num_of_rows = CTaosInterface.fetchBlock(result, fields)
print(data)
cinter.freeresult(result)
cinter.close(conn)
\ No newline at end of file
......@@ -79,9 +79,7 @@ class TDengineConnection(object):
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
result = self._chandle.useResult(self._conn)[0]
if result:
self._chandle.freeResult(result)
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
......
......@@ -109,16 +109,16 @@ class TDengineCursor(object):
if params is not None:
pass
res = CTaosInterface.query(self._connection._conn, stmt)
if res == 0:
if CTaosInterface.fieldsCount(self._connection._conn) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._connection._conn)
return CTaosInterface.affectedRows(self._connection._conn)
self._result = CTaosInterface.query(self._connection._conn, stmt)
if self._result is not None:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._result)
return CTaosInterface.affectedRows(self._result )
else:
self._result, self._fields = CTaosInterface.useResult(self._connection._conn)
self._fields = CTaosInterface.useResult(self._result)
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._connection._conn))
raise ProgrammingError(CTaosInterface.errStr(self._result))
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......
......@@ -142,12 +142,13 @@ class CTaosInterface(object):
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
libtaos.taos_init.restype = None
libtaos.taos_connect.restype = ctypes.c_void_p
libtaos.taos_use_result.restype = ctypes.c_void_p
#libtaos.taos_use_result.restype = ctypes.c_void_p
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
libtaos.taos_errstr.restype = ctypes.c_char_p
libtaos.taos_subscribe.restype = ctypes.c_void_p
libtaos.taos_consume.restype = ctypes.c_void_p
libtaos.taos_fetch_lengths.restype = ctypes.c_void_p
libtaos.taos_free_result.restype = None
def __init__(self, config=None):
'''
......@@ -251,10 +252,10 @@ class CTaosInterface(object):
# CTaosInterface.libtaos.close(connection)
@staticmethod
def affectedRows(connection):
def affectedRows(result):
"""The affected rows after runing query
"""
return CTaosInterface.libtaos.taos_affected_rows(connection)
return CTaosInterface.libtaos.taos_affected_rows(result)
@staticmethod
def subscribe(connection, restart, topic, sql, interval):
......@@ -292,18 +293,17 @@ class CTaosInterface(object):
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
@staticmethod
def useResult(connection):
def useResult(result):
'''Use result after calling self.query
'''
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
fields = []
pfields = CTaosInterface.fetchFields(result)
for i in range(CTaosInterface.fieldsCount(connection)):
for i in range(CTaosInterface.fieldsCount(result)):
fields.append({'name': pfields[i].name.decode('utf-8'),
'bytes': pfields[i].bytes,
'type': ord(pfields[i].type)})
return result, fields
return fields
@staticmethod
def fetchBlock(result, fields):
......@@ -337,8 +337,8 @@ class CTaosInterface(object):
result.value = None
@staticmethod
def fieldsCount(connection):
return CTaosInterface.libtaos.taos_field_count(connection)
def fieldsCount(result):
return CTaosInterface.libtaos.taos_field_count(result)
@staticmethod
def fetchFields(result):
......@@ -386,29 +386,30 @@ class CTaosInterface(object):
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
@staticmethod
def errno(connection):
def errno(result):
"""Return the error number.
"""
return CTaosInterface.libtaos.taos_errno(connection)
return CTaosInterface.libtaos.taos_errno(result)
@staticmethod
def errStr(connection):
def errStr(result):
"""Return the error styring
"""
return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8')
return CTaosInterface.libtaos.taos_errstr(result).decode('utf-8')
if __name__ == '__main__':
cinter = CTaosInterface()
conn = cinter.connect()
result = cinter.query(conn, 'show databases')
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
print('Query Affected rows: {}'.format(cinter.affectedRows(result)))
result, des = CTaosInterface.useResult(conn)
fields = CTaosInterface.useResult(result)
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
data, num_of_rows = CTaosInterface.fetchBlock(result, fields)
print(data)
cinter.freeresult(result)
cinter.close(conn)
\ No newline at end of file
......@@ -79,9 +79,7 @@ class TDengineConnection(object):
def clear_result_set(self):
"""Clear unused result set on this connection.
"""
result = self._chandle.useResult(self._conn)[0]
if result:
self._chandle.freeResult(result)
pass
if __name__ == "__main__":
conn = TDengineConnection(host='192.168.1.107')
......
......@@ -109,16 +109,16 @@ class TDengineCursor(object):
if params is not None:
pass
res = CTaosInterface.query(self._connection._conn, stmt)
if res == 0:
if CTaosInterface.fieldsCount(self._connection._conn) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._connection._conn)
return CTaosInterface.affectedRows(self._connection._conn)
self._result = CTaosInterface.query(self._connection._conn, stmt)
if self._result is not None:
if CTaosInterface.fieldsCount(self._result) == 0:
self._affected_rows += CTaosInterface.affectedRows(self._result )
return CTaosInterface.affectedRows(self._result )
else:
self._result, self._fields = CTaosInterface.useResult(self._connection._conn)
self._fields = CTaosInterface.useResult(self._result )
return self._handle_result()
else:
raise ProgrammingError(CTaosInterface.errStr(self._connection._conn))
raise ProgrammingError(CTaosInterface.errStr(self._result ))
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings found in the sequence seq_of_parameters.
......
......@@ -544,7 +544,7 @@ int taosDumpOut(SDumpArguments *arguments) {
taosDumpCreateDbClause(dbInfos[0], arguments->with_property, fp);
sprintf(command, "use %s", dbInfos[0]->name);
if (taos_query(taos, command) != 0) {
if (taos_query(taos, command) == NULL ) {
fprintf(stderr, "invalid database %s\n", dbInfos[0]->name);
goto _exit_failure;
}
......@@ -1174,7 +1174,7 @@ int taosDumpIn(SDumpArguments *arguments) {
tcommand = command;
}
taosReplaceCtrlChar(tcommand);
if (taos_query(taos, tcommand) != 0)
if (taos_query(taos, tcommand) == NULL)
fprintf(stderr, "linenu: %" PRId64 " failed to run command %s reason:%s \ncontinue...\n", linenu, command,
taos_errstr(taos));
......@@ -1250,7 +1250,7 @@ int taosDumpIn(SDumpArguments *arguments) {
tcommand = command;
}
taosReplaceCtrlChar(lcommand);
if (taos_query(taos, tcommand) != 0)
if (taos_query(taos, tcommand) == NULL)
fprintf(stderr, "linenu:%" PRId64 " failed to run command %s reason:%s \ncontinue...\n", linenu, command,
taos_errstr(taos));
}
......
......@@ -116,11 +116,14 @@ typedef struct SQueryCostInfo {
uint64_t loadDataInCacheSize;
uint64_t loadDataTime;
uint64_t dataInRows;
uint64_t checkRows;
uint32_t dataBlocks;
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t discardBlocks;
uint64_t elapsedTime;
uint64_t computTime;
} SQueryCostInfo;
typedef struct SGroupItem {
......@@ -168,7 +171,7 @@ typedef struct SQueryRuntimeEnv {
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostInfo summary;
SQueryCostInfo summary;
bool stableQuery; // super table query or not
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
......@@ -177,8 +180,6 @@ typedef struct SQueryRuntimeEnv {
typedef struct SQInfo {
void* signature;
TSKEY startTime;
TSKEY elapsedTime;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
sem_t dataReady;
......
......@@ -1183,6 +1183,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
STableQueryInfo* pTableQInfo = pQuery->current;
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
......@@ -1190,10 +1191,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
}
// update the lastkey of current table
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied
int32_t numOfRes = 0;
......@@ -2013,7 +2014,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
if (*pStatis == NULL) { // data block statistics does not exist, load data block
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pRuntimeEnv->summary.checkRows += pBlockInfo->rows;
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
}
} else {
assert(r == BLK_DATA_ALL_NEEDED);
......@@ -2032,7 +2033,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
// return DISK_DATA_DISCARDED;
}
pRuntimeEnv->summary.checkRows += pBlockInfo->rows;
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
}
......@@ -2149,7 +2150,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
pRuntimeEnv->summary.dataBlocks += 1;
pRuntimeEnv->summary.totalBlocks += 1;
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
}
......@@ -2185,12 +2186,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
SDataStatis *pStatis = NULL;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
pRuntimeEnv->summary.dataInRows += blockInfo.rows;
pRuntimeEnv->summary.totalRows += blockInfo.rows;
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey);
......@@ -3247,7 +3246,7 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
free(pTableQueryInfo);
}
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
void setCurrentQueryTable(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableQueryInfo;
......@@ -3316,7 +3315,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *pTableQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
assert(pTableQueryInfo->lastKey >= 0);
assert(pTableQueryInfo->lastKey >= TSKEY_INITIAL_VAL);
setTagVal(pRuntimeEnv, pTableId, pQInfo->tsdb);
......@@ -3528,10 +3527,11 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryIn
}
}
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo,
SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray *pDataBlock,
__block_search_fn_t searchFn) {
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
SArray *pDataBlock, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
......@@ -3685,8 +3685,9 @@ static void queryCostStatis(SQInfo *pQInfo) {
// pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
// pSummary->skippedFileBlocks, pSummary->totalGenData);
qTrace("QInfo:%p cost: check blocks:%d, statis:%d, rows:%"PRId64", check rows:%"PRId64, pQInfo, pSummary->dataBlocks,
pSummary->loadBlockStatis, pSummary->dataInRows, pSummary->checkRows);
qTrace("QInfo:%p :cost summary: elpased time:%"PRId64" us, total blocks:%d, use block statis:%d, use block data:%d, "
"total rows:%"PRId64 ", check rows:%"PRId64, pQInfo, pSummary->elapsedTime, pSummary->totalBlocks,
pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
// qTrace("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
//
......@@ -4082,12 +4083,13 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryCostInfo* summary = &pRuntimeEnv->summary;
int64_t st = taosGetTimestampMs();
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (isQueryKilled(pQInfo)) {
break;
}
......@@ -4119,11 +4121,9 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
}
assert(pTableQueryInfo != NULL);
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
printf("table:%d, groupIndex:%d, rows:%d\n", pTableQueryInfo->id.tid, pTableQueryInfo->groupIndex, blockInfo.tid);
setCurrentQueryTable(pRuntimeEnv, pTableQueryInfo);
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
if (!isIntervalQuery(pQuery)) {
......@@ -4132,15 +4132,14 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
} else { // interval query
TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey);
int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
return taosGetTimestampMs() - st;
}
/*int32_t ret = */setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
}
stableApplyFunctionsOnBlock(pRuntimeEnv, pTableQueryInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
summary->totalRows += blockInfo.rows;
stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey);
}
int64_t et = taosGetTimestampMs();
......@@ -4502,10 +4501,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
}
if (pQuery->rec.rows == 0) {
// queryCostStatis(pSupporter);
}
qTrace("QInfo:%p current:%lld, total:%lld", pQInfo, pQuery->rec.rows, pQuery->rec.total);
return;
}
......@@ -4810,7 +4805,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
}
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pQInfo->groupInfo.numOfTables == 1);
/* check if query is killed or not */
......@@ -4840,13 +4835,10 @@ static void stableQueryImpl(SQInfo *pQInfo) {
}
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
// taosFillSetStartInfo(&pQInfo->runtimeEnv.pFillInfo, pQuery->size, pQInfo->query.fillType);
pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st);
if (pQuery->rec.rows == 0) {
qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables,
pQuery->rec.total);
// queryCostStatis(pSupporter);
qTrace("QInfo:%p over, %d tables queried, %d rows are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total);
}
}
......@@ -5949,6 +5941,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return TSDB_CODE_INVALID_QHANDLE;
}
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
size += sizeof(int32_t);
......@@ -5962,7 +5955,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
int32_t code = pQInfo->code;
if (code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQuery->limit.offset);
(*pRsp)->useconds = htobe64(pQInfo->elapsedTime);
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
} else {
(*pRsp)->offset = 0;
(*pRsp)->useconds = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册