未验证 提交 3bda4cf3 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #17332 from taosdata/refact/code_format

refact: code format
......@@ -22,6 +22,7 @@ extern "C" {
#include "tlog.h"
// clang-format off
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", DEBUG_FATAL, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", DEBUG_ERROR, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", DEBUG_WARN, cDebugFlag, __VA_ARGS__); }} while(0)
......@@ -30,6 +31,7 @@ extern "C" {
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscPerf(...) do { taosPrintLog("TSC ", 0, cDebugFlag, __VA_ARGS__); } while(0)
// clang-format on
#ifdef __cplusplus
}
......
......@@ -42,37 +42,37 @@ typedef enum {
} STMT_STATUS;
typedef struct SStmtTableCache {
STableDataBlocks* pDataBlock;
void* boundTags;
STableDataBlocks *pDataBlock;
void *boundTags;
} SStmtTableCache;
typedef struct SStmtQueryResInfo {
TAOS_FIELD* fields;
TAOS_FIELD* userFields;
uint32_t numOfCols;
int32_t precision;
TAOS_FIELD *fields;
TAOS_FIELD *userFields;
uint32_t numOfCols;
int32_t precision;
} SStmtQueryResInfo;
typedef struct SStmtBindInfo {
bool needParse;
bool inExecCache;
uint64_t tbUid;
uint64_t tbSuid;
int32_t sBindRowNum;
int32_t sBindLastIdx;
int8_t tbType;
bool tagsCached;
void* boundTags;
char tbName[TSDB_TABLE_FNAME_LEN];
char tbFName[TSDB_TABLE_FNAME_LEN];
char stbFName[TSDB_TABLE_FNAME_LEN];
SName sname;
bool needParse;
bool inExecCache;
uint64_t tbUid;
uint64_t tbSuid;
int32_t sBindRowNum;
int32_t sBindLastIdx;
int8_t tbType;
bool tagsCached;
void *boundTags;
char tbName[TSDB_TABLE_FNAME_LEN];
char tbFName[TSDB_TABLE_FNAME_LEN];
char stbFName[TSDB_TABLE_FNAME_LEN];
SName sname;
} SStmtBindInfo;
typedef struct SStmtExecInfo {
int32_t affectedRows;
SRequestObj* pRequest;
SHashObj* pBlockHash;
SRequestObj *pRequest;
SHashObj *pBlockHash;
bool autoCreateTbl;
} SStmtExecInfo;
......@@ -80,20 +80,20 @@ typedef struct SStmtSQLInfo {
STMT_TYPE type;
STMT_STATUS status;
uint64_t runTimes;
SHashObj* pTableCache; //SHash<SStmtTableCache>
SQuery* pQuery;
char* sqlStr;
SHashObj *pTableCache; // SHash<SStmtTableCache>
SQuery *pQuery;
char *sqlStr;
int32_t sqlLen;
SArray* nodeList;
SArray *nodeList;
SStmtQueryResInfo queryRes;
bool autoCreateTbl;
SHashObj* pVgHash;
SHashObj *pVgHash;
} SStmtSQLInfo;
typedef struct STscStmt {
STscObj* taos;
SCatalog* pCatalog;
int32_t affectedRows;
STscObj *taos;
SCatalog *pCatalog;
int32_t affectedRows;
SStmtSQLInfo sql;
SStmtExecInfo exec;
......@@ -103,28 +103,48 @@ typedef struct STscStmt {
#define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S)
#define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S)
#define STMT_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define STMT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define STMT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
TAOS_STMT *stmtInit(STscObj* taos);
int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt);
#define STMT_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define STMT_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define STMT_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
TAOS_STMT *stmtInit(STscObj *taos);
int stmtClose(TAOS_STMT *stmt);
int stmtExec(TAOS_STMT *stmt);
const char *stmtErrstr(TAOS_STMT *stmt);
int stmtAffectedRows(TAOS_STMT *stmt);
int stmtAffectedRowsOnce(TAOS_STMT *stmt);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName);
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields);
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields);
int stmtIsInsert(TAOS_STMT *stmt, int *insert);
int stmtGetParamNum(TAOS_STMT *stmt, int *nums);
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes);
int stmtAddBatch(TAOS_STMT *stmt);
TAOS_RES *stmtUseResult(TAOS_STMT *stmt);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx);
int stmtAffectedRows(TAOS_STMT *stmt);
int stmtAffectedRowsOnce(TAOS_STMT *stmt);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName);
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
int stmtGetTagFields(TAOS_STMT *stmt, int *nums, TAOS_FIELD_E **fields);
int stmtGetColFields(TAOS_STMT *stmt, int *nums, TAOS_FIELD_E **fields);
int stmtIsInsert(TAOS_STMT *stmt, int *insert);
int stmtGetParamNum(TAOS_STMT *stmt, int *nums);
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes);
int stmtAddBatch(TAOS_STMT *stmt);
TAOS_RES *stmtUseResult(TAOS_STMT *stmt);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx);
#ifdef __cplusplus
}
......
......@@ -7,8 +7,8 @@
static int32_t stmtCreateRequest(STscStmt* pStmt) {
int32_t code = 0;
if (pStmt->exec.pRequest == NULL) {
if (pStmt->exec.pRequest == NULL) {
code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest);
if (TSDB_CODE_SUCCESS == code) {
pStmt->exec.pRequest->syncQuery = true;
......@@ -232,7 +232,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
};
STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
pStmt->bInfo.needParse = false;
......@@ -402,7 +402,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64 , pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
return TSDB_CODE_SUCCESS;
}
......@@ -600,8 +600,9 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
}
tscDebug("start to bind stmt tag values");
STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName, pStmt->bInfo.sname.tname,
tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
pStmt->exec.pRequest->msgBufLen));
return TSDB_CODE_SUCCESS;
}
......
......@@ -212,7 +212,8 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async(tmq, res, commit_cb, consumer);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) {
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniError("jobj:%p, tmq is closed", jobj);
......@@ -223,7 +224,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConsumerCloseImp(JNIEnv *env, jobject jobj,
jlong jtmq) {
jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
......
......@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include <gtest/gtest.h>
#include <iostream>
#include "clientInt.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "clientInt.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
......@@ -26,8 +26,8 @@
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "taos.h"
#include "executor.h"
#include "taos.h"
namespace {
void showDB(TAOS* pConn) {
......@@ -50,11 +50,11 @@ void printResult(TAOS_RES* pRes) {
int32_t numOfFields = taos_num_fields(pRes);
int32_t n = 0;
char str[512] = {0};
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
for(int32_t i = 0; i < numOfFields; ++i) {
printf("(%d):%d " , i, length[i]);
for (int32_t i = 0; i < numOfFields; ++i) {
printf("(%d):%d ", i, length[i]);
}
printf("\n");
......@@ -123,16 +123,16 @@ void createNewTable(TAOS* pConn, int32_t index) {
}
taos_free_result(pRes);
for(int32_t i = 0; i < 3280; i += 20) {
for (int32_t i = 0; i < 3280; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", index,
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7,
i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14,
i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
......@@ -150,7 +150,7 @@ int main(int argc, char** argv) {
TEST(testCase, driverInit_Test) {
// taosInitGlobalCfg();
// taos_init();
// taos_init();
}
TEST(testCase, connect_Test) {
......@@ -670,11 +670,11 @@ TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
// if (taos_errno(pRes) != 0) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
......@@ -697,27 +697,27 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result(pRes);
for(int32_t i = 0; i < 2; ++i) {
for (int32_t i = 0; i < 2; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
//
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
taos_free_result(pRes);
taos_close(pConn);
......@@ -860,5 +860,4 @@ TEST(testCase, update_test) {
#endif
#pragma GCC diagnostic pop
此差异已折叠。
......@@ -20,8 +20,6 @@
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
......
此差异已折叠。
......@@ -636,7 +636,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
double el = (taosGetTimestampUs() - st)/1000.0;
double el = (taosGetTimestampUs() - st) / 1000.0;
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms",
pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate, el);
return TSDB_CODE_SUCCESS;
......
......@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trpc.h"
#include "catalogInt.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h"
#include "trpc.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {0};
SCtgDebug gCTGDebug = {0};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);
void ctgdUserCallback(SMetaData *pResult, void *param, int32_t code) {
ASSERT(*(int32_t *)param == 1);
taosMemoryFree(param);
qDebug("async call result: %s", tstrerror(code));
......@@ -36,16 +36,19 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if (pResult->pTableMeta && taosArrayGetSize(pResult->pTableMeta) > 0) {
num = taosArrayGetSize(pResult->pTableMeta);
for (int32_t i = 0; i < num; ++i) {
STableMeta *p = *(STableMeta **)taosArrayGet(pResult->pTableMeta, i);
STableMeta *p = *(STableMeta **)taosArrayGet(pResult->pTableMeta, i);
STableComInfo *c = &p->tableInfo;
if (TSDB_CHILD_TABLE == p->tableType) {
qDebug("table meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64, p->tableType, p->vgId, p->uid, p->suid);
qDebug("table meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64, p->tableType, p->vgId, p->uid,
p->suid);
} else {
qDebug("table meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision, c->numOfColumns, c->rowSize);
qDebug("table meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision,
c->numOfColumns, c->rowSize);
}
int32_t colNum = c->numOfColumns + c->numOfTags;
for (int32_t j = 0; j < colNum; ++j) {
SSchema *s = &p->schema[j];
......@@ -59,11 +62,11 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if (pResult->pDbVgroup && taosArrayGetSize(pResult->pDbVgroup) > 0) {
num = taosArrayGetSize(pResult->pDbVgroup);
for (int32_t i = 0; i < num; ++i) {
SArray *pDb = *(SArray**)taosArrayGet(pResult->pDbVgroup, i);
SArray *pDb = *(SArray **)taosArrayGet(pResult->pDbVgroup, i);
int32_t vgNum = taosArrayGetSize(pDb);
qDebug("db %d vgInfo:", i);
for (int32_t j = 0; j < vgNum; ++j) {
SVgroupInfo* pInfo = taosArrayGet(pDb, j);
SVgroupInfo *pInfo = taosArrayGet(pDb, j);
qDebug("vg :%d info: vgId:%d", j, pInfo->vgId);
}
}
......@@ -84,7 +87,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if (pResult->pTableHash && taosArrayGetSize(pResult->pTableHash) > 0) {
num = taosArrayGetSize(pResult->pTableHash);
for (int32_t i = 0; i < num; ++i) {
SVgroupInfo* pInfo = taosArrayGet(pResult->pTableHash, i);
SVgroupInfo *pInfo = taosArrayGet(pResult->pTableHash, i);
qDebug("table %d vg info: vgId:%d", i, pInfo->vgId);
}
} else {
......@@ -94,7 +97,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if (pResult->pUdfList && taosArrayGetSize(pResult->pUdfList) > 0) {
num = taosArrayGetSize(pResult->pUdfList);
for (int32_t i = 0; i < num; ++i) {
SFuncInfo* pInfo = taosArrayGet(pResult->pUdfList, i);
SFuncInfo *pInfo = taosArrayGet(pResult->pUdfList, i);
qDebug("udf %d info: name:%s, funcType:%d", i, pInfo->name, pInfo->funcType);
}
} else {
......@@ -104,35 +107,34 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
if (pResult->pDbCfg && taosArrayGetSize(pResult->pDbCfg) > 0) {
num = taosArrayGetSize(pResult->pDbCfg);
for (int32_t i = 0; i < num; ++i) {
SDbCfgInfo* pInfo = taosArrayGet(pResult->pDbCfg, i);
SDbCfgInfo *pInfo = taosArrayGet(pResult->pDbCfg, i);
qDebug("db %d info: numOFVgroups:%d, numOfStables:%d", i, pInfo->numOfVgroups, pInfo->numOfStables);
}
} else {
qDebug("empty db cfg info");
}
}
if (pResult->pUser && taosArrayGetSize(pResult->pUser) > 0) {
num = taosArrayGetSize(pResult->pUser);
for (int32_t i = 0; i < num; ++i) {
bool* auth = taosArrayGet(pResult->pUser, i);
bool *auth = taosArrayGet(pResult->pUser, i);
qDebug("user auth %d info: %d", i, *auth);
}
} else {
qDebug("empty user auth info");
}
}
if (pResult->pQnodeList && taosArrayGetSize(pResult->pQnodeList) > 0) {
num = taosArrayGetSize(pResult->pQnodeList);
for (int32_t i = 0; i < num; ++i) {
SQueryNodeAddr* qaddr = taosArrayGet(pResult->pQnodeList, i);
SQueryNodeAddr *qaddr = taosArrayGet(pResult->pQnodeList, i);
qDebug("qnode %d info: id:%d", i, qaddr->nodeId);
}
} else {
qDebug("empty qnode info");
}
}
}
/*
prepare SQL:
create database db1;
......@@ -147,8 +149,8 @@ grant write on db2.* to user1;
create function udf1 as '/tmp/libudf1.so' outputtype int;
create aggregate function udf2 as '/tmp/libudf2.so' outputtype int;
*/
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t reqId, bool forceUpdate) {
int32_t code = 0;
int32_t ctgdLaunchAsyncCall(SCatalog *pCtg, SRequestConnInfo *pConn, uint64_t reqId, bool forceUpdate) {
int32_t code = 0;
SCatalogReq req = {0};
req.pTableMeta = taosArrayInit(2, sizeof(SName));
req.pDbVgroup = taosArrayInit(2, TSDB_DB_FNAME_LEN);
......@@ -156,16 +158,16 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
req.pTableHash = taosArrayInit(2, sizeof(SName));
req.pUdf = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
req.pDbCfg = taosArrayInit(2, TSDB_DB_FNAME_LEN);
req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req.pIndex = NULL; // taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo));
req.qNodeRequired = true;
req.forceUpdate = forceUpdate;
SName name = {0};
char dbFName[TSDB_DB_FNAME_LEN] = {0};
char funcName[TSDB_FUNC_NAME_LEN] = {0};
SName name = {0};
char dbFName[TSDB_DB_FNAME_LEN] = {0};
char funcName[TSDB_FUNC_NAME_LEN] = {0};
SUserAuthInfo user = {0};
tNameFromString(&name, "1.db1.tb1", T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
taosArrayPush(req.pTableMeta, &name);
taosArrayPush(req.pTableHash, &name);
......@@ -207,7 +209,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t re
int32_t *param = taosMemoryCalloc(1, sizeof(int32_t));
*param = 1;
int64_t jobId = 0;
CTG_ERR_JRET(catalogAsyncGetAllMeta(pCtg, pConn, &req, ctgdUserCallback, param, &jobId));
......@@ -221,7 +223,7 @@ _return:
taosArrayDestroy(req.pDbCfg);
taosArrayDestroy(req.pUser);
CTG_RET(code);
CTG_RET(code);
}
int32_t ctgdEnableDebug(char *option) {
......@@ -250,7 +252,7 @@ int32_t ctgdEnableDebug(char *option) {
}
qError("invalid debug option:%s", option);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
......@@ -261,7 +263,7 @@ int32_t ctgdGetStatNum(char *option, void *res) {
}
qError("invalid stat option:%s", option);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
......@@ -287,7 +289,7 @@ int32_t ctgdGetRentNum(SCtgRentMgmt *rent) {
return num;
}
int32_t ctgdGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
int32_t ctgdGetClusterCacheNum(SCatalog *pCtg, int32_t type) {
if (NULL == pCtg || NULL == pCtg->dbCache) {
return 0;
}
......@@ -304,8 +306,8 @@ int32_t ctgdGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
}
SCtgDBCache *dbCache = NULL;
int32_t num = 0;
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
int32_t num = 0;
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
dbCache = (SCtgDBCache *)pIter;
switch (type) {
......@@ -325,7 +327,7 @@ int32_t ctgdGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
return num;
}
void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
void ctgdShowTableMeta(SCatalog *pCtg, const char *tbName, STableMeta *p) {
if (!gCTGDebug.metaEnable) {
return;
}
......@@ -333,11 +335,14 @@ void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
STableComInfo *c = &p->tableInfo;
if (TSDB_CHILD_TABLE == p->tableType) {
ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64, tbName, p->tableType, p->vgId, p->uid, p->suid);
ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64, tbName, p->tableType, p->vgId,
p->uid, p->suid);
return;
} else {
ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64 ",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
tbName, p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision, c->numOfColumns, c->rowSize);
ctgDebug("table [%s] meta: type:%d, vgId:%d, uid:0x%" PRIx64 ",suid:0x%" PRIx64
",sv:%d, tv:%d, tagNum:%d, precision:%d, colNum:%d, rowSize:%d",
tbName, p->tableType, p->vgId, p->uid, p->suid, p->sversion, p->tversion, c->numOfTags, c->precision,
c->numOfColumns, c->rowSize);
}
int32_t colNum = c->numOfColumns + c->numOfTags;
......@@ -347,18 +352,18 @@ void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p) {
}
}
void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
if (NULL == dbHash || !gCTGDebug.cacheEnable) {
return;
}
int32_t i = 0;
int32_t i = 0;
SCtgDBCache *dbCache = NULL;
void *pIter = taosHashIterate(dbHash, NULL);
void *pIter = taosHashIterate(dbHash, NULL);
while (pIter) {
char *dbFName = NULL;
char *dbFName = NULL;
size_t len = 0;
dbCache = (SCtgDBCache *)pIter;
dbFName = taosHashGetKey(pIter, &len);
......@@ -380,29 +385,29 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
vgNum = taosHashGetSize(dbCache->vgCache.vgInfo->vgHash);
}
}
ctgDebug("[%d] db [%.*s][0x%"PRIx64"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, prefix:%d, suffix:%d, vgNum:%d",
i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted?"deleted":"", metaNum, stbNum, vgVersion, hashMethod, hashPrefix, hashSuffix, vgNum);
ctgDebug("[%d] db [%.*s][0x%" PRIx64
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, prefix:%d, suffix:%d, vgNum:%d",
i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted ? "deleted" : "", metaNum, stbNum, vgVersion,
hashMethod, hashPrefix, hashSuffix, vgNum);
pIter = taosHashIterate(dbHash, pIter);
}
}
void ctgdShowClusterCache(SCatalog* pCtg) {
void ctgdShowClusterCache(SCatalog *pCtg) {
if (!gCTGDebug.cacheEnable || NULL == pCtg) {
return;
}
ctgDebug("## cluster 0x%"PRIx64" %p cache Info BEGIN ##", pCtg->clusterId, pCtg);
ctgDebug("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM),
ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));
ctgDebug("## cluster 0x%" PRIx64 " %p cache Info BEGIN ##", pCtg->clusterId, pCtg);
ctgDebug("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM),
ctgdGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM),
ctgdGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgdGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));
ctgdShowDBCache(pCtg, pCtg->dbCache);
ctgDebug("## cluster 0x%"PRIx64" %p cache Info END ##", pCtg->clusterId, pCtg);
ctgDebug("## cluster 0x%" PRIx64 " %p cache Info END ##", pCtg->clusterId, pCtg);
}
int32_t ctgdShowCacheInfo(void) {
......@@ -413,19 +418,18 @@ int32_t ctgdShowCacheInfo(void) {
CTG_API_ENTER();
qDebug("# total catalog cluster number %d #", taosHashGetSize(gCtgMgmt.pCluster));
SCatalog *pCtg = NULL;
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog **)pIter;
if (pCtg) {
ctgdShowClusterCache(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
此差异已折叠。
......@@ -16,6 +16,6 @@
#include <gtest/gtest.h>
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -20,16 +20,16 @@
extern "C" {
#endif
#include "tcommon.h"
#include "dataSinkMgt.h"
#include "plannodes.h"
#include "tcommon.h"
struct SDataSink;
struct SDataSinkHandle;
typedef struct SDataSinkManager {
SDataSinkMgtCfg cfg;
TdThreadMutex mutex;
TdThreadMutex mutex;
} SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
......@@ -40,17 +40,19 @@ typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);
typedef struct SDataSinkHandle {
FPutDataBlock fPut;
FEndPut fEndPut;
FGetDataLength fGetLen;
FGetDataBlock fGetData;
FPutDataBlock fPut;
FEndPut fEndPut;
FGetDataLength fGetLen;
FGetDataBlock fGetData;
FDestroyDataSinker fDestroy;
FGetCacheSize fGetCacheSize;
FGetCacheSize fGetCacheSize;
} SDataSinkHandle;
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam);
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
void* pParam);
#ifdef __cplusplus
}
......
......@@ -23,10 +23,10 @@ extern "C" {
extern int32_t exchangeObjRefPool;
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
uint64_t calcGroupId(char* pData, int32_t len);
......
......@@ -24,7 +24,7 @@ extern "C" {
enum {
LINEAR_HASH_STATIS = 0x1,
LINEAR_HASH_DATA = 0x2,
LINEAR_HASH_DATA = 0x2,
};
typedef struct SLHashObj SLHashObj;
......@@ -32,11 +32,11 @@ typedef struct SLHashObj SLHashObj;
SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage);
void* tHashCleanup(SLHashObj* pHashObj);
int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size);
char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen);
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen);
int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data, size_t size);
char* tHashGet(SLHashObj* pHashObj, const void* key, size_t keyLen);
int32_t tHashRemove(SLHashObj* pHashObj, const void* key, size_t keyLen);
void tHashPrint(const SLHashObj* pHashObj, int32_t type);
void tHashPrint(const SLHashObj* pHashObj, int32_t type);
#ifdef __cplusplus
}
......
......@@ -112,7 +112,7 @@ void tSimpleHashCleanup(SSHashObj *pHashObj);
size_t tSimpleHashGetMemSize(const SSHashObj *pHashObj);
#pragma pack(push, 4)
typedef struct SHNode{
typedef struct SHNode {
struct SHNode *next;
uint32_t keyLen : 20;
uint32_t dataLen : 12;
......
......@@ -20,8 +20,8 @@
extern "C" {
#endif
#include "tcommon.h"
#include "os.h"
#include "tcommon.h"
enum {
SORT_MULTISOURCE_MERGE = 0x1,
......@@ -31,29 +31,29 @@ enum {
typedef struct SMultiMergeSource {
int32_t type;
int32_t rowIndex;
SSDataBlock *pBlock;
SSDataBlock* pBlock;
} SMultiMergeSource;
typedef struct SSortSource {
SMultiMergeSource src;
union{
struct{
SArray* pageIdList;
int32_t pageIndex;
union {
struct {
SArray* pageIdList;
int32_t pageIndex;
};
void *param;
void* param;
};
} SSortSource;
typedef struct SMsortComparParam {
void **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool cmpGroupId;
void** pSources;
int32_t numOfSources;
SArray* orderInfo; // SArray<SBlockOrderInfo>
bool cmpGroupId;
} SMsortComparParam;
typedef struct SSortHandle SSortHandle;
typedef struct SSortHandle SSortHandle;
typedef struct STupleHandle STupleHandle;
typedef SSDataBlock* (*_sort_fetch_block_fn_t)(void* param);
......@@ -64,7 +64,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @param type
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr);
/**
*
......@@ -90,7 +91,8 @@ int32_t tsortClose(SSortHandle* pHandle);
*
* @return
*/
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param);
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
void* param);
/**
*
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "function.h"
#include "os.h"
#include "tname.h"
#include "tdatablock.h"
......@@ -26,8 +26,8 @@
#include "ttypes.h"
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
static void destroyLastrowScanOperator(void* param);
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static void destroyLastrowScanOperator(void* param);
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
......@@ -40,11 +40,11 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
}
pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols,
COL_MATCH_FROM_COL_ID);
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc,
&numOfCols, COL_MATCH_FROM_COL_ID);
code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
......@@ -58,7 +58,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
// partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|CACHESCAN_RETRIEVE_LAST_ROW;
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | CACHESCAN_RETRIEVE_LAST_ROW;
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) {
......@@ -67,23 +67,24 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
} else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|CACHESCAN_RETRIEVE_LAST_ROW;
} else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | CACHESCAN_RETRIEVE_LAST_ROW;
}
if (pScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
pPseudoExpr->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs);
pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
pPseudoExpr->pCtx =
createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
}
pOperator->name = "LastrowScanOperator";
pOperator->name = "LastrowScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
......@@ -92,7 +93,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->cost.openCost = 0;
return pOperator;
_error:
_error:
pTaskInfo->code = code;
destroyLastrowScanOperator(pInfo);
taosMemoryFree(pOperator);
......@@ -121,7 +122,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pBufferredRes);
taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
int32_t code =
tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -133,15 +135,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) {
SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i);
int32_t slotId = pMatchInfo->targetSlotId;
int32_t slotId = pMatchInfo->targetSlotId;
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId);
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
colDataAppend(pDst, 0, p, isNull);
}
......@@ -150,8 +152,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
......@@ -180,7 +182,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
......@@ -200,8 +202,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->pRes->info.groupId = pKeyInfo->groupId;
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
......@@ -233,10 +235,10 @@ void destroyLastrowScanOperator(void* param) {
}
int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
size_t numOfCols = taosArrayGetSize(pColMatchInfo);
size_t numOfCols = taosArrayGetSize(pColMatchInfo);
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
if (*pSlotIds == NULL) {
if (*pSlotIds == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -267,18 +267,19 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), leftNumJoin, rightNumJoin);
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin, rightNumJoin);
}
if (code == TSDB_CODE_SUCCESS) {
for (int32_t i = 0; i < leftNumJoin; ++i) {
for (int32_t j = 0; j < rightNumJoin; ++j) {
SRowLocation *leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation *rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
for (int32_t i = 0; i < leftNumJoin; ++i) {
for (int32_t j = 0; j < rightNumJoin; ++j) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
}
}
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册