未验证 提交 f2fdca2e 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1802 from taosdata/feature/query

Feature/query
......@@ -280,6 +280,7 @@ typedef struct {
SResRec * pGroupRec;
char * data;
void ** tsrow;
int32_t* length; // length for each field for current row
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex * pColumnIndex;
struct SLocalReducer *pLocalReducer;
......@@ -421,7 +422,7 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column, int16_t bytes);
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
extern void * pVnodeConn;
extern void * tscCacheHandle;
......
......@@ -317,7 +317,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
if (pSup->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, pSup->pSqlExpr->resBytes);
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
} else {
// todo add
}
......
......@@ -425,7 +425,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
assert(0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, 0);
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
}
*rows = pRes->tsrow;
......@@ -725,6 +725,15 @@ char *taos_get_server_info(TAOS *taos) {
return pObj->sversion;
}
int* taos_fetch_lengths(TAOS_RES *res) {
SSqlObj* pSql = (SSqlObj* ) res;
if (pSql == NULL || pSql->signature != pSql) {
return NULL;
}
return pSql->res.length;
}
char *taos_get_client_info() { return version; }
void taos_stop_query(TAOS_RES *res) {
......@@ -796,7 +805,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
size_t xlen = 0;
for (xlen = 0; xlen <= fields[i].bytes; xlen++) {
for (xlen = 0; xlen < fields[i].bytes - VARSTR_HEADER_SIZE; xlen++) {
char c = ((char *)row[i])[xlen];
if (c == 0) break;
str[len++] = c;
......
......@@ -1849,6 +1849,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
}
bool success = false;
......@@ -1967,7 +1968,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
SFieldSupInfo* pSup = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i);
if (pSup->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, pSup->pSqlExpr->resBytes);
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
}
// primary key column cannot be null in interval query, no need to check
......
......@@ -210,7 +210,7 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI
return false;
}
// order by column exists, not a non-ordered projection query
// order by columnIndex exists, not a non-ordered projection query
return pQueryInfo->order.orderColId < 0;
}
......@@ -219,7 +219,7 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde
return false;
}
// order by column exists, a non-ordered projection query
// order by columnIndex exists, a non-ordered projection query
return pQueryInfo->order.orderColId >= 0;
}
......@@ -286,13 +286,15 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
pRes->numOfCols = numOfOutput;
pRes->tsrow = calloc(POINTER_BYTES, numOfOutput);
pRes->buffer = calloc(POINTER_BYTES, numOfOutput);
pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
pRes->length = calloc(numOfOutput, sizeof(int32_t)); // todo refactor
pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
// not enough memory
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
tfree(pRes->tsrow);
tfree(pRes->buffer);
tfree(pRes->length);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code;
......@@ -312,6 +314,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->pRsp);
tfree(pRes->tsrow);
tfree(pRes->length);
tfree(pRes->pGroupRec);
tfree(pRes->pColumnIndex);
......@@ -592,7 +595,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
}
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
// TODO: optimize this function
// TODO: optimize this function, handle the case while binary is not presented
int len = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
......@@ -924,7 +927,7 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr));
pExpr->functionId = functionId;
// set the correct column index
// set the correct columnIndex index
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
} else {
......@@ -1063,7 +1066,7 @@ void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy)
}
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
// ignore the tbname column to be inserted into source list
// ignore the tbname columnIndex to be inserted into source list
if (pColIndex->columnIndex < 0) {
return NULL;
}
......@@ -2124,22 +2127,30 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
}
}
char* tscGetResultColumnChr(SSqlRes* pRes, SQueryInfo* pQueryInfo, int32_t column, int16_t bytes) {
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, column);
void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
int32_t type = pInfo->pSqlExpr->resType;
int32_t bytes = pInfo->pSqlExpr->resBytes;
char* pData = ((char*) pRes->data) + pInfo->pSqlExpr->offset * pRes->numOfRows + bytes * pRes->row;
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE);
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
*(char*) (pData + realLen + VARSTR_HEADER_SIZE) = 0;
}
return pData + VARSTR_HEADER_SIZE; // head is the length of binary/nchar data
pRes->tsrow[columnIndex] = pData + VARSTR_HEADER_SIZE;
pRes->length[columnIndex] = realLen;
} else {
return pData;
assert(bytes == tDataTypeDesc[type].nSize);
pRes->tsrow[columnIndex] = pData;
pRes->length[columnIndex] = bytes;
}
}
......@@ -32,7 +32,7 @@ extern "C" {
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\
char* _e = stpncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_maxs));\
*(VarDataLenT*)(x) = _e - (x);\
*(VarDataLenT*)(x) = (_e - (x) - VARSTR_HEADER_SIZE);\
} while(0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\
......
......@@ -228,7 +228,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset);
if (num > 0) {
usleep(30);
usleep(30000);
sched_yield();
} else {
taosFreeQall(pWorker->qall);
......
......@@ -53,9 +53,9 @@ typedef enum {
} TSDB_OPTION;
typedef struct taosField {
char name[64];
short bytes;
char type;
char name[64];
short bytes;
uint8_t type;
} TAOS_FIELD;
#ifdef _TD_GO_DLL_
......@@ -104,6 +104,8 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res);
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
int taos_validate_sql(TAOS *taos, const char *sql);
int* taos_fetch_lengths(TAOS_RES *res);
// TAOS_RES *taos_list_tables(TAOS *mysql, const char *wild);
// TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild);
......
......@@ -36,14 +36,17 @@ extern "C" {
typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT;
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT)))
#define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char*)(v)) - VARSTR_HEADER_SIZE))
// this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
// Bytes for each type.
extern const int32_t TYPE_BYTES[11];
......
......@@ -350,6 +350,8 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
TAOS_FIELD *fields = taos_fetch_fields(result);
row = taos_fetch_row(result);
int32_t* length = taos_fetch_lengths(result);
char t_str[TSDB_MAX_BYTES_PER_ROW] = "\0";
int l[TSDB_MAX_COLUMNS] = {0};
int maxLenColumnName = 0;
......@@ -457,7 +459,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW);
memcpy(t_str, row[i], fields[i].bytes);
memcpy(t_str, row[i], length[i]);
/* printf("%-*s|",max(fields[i].bytes, strlen(fields[i].name)),
* t_str); */
/* printf("%-*s|", l[i], t_str); */
......@@ -532,7 +534,8 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW);
memcpy(t_str, row[i], fields[i].bytes);
memcpy(t_str, row[i], length[i]);
l[i] = MAX(fields[i].bytes, strlen(fields[i].name));
shellPrintNChar(t_str, l[i], printMode);
break;
......@@ -610,7 +613,7 @@ int shellDumpResult(TAOS *con, char *fname, int *error_no, bool printMode) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memset(t_str, 0, TSDB_MAX_BYTES_PER_ROW);
memcpy(t_str, row[i], fields[i].bytes);
memcpy(t_str, row[i], length[i]);
fprintf(fp, "\'%s\'", t_str);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......
......@@ -672,7 +672,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char *prec = (pDb->cfg.precision == TSDB_TIME_PRECISION_MILLI) ? TSDB_TIME_PRECISION_MILLI_STR
: TSDB_TIME_PRECISION_MICRO_STR;
strcpy(pWrite, prec);
STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
......@@ -676,6 +676,8 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
while ((row = taos_fetch_row(result))) {
if (numOfRows < MAX_QUERY_ROW_NUM) {
TAOS_FIELD *fields = taos_fetch_fields(result);
int* length = taos_fetch_lengths(result);
for (int i = 0; i < num_fields; i++) {
char *value = NULL;
if (i < MAX_QUERY_COL_NUM) {
......@@ -734,8 +736,8 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memset(value, 0, MAX_QUERY_VALUE_LEN);
memcpy(value, row[i], fields[i].bytes);
value[fields[i].bytes] = 0;
memcpy(value, row[i], length[i]);
value[length[i]] = 0;
// snprintf(value, fields[i].bytes, "%s", (char *)row[i]);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册