提交 4e954c9e 编写于 作者: A Alex Duan

[TS-530]<fix> support CQ write to supertable

上级 94b021ce
......@@ -894,6 +894,89 @@ int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_field
return len;
}
// print field value to str
int taos_print_field(char *str, void* value, TAOS_FIELD *field) {
// check valid
if (str == NULL || value == NULL || field == NULL) {
return 0;
}
// get value
int len = 0;
switch (field->type) {
//
// fixed length
//
case TSDB_DATA_TYPE_TINYINT:
len = sprintf(str, "%d", *((int8_t *)value));
break;
case TSDB_DATA_TYPE_UTINYINT:
len = sprintf(str, "%u", *((uint8_t *)value));
break;
case TSDB_DATA_TYPE_SMALLINT:
len = sprintf(str, "%d", *((int16_t *)value));
break;
case TSDB_DATA_TYPE_USMALLINT:
len = sprintf(str, "%u", *((uint16_t *)value));
break;
case TSDB_DATA_TYPE_INT:
len = sprintf(str, "%d", *((int32_t *)value));
break;
case TSDB_DATA_TYPE_UINT:
len = sprintf(str, "%u", *((uint32_t *)value));
break;
case TSDB_DATA_TYPE_BIGINT:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_UBIGINT:
len = sprintf(str, "%" PRIu64, *((uint64_t *)value));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(value);
len = sprintf(str, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(value);
len = sprintf(str, "%lf", dv);
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len = sprintf(str, "%" PRId64, *((int64_t *)value));
break;
case TSDB_DATA_TYPE_BOOL:
len = sprintf(str, "%d", *((int8_t *)value));
//
// variant length
//
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: {
len = varDataLen((char*)value - VARSTR_HEADER_SIZE);
if (field->type == TSDB_DATA_TYPE_BINARY) {
assert(len <= field->bytes && len >= 0);
} else {
assert(len <= field->bytes * TSDB_NCHAR_SIZE && len >= 0);
}
memcpy(str, value, len);
} break;
default:
break;
}
return len;
}
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
......
......@@ -461,17 +461,30 @@ bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32
// add row to hash to group by tbname
bool tbHashAdd(SHashObj *tbHash, TAOS_ROW row, TAOS_FIELD* fields, int32_t idx, int32_t numCols) {
void *v = row[idx];
VarDataLenT len = varDataLen((char*)v - VARSTR_HEADER_SIZE);
char *key = v;
TAOS_FIELD *field = &fields[idx];
VarDataLenT len = 0;
char str[128];
memset(str, 0, sizeof(str));
char *key = str;
// get key and len
if(field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_NCHAR) {
key = v;
len = varDataLen((char*)v - VARSTR_HEADER_SIZE);
} else {
len = taos_print_field(str, v, field);
}
if(len == 0) {
return false;
}
// get array point from hash
// append key with len
SArray *arr = NULL;
// get arr from hash
void* pdata = taosHashGet(tbHash, key, len);
if(pdata) {
arr = *(SArray **)pdata;
}
// get arr from new
// if group is null create new
if(arr == NULL) {
arr = (SArray *)taosArrayInit(10, sizeof(TAOS_ROW));
if(arr == NULL) {
......@@ -481,7 +494,7 @@ bool tbHashAdd(SHashObj *tbHash, TAOS_ROW row, TAOS_FIELD* fields, int32_t idx,
taosHashPut(tbHash, key, len, &arr, sizeof(SArray *));
}
// add to array
// append to group
int32_t new_len = sizeof(void*) * numCols;
TAOS_ROW new_row = (TAOS_ROW)tmalloc(new_len);
memcpy(new_row, row, new_len);
......@@ -507,7 +520,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
// init hash
SHashObj* tbHash = NULL;
int32_t colIdx = 1;
int32_t colIdx = -1;
TAOS_FIELD *fields = NULL;
int32_t dstColsNum = pStream->dstCols;
int32_t fieldsNum = 0;
......@@ -529,6 +542,11 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
break;
}
}
// set default with last fields if
if(colIdx == -1) {
colIdx = fieldsNum - 1;
}
}
// save rows
......@@ -560,7 +578,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->stime = taosTimeAdd(pStream->stime, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
}
// actually only one row is returned. this following is not necessary
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
if(pQueryInfo->pQInfo->code == TSDB_CODE_SUCCESS) {
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
}
} else { // numOfRows == 0, all data has been retrieved
pStream->useconds += pSql->res.useconds;
if (pStream->numOfRes == 0) {
......
......@@ -137,8 +137,11 @@ DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
// row to string
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT int taos_print_row_ex(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields, char split, bool addQuota);
// one field to string
DLL_EXPORT int taos_print_field(char *str, void* value, TAOS_FIELD *field);
DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
......
......@@ -220,6 +220,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId);
setQueryKilled(pQInfo);
return doBuildResCheck(pQInfo);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册