提交 bc6a466b 编写于 作者: L Liu Jicong

refactor TAOS_RES process

上级 6a734a74
...@@ -136,7 +136,8 @@ static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uin ...@@ -136,7 +136,8 @@ static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uin
} }
static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) { static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT); ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int16_t*)p = *(int16_t*)v; *(int16_t*)p = *(int16_t*)v;
} }
...@@ -210,15 +211,19 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); ...@@ -210,15 +211,19 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks); void blockDebugShowData(const SArray* dataBlocks);
static FORCE_INLINE int32_t blockEstimateEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + (int32_t)ceil(blockDataGetSerialRowSize(pBlock) * pBlock->info.rows);
}
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
int8_t compressed) { int8_t compressed) {
int32_t colSize = colDataGetLength(pColRes, numOfRows); int32_t colSize = colDataGetLength(pColRes, numOfRows);
return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data, return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
} }
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) { int8_t needCompress) {
int32_t* colSizes = (int32_t*)data; int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t); data += numOfCols * sizeof(int32_t);
......
...@@ -75,12 +75,12 @@ typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq ...@@ -75,12 +75,12 @@ typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq
typedef struct { typedef struct {
int8_t inited; int8_t inited;
// ctl // ctl
int8_t threadStop; int8_t threadStop;
TdThread thread; TdThread thread;
TdThreadMutex lock; // used when app init and cleanup TdThreadMutex lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX]; FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX];
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX]; FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr; } SClientHbMgr;
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
...@@ -118,42 +118,42 @@ struct SAppInstInfo { ...@@ -118,42 +118,42 @@ struct SAppInstInfo {
}; };
typedef struct SAppInfo { typedef struct SAppInfo {
int64_t startTime; int64_t startTime;
char appName[TSDB_APP_NAME_LEN]; char appName[TSDB_APP_NAME_LEN];
char* ep; char* ep;
int32_t pid; int32_t pid;
int32_t numOfThreads; int32_t numOfThreads;
SHashObj* pInstMap; SHashObj* pInstMap;
TdThreadMutex mutex; TdThreadMutex mutex;
} SAppInfo; } SAppInfo;
typedef struct STscObj { typedef struct STscObj {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
char ver[128]; char ver[128];
int32_t acctId; int32_t acctId;
uint32_t connId; uint32_t connId;
int32_t connType; int32_t connType;
uint64_t id; // ref ID returned by taosAddRef uint64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo; SAppInstInfo* pAppInfo;
} STscObj; } STscObj;
typedef struct SResultColumn { typedef struct SResultColumn {
union { union {
char* nullbitmap; // bitmap, one bit for each item in the list char* nullbitmap; // bitmap, one bit for each item in the list
int32_t* offset; int32_t* offset;
}; };
char* pData; char* pData;
} SResultColumn; } SResultColumn;
typedef struct SReqResultInfo { typedef struct SReqResultInfo {
const char* pRspMsg; const char* pRspMsg;
const char* pData; const char* pData;
TAOS_FIELD* fields; // todo, column names are not needed. TAOS_FIELD* fields; // todo, column names are not needed.
TAOS_FIELD* userFields; // the fields info that return to user TAOS_FIELD* userFields; // the fields info that return to user
uint32_t numOfCols; uint32_t numOfCols;
int32_t* length; int32_t* length;
char** convertBuf; char** convertBuf;
...@@ -180,13 +180,31 @@ typedef struct SRequestSendRecvBody { ...@@ -180,13 +180,31 @@ typedef struct SRequestSendRecvBody {
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg; SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG. int64_t queryJob; // query job, created according to sql query DAG.
struct SQueryPlan* pDag; // the query dag, generated according to the sql statement. struct SQueryPlan* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo; SReqResultInfo resInfo;
} SRequestSendRecvBody; } SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define ERROR_MSG_BUF_DEFAULT_SIZE 512
enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
struct tmq_message_t {
int8_t resType;
SMqPollRsp msg;
char* topic;
void* vg;
SArray* res; // SArray<SReqResultInfo>
int32_t resIter;
};
typedef struct SRequestObj { typedef struct SRequestObj {
int8_t resType; // query or tmq
uint64_t requestId; uint64_t requestId;
int32_t type; // request type int32_t type; // request type
STscObj* pTscObj; STscObj* pTscObj;
...@@ -203,6 +221,25 @@ typedef struct SRequestObj { ...@@ -203,6 +221,25 @@ typedef struct SRequestObj {
SRequestSendRecvBody body; SRequestSendRecvBody body;
} SRequestObj; } SRequestObj;
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
tmq_message_t* msg = (tmq_message_t*)res;
int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter;
return (SReqResultInfo*)taosArrayGet(msg->res, resIter);
}
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) {
tmq_message_t* msg = (tmq_message_t*)res;
if (++msg->resIter < taosArrayGetSize(msg->res)) {
return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter);
}
return NULL;
}
static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo);
return tmqGetCurResInfo(res);
}
extern SAppInfo appInfo; extern SAppInfo appInfo;
extern int32_t clientReqRefPool; extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool; extern int32_t clientConnRefPool;
...@@ -238,14 +275,17 @@ void initMsgHandleFp(); ...@@ -238,14 +275,17 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port); uint16_t port);
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery); void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); void doSetOneRowPtr(SReqResultInfo* pResultInfo);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
// --- heartbeat // --- heartbeat
// global, called by mgmt // global, called by mgmt
......
...@@ -149,6 +149,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty ...@@ -149,6 +149,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
return NULL; return NULL;
} }
pRequest->resType = RES_TYPE__QUERY;
pRequest->pDb = getDbOfConnection(pObj); pRequest->pDb = getDbOfConnection(pObj);
pRequest->requestId = generateRequestId(); pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampMs(); pRequest->metric.start = taosGetTimestampMs();
......
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -42,7 +41,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i ...@@ -42,7 +41,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo); SAppInstInfo* pAppInfo);
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port) { uint16_t port) {
...@@ -590,7 +589,7 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c ...@@ -590,7 +589,7 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c
return taos_connect(ipStr, userStr, passStr, dbStr, port); return taos_connect(ipStr, userStr, passStr, dbStr, port);
} }
static void doSetOneRowPtr(SReqResultInfo* pResultInfo) { void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
SResultColumn* pCol = &pResultInfo->pCol[i]; SResultColumn* pCol = &pResultInfo->pCol[i];
......
...@@ -71,7 +71,7 @@ void taos_cleanup(void) { ...@@ -71,7 +71,7 @@ void taos_cleanup(void) {
tscInfo("all local resources released"); tscInfo("all local resources released");
} }
setConfRet taos_set_config(const char *config) { setConfRet taos_set_config(const char *config) {
// TODO // TODO
setConfRet ret = {SET_CONF_RET_SUCC, {0}}; setConfRet ret = {SET_CONF_RET_SUCC, {0}};
return ret; return ret;
...@@ -133,8 +133,7 @@ int taos_field_count(TAOS_RES *res) { ...@@ -133,8 +133,7 @@ int taos_field_count(TAOS_RES *res) {
return 0; return 0;
} }
SRequestObj *pRequest = (SRequestObj *)res; SReqResultInfo *pResInfo = tscGetCurResInfo(res);
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfCols; return pResInfo->numOfCols;
} }
...@@ -145,7 +144,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -145,7 +144,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return NULL; return NULL;
} }
SReqResultInfo *pResInfo = &(((SRequestObj *)res)->body.resInfo); SReqResultInfo *pResInfo = tscGetCurResInfo(res);
return pResInfo->userFields; return pResInfo->userFields;
} }
...@@ -162,13 +161,36 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -162,13 +161,36 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL; return NULL;
} }
SRequestObj *pRequest = (SRequestObj *)res; if (TD_RES_QUERY(res)) {
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || SRequestObj *pRequest = (SRequestObj *)res;
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
return NULL; pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
} return NULL;
}
return doFetchRow(pRequest, true);
} else if (TD_RES_TMQ(res)) {
tmq_message_t *msg = ((tmq_message_t *)res);
SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter);
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
return doFetchRow(pRequest, true); if (pResultInfo->row == NULL) {
msg->resIter++;
pResultInfo = taosArrayGet(msg->res, msg->resIter);
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
}
return pResultInfo->row;
} else {
// assert to avoid uninitialization error
ASSERT(0);
}
return NULL;
} }
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
...@@ -260,12 +282,12 @@ int *taos_fetch_lengths(TAOS_RES *res) { ...@@ -260,12 +282,12 @@ int *taos_fetch_lengths(TAOS_RES *res) {
return NULL; return NULL;
} }
return ((SRequestObj *)res)->body.resInfo.length; SReqResultInfo *pResInfo = tscGetCurResInfo(res);
return pResInfo->length;
} }
TAOS_ROW *taos_result_block(TAOS_RES *res) { TAOS_ROW *taos_result_block(TAOS_RES *res) {
SRequestObj* pRequest = (SRequestObj*) res; if (res == NULL) {
if (pRequest == NULL) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
...@@ -274,7 +296,8 @@ TAOS_ROW *taos_result_block(TAOS_RES *res) { ...@@ -274,7 +296,8 @@ TAOS_ROW *taos_result_block(TAOS_RES *res) {
return NULL; return NULL;
} }
return &pRequest->body.resInfo.row; SReqResultInfo *pResInfo = tscGetCurResInfo(res);
return &pResInfo->row;
} }
// todo intergrate with tDataTypes // todo intergrate with tDataTypes
...@@ -313,7 +336,7 @@ const char *taos_data_type(int type) { ...@@ -313,7 +336,7 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; } const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) { int taos_affected_rows(TAOS_RES *res) {
if (res == NULL) { if (res == NULL || TD_RES_TMQ(res)) {
return 0; return 0;
} }
...@@ -323,12 +346,17 @@ int taos_affected_rows(TAOS_RES *res) { ...@@ -323,12 +346,17 @@ int taos_affected_rows(TAOS_RES *res) {
} }
int taos_result_precision(TAOS_RES *res) { int taos_result_precision(TAOS_RES *res) {
SRequestObj* pRequest = (SRequestObj*) res; if (res == NULL) {
if (pRequest == NULL) {
return TSDB_TIME_PRECISION_MILLI; return TSDB_TIME_PRECISION_MILLI;
} }
if (TD_RES_QUERY(res)) {
return pRequest->body.resInfo.precision; SRequestObj *pRequest = (SRequestObj *)res;
return pRequest->body.resInfo.precision;
} else if (TD_RES_TMQ(res)) {
SReqResultInfo *info = tmqGetCurResInfo(res);
return info->precision;
}
return TSDB_TIME_PRECISION_MILLI;
} }
int taos_select_db(TAOS *taos, const char *db) { int taos_select_db(TAOS *taos, const char *db) {
...@@ -370,90 +398,115 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -370,90 +398,115 @@ void taos_stop_query(TAOS_RES *res) {
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
SRequestObj *pRequestObj = res; SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
SReqResultInfo *pResultInfo = &pRequestObj->body.resInfo;
if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) { if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) {
return true; return true;
} }
SResultColumn *pCol = &pRequestObj->body.resInfo.pCol[col]; SResultColumn *pCol = &pResultInfo->pCol[col];
return colDataIsNull_f(pCol->nullbitmap, row); return colDataIsNull_f(pCol->nullbitmap, row);
} }
bool taos_is_update_query(TAOS_RES *res) { bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; }
return taos_num_fields(res) == 0;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
/*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows); /*int32_t code = */ taos_fetch_block_s(res, &numOfRows, rows);
return numOfRows; return numOfRows;
} }
int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) { int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
SRequestObj *pRequest = (SRequestObj *)res; if (res == NULL) {
if (pRequest == NULL) {
return 0; return 0;
} }
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
(*rows) = NULL; (*rows) = NULL;
(*numOfRows) = 0; (*numOfRows) = 0;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0; return 0;
} }
doFetchRow(pRequest, false); doFetchRow(pRequest, false);
// TODO refactor // TODO refactor
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows; pResultInfo->current = pResultInfo->numOfRows;
(*rows) = pResultInfo->row; (*rows) = pResultInfo->row;
(*numOfRows) = pResultInfo->numOfRows; (*numOfRows) = pResultInfo->numOfRows;
return pRequest->code; return pRequest->code;
} } else if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
if (pResultInfo == NULL) return -1;
int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) { pResultInfo->current = pResultInfo->numOfRows;
SRequestObj *pRequest = (SRequestObj *)res; (*rows) = pResultInfo->row;
if (pRequest == NULL) { (*numOfRows) = pResultInfo->numOfRows;
return 0; return 0;
} else {
ASSERT(0);
return -1;
} }
}
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { if (res == NULL) {
return 0; return 0;
} }
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
doFetchRow(pRequest, false); if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
}
doFetchRow(pRequest, false);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData;
return 0;
pResultInfo->current = pResultInfo->numOfRows; } else if (TD_RES_TMQ(res)) {
(*numOfRows) = pResultInfo->numOfRows; SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
(*pData) = (void*) pResultInfo->pData; if (pResultInfo == NULL) return -1;
return 0; pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData;
return 0;
} else {
ASSERT(0);
return -1;
}
} }
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
SRequestObj *pRequest = (SRequestObj *)res; if (res == NULL) {
if (pRequest == NULL) {
return 0; return 0;
} }
int32_t numOfFields = taos_num_fields(pRequest); int32_t numOfFields = taos_num_fields(res);
if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) {
return 0; return 0;
} }
TAOS_FIELD* pField = &pRequest->body.resInfo.userFields[columnIndex]; SReqResultInfo *pResInfo = tscGetCurResInfo(res);
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
if (!IS_VAR_DATA_TYPE(pField->type)) { if (!IS_VAR_DATA_TYPE(pField->type)) {
return 0; return 0;
} }
return pRequest->body.resInfo.pCol[columnIndex].offset; return pResInfo->pCol[columnIndex].offset;
} }
int taos_validate_sql(TAOS *taos, const char *sql) { return true; } int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
...@@ -483,18 +536,19 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -483,18 +536,19 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
// TODO // TODO
} }
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
// TODO void *param, int interval) {
return NULL; // TODO
return NULL;
} }
TAOS_RES *taos_consume(TAOS_SUB *tsub) { TAOS_RES *taos_consume(TAOS_SUB *tsub) {
// TODO // TODO
return NULL; return NULL;
} }
void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
// TODO // TODO
} }
int taos_load_table_info(TAOS *taos, const char *tableNameList) { int taos_load_table_info(TAOS *taos, const char *tableNameList) {
...@@ -553,26 +607,26 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { ...@@ -553,26 +607,26 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
} }
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) { int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
// TODO // TODO
return -1; return -1;
} }
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) { int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
// TODO // TODO
return -1; return -1;
} }
int taos_stmt_add_batch(TAOS_STMT* stmt) { int taos_stmt_add_batch(TAOS_STMT *stmt) {
// TODO // TODO
return -1; return -1;
} }
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
// TODO // TODO
return NULL; return NULL;
} }
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) { int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
// TODO // TODO
return -1; return -1;
} }
...@@ -17,27 +17,13 @@ ...@@ -17,27 +17,13 @@
#include "clientLog.h" #include "clientLog.h"
#include "parser.h" #include "parser.h"
#include "planner.h" #include "planner.h"
#include "scheduler.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdef.h" #include "tdef.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
typedef struct {
int32_t curBlock;
int32_t curRow;
void** uData;
} SMqRowIter;
struct tmq_message_t {
SMqPollRsp msg;
void* vg;
SMqRowIter iter;
};
struct tmq_list_t { struct tmq_list_t {
SArray container; SArray container;
}; };
...@@ -1566,30 +1552,4 @@ const char* tmq_err2str(tmq_resp_err_t err) { ...@@ -1566,30 +1552,4 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return "fail"; return "fail";
} }
TAOS_ROW tmq_get_row(tmq_message_t* message) {
SMqPollRsp* rsp = &message->msg;
while (1) {
if (message->iter.curBlock < taosArrayGetSize(rsp->pBlockData)) {
SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->iter.curBlock);
if (message->iter.curRow < pBlock->info.rows) {
for (int i = 0; i < pBlock->info.numOfCols; i++) {
SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i);
if (colDataIsNull_s(pData, message->iter.curRow))
message->iter.uData[i] = NULL;
else {
message->iter.uData[i] = colDataGetData(pData, message->iter.curRow);
}
}
message->iter.curRow++;
return message->iter.uData;
} else {
message->iter.curBlock++;
message->iter.curRow = 0;
continue;
}
}
return NULL;
}
}
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
...@@ -92,9 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, ...@@ -92,9 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false; return false;
} }
// NOTE: there are four bytes of an integer more than the required buffer space. pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockEstimateEncodeSize(pInput->pData);
// struct size + data payload + length for each column + bitmap length
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + blockDataGetSize(pInput->pData);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize); pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) { if (pBuf->pData == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册