提交 ddaae713 编写于 作者: H Haojun Liao

[td-13039] support write in and retrieve from vnode.

上级 5fca3e47
...@@ -106,7 +106,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); ...@@ -106,7 +106,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
...@@ -117,7 +117,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF ...@@ -117,7 +117,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock); void blockDataCleanup(SSDataBlock* pDataBlock);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);
......
...@@ -141,18 +141,27 @@ typedef struct STscObj { ...@@ -141,18 +141,27 @@ typedef struct STscObj {
SAppInstInfo* pAppInfo; SAppInstInfo* pAppInfo;
} STscObj; } STscObj;
typedef struct SResultColumn {
union {
char* nullbitmap; // bitmap, one bit for each item in the list
int32_t* offset;
};
char* pData;
} SResultColumn;
typedef struct SReqResultInfo { typedef struct SReqResultInfo {
const char* pRspMsg; const char* pRspMsg;
const char* pData; const char* pData;
TAOS_FIELD* fields; TAOS_FIELD* fields;
uint32_t numOfCols; uint32_t numOfCols;
int32_t* length; int32_t* length;
TAOS_ROW row; TAOS_ROW row;
char** pCol; SResultColumn* pCol;
uint32_t numOfRows; uint32_t numOfRows;
uint64_t totalRows; uint64_t totalRows;
uint32_t current; uint32_t current;
bool completed; bool completed;
int32_t payloadLen;
} SReqResultInfo; } SReqResultInfo;
typedef struct SShowReqInfo { typedef struct SShowReqInfo {
...@@ -224,7 +233,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, ...@@ -224,7 +233,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
void* doFetchRow(SRequestObj* pRequest); void* doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -483,13 +483,16 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -483,13 +483,16 @@ void* doFetchRow(SRequestObj* pRequest) {
} }
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pRequest->code = code; return NULL;
}
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
if (pRequest->code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
...@@ -556,10 +559,23 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -556,10 +559,23 @@ void* doFetchRow(SRequestObj* pRequest) {
_return: _return:
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; SResultColumn* pCol = &pResultInfo->pCol[i];
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); if (pCol->offset[pResultInfo->current] != -1) {
pResultInfo->row[i] = varDataVal(pResultInfo->row[i]); char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
pResultInfo->length[i] = varDataLen(pStart);
pResultInfo->row[i] = varDataVal(pStart);
} else {
pResultInfo->row[i] = NULL;
}
} else {
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
pResultInfo->row[i] = pResultInfo->pCol[i].pData + pResultInfo->fields[i].bytes * pResultInfo->current;
} else {
pResultInfo->row[i] = NULL;
}
} }
} }
...@@ -567,30 +583,52 @@ _return: ...@@ -567,30 +583,52 @@ _return:
return pResultInfo->row; return pResultInfo->row;
} }
static void doPrepareResPtr(SReqResultInfo* pResInfo) { static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
if (pResInfo->row == NULL) { if (pResInfo->row == NULL) {
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->pCol = calloc(pResInfo->numOfCols, sizeof(SResultColumn));
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
} }
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
} }
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) { if (numOfRows == 0) {
return; return TSDB_CODE_SUCCESS;
} }
// todo check for the failure of malloc int32_t code = doPrepareResPtr(pResultInfo);
doPrepareResPtr(pResultInfo); if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t offset = 0; int32_t* colLength = (int32_t*)pResultInfo->pData;
char* pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
colLength[i] = htonl(colLength[i]);
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->pCol[i].offset = (int32_t*)pStart;
pStart += numOfRows * sizeof(int32_t);
} else {
pResultInfo->pCol[i].nullbitmap = pStart;
pStart += BitmapLen(pResultInfo->numOfRows);
}
pResultInfo->pCol[i].pData = pStart;
pResultInfo->length[i] = pResultInfo->fields[i].bytes; pResultInfo->length[i] = pResultInfo->fields[i].bytes;
pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows); pResultInfo->row[i] = pResultInfo->pCol[i].pData;
pResultInfo->pCol[i] = pResultInfo->row[i];
offset += pResultInfo->fields[i].bytes; pStart += colLength[i];
} }
return TSDB_CODE_SUCCESS;
} }
char* getDbOfConnection(STscObj* pObj) { char* getDbOfConnection(STscObj* pObj) {
...@@ -612,15 +650,17 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { ...@@ -612,15 +650,17 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
pthread_mutex_unlock(&pTscObj->mutex); pthread_mutex_unlock(&pTscObj->mutex);
} }
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows); pResultInfo->numOfRows = htonl(pRsp->numOfRows);
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen);
// TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows; pResultInfo->totalRows += pResultInfo->numOfRows;
setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
} }
...@@ -52,7 +52,7 @@ TEST(testCase, driverInit_Test) { ...@@ -52,7 +52,7 @@ TEST(testCase, driverInit_Test) {
// taosInitGlobalCfg(); // taosInitGlobalCfg();
// taos_init(); // taos_init();
} }
#if 0
TEST(testCase, connect_Test) { TEST(testCase, connect_Test) {
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); // taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
...@@ -648,6 +648,7 @@ TEST(testCase, projection_query_stables) { ...@@ -648,6 +648,7 @@ TEST(testCase, projection_query_stables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, agg_query_tables) { TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...@@ -656,18 +657,20 @@ TEST(testCase, agg_query_tables) { ...@@ -656,18 +657,20 @@ TEST(testCase, agg_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); pRes = taos_query(pConn, "select * from tu");
if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*) from t_x_19"); // pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // printf("failed to create table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); // }
ASSERT_TRUE(false); // taos_free_result(pRes);
} //
// pRes = taos_query(pConn, "select count(*) from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
......
...@@ -430,11 +430,11 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 ...@@ -430,11 +430,11 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
/** /**
* *
* +------------------+---------------+--------------------+ * +------------------+---------------------------------------------+
* |the number of rows| column length | column #1 | * |the number of rows| column #1 |
* | (4 bytes) | (4 bytes) |--------------------+ * | (4 bytes) |------------+-----------------------+--------+
* | | | null bitmap| values| * | | null bitmap| column length(4bytes) | values |
* +------------------+---------------+--------------------+ * +------------------+------------+-----------------------+--------+
* @param buf * @param buf
* @param pBlock * @param pBlock
* @return * @return
...@@ -515,17 +515,21 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -515,17 +515,21 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
size_t blockDataGetRowSize(const SSDataBlock* pBlock) { size_t blockDataGetRowSize(SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
size_t rowSize = 0; if (pBlock->info.rowSize == 0) {
size_t rowSize = 0;
size_t numOfCols = pBlock->info.numOfCols; size_t numOfCols = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
rowSize += pColInfo->info.bytes; rowSize += pColInfo->info.bytes;
}
pBlock->info.rowSize = rowSize;
} }
return rowSize; return pBlock->info.rowSize;
} }
/** /**
...@@ -1059,7 +1063,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF ...@@ -1059,7 +1063,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// destroyTupleIndex(index); // destroyTupleIndex(index);
} }
void blockDataClearup(SSDataBlock* pDataBlock) { void blockDataCleanup(SSDataBlock* pDataBlock) {
pDataBlock->info.rows = 0; pDataBlock->info.rows = 0;
if (pDataBlock->info.hasVarCol) { if (pDataBlock->info.hasVarCol) {
......
...@@ -404,7 +404,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, ...@@ -404,7 +404,12 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
colInfo.info = pCond->colList[i]; colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pReadHandle->outputCapacity * pCond->colList[i].bytes); colInfo.pData = calloc(1, EXTRA_BYTES + pReadHandle->outputCapacity * pCond->colList[i].bytes);
if (colInfo.pData == NULL) {
if (!IS_VAR_DATA_TYPE(colInfo.info.type)) {
colInfo.nullbitmap = calloc(1, BitmapLen(pReadHandle->outputCapacity));
}
if (colInfo.pData == NULL || (colInfo.nullbitmap == NULL && (!IS_VAR_DATA_TYPE(colInfo.info.type)))) {
goto _end; goto _end;
} }
......
...@@ -15,11 +15,12 @@ ...@@ -15,11 +15,12 @@
#include "dataSinkInt.h" #include "dataSinkInt.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "tcompression.h" #include "tcompression.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "executorimpl.h" #include "tdatablock.h"
typedef struct SDataDispatchBuf { typedef struct SDataDispatchBuf {
int32_t useSize; int32_t useSize;
...@@ -64,29 +65,47 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc ...@@ -64,29 +65,47 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc
} }
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = pColRes->info.bytes * numOfRows; int32_t colSize = colDataGetLength(pColRes, numOfRows);
return (*(tDataTypes[pColRes->info.type].compFunc))( return (*(tDataTypes[pColRes->info.type].compFunc))(
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
} }
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t *compLen) { static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) {
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots); int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
int32_t *compSizes = (int32_t*)data; int32_t * colSizes = (int32_t*)data;
if (compressed) {
data += numOfCols * sizeof(int32_t);
}
data += numOfCols * sizeof(int32_t);
*dataLen = (numOfCols * sizeof(int32_t));
int32_t numOfRows = pInput->pData->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) { for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col); SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
// copy the null bitmap
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
size_t metaSize = numOfRows * sizeof(int32_t);
memcpy(data, pColRes->varmeta.offset, metaSize);
data += metaSize;
(*dataLen) += metaSize;
} else {
int32_t len = BitmapLen(numOfRows);
memcpy(data, pColRes->nullbitmap, len);
data += len;
(*dataLen) += len;
}
if (compressed) { if (compressed) {
compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed); colSizes[col] = compressColData(pColRes, numOfRows, data, compressed);
data += compSizes[col]; data += colSizes[col];
*compLen += compSizes[col]; (*dataLen) += colSizes[col];
compSizes[col] = htonl(compSizes[col]);
} else { } else {
memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows); colSizes[col] = colDataGetLength(pColRes, numOfRows);
data += pColRes->info.bytes * pInput->pData->info.rows; (*dataLen) += colSizes[col];
memmove(data, pColRes->pData, colSizes[col]);
data += colSizes[col];
} }
colSizes[col] = htonl(colSizes[col]);
} }
} }
...@@ -95,16 +114,14 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema ...@@ -95,16 +114,14 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) { static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema); pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
pEntry->numOfRows = pInput->pData->info.rows; pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0; pEntry->dataLen = 0;
pBuf->useSize = sizeof(SRetrieveTableRsp); pBuf->useSize = sizeof(SRetrieveTableRsp);
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
if (0 == pEntry->compressed) {
pEntry->dataLen = pHandle->pSchema->resultRowSize * pInput->pData->info.rows; pEntry->dataLen = pEntry->dataLen;
} pBuf->useSize += pEntry->dataLen;
pBuf->useSize += pEntry->dataLen;
// todo completed
} }
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) { static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
...@@ -169,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE ...@@ -169,6 +186,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
*pLen = 0; *pLen = 0;
return; return;
} }
SDataDispatchBuf* pBuf = NULL; SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
......
...@@ -3986,7 +3986,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI ...@@ -3986,7 +3986,7 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
blockDataClearup(pBlock); blockDataCleanup(pBlock);
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
return; return;
} }
...@@ -5794,7 +5794,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan ...@@ -5794,7 +5794,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan
} }
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) { static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) {
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
while(1) { while(1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle); STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
...@@ -5950,7 +5950,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { ...@@ -5950,7 +5950,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
while(1) { while(1) {
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
while (1) { while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle); STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
...@@ -6366,7 +6366,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6366,7 +6366,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
blockDataClearup(pRes); blockDataCleanup(pRes);
if (pProjectInfo->existDataBlock) { // TODO refactor if (pProjectInfo->existDataBlock) { // TODO refactor
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
......
...@@ -182,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { ...@@ -182,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
start = stop + 1; start = stop + 1;
} }
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
...@@ -312,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa ...@@ -312,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
} }
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
blockDataClearup(pHandle->pDataBlock); blockDataCleanup(pHandle->pDataBlock);
while(1) { while(1) {
if (cmpParam->numOfSources == pHandle->numOfCompletedSources) { if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
...@@ -478,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -478,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
blockDataClearup(pDataBlock); blockDataCleanup(pDataBlock);
} }
tMergeTreeDestroy(pHandle->pMergeTree); tMergeTreeDestroy(pHandle->pMergeTree);
......
...@@ -453,7 +453,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int ...@@ -453,7 +453,7 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return func(&tmpVal, pSchema->bytes, param); return func(&tmpVal, pSchema->bytes, param);
} }
return func(getNullValue(pSchema->type), 0, param); return func(NULL, 0, param);
} }
switch (pSchema->type) { switch (pSchema->type) {
...@@ -628,7 +628,11 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p ...@@ -628,7 +628,11 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p
varDataSetLen(rowEnd, output); varDataSetLen(rowEnd, output);
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
} else { } else {
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, true, pa->toffset, pa->colIdx); if (value == NULL) { // it is a null data
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, true, pa->toffset, pa->colIdx);
} else {
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, true, pa->toffset, pa->colIdx);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -603,28 +603,21 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -603,28 +603,21 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
} }
QW_TASK_DLOG("no data in sink and query end, phase:%d", ctx->phase); QW_TASK_DLOG("no data in sink and query end, phase:%d", ctx->phase);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp; *rspMsg = rsp;
*dataLen = 0; *dataLen = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pOutput->bufStatus = DS_BUF_EMPTY; pOutput->bufStatus = DS_BUF_EMPTY;
QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// Got data from sink // Got data from sink
*dataLen = len; *dataLen = len;
QW_TASK_DLOG("task got data in sink, dataLength:%d", len); QW_TASK_DLOG("task got data in sink, dataLength:%d", len);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册