未验证 提交 a2260fd0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9687 from taosdata/feature/3.0_wxy

TD-12760 simple memory management code.
...@@ -91,9 +91,10 @@ enum { ...@@ -91,9 +91,10 @@ enum {
META_TYPE_NON_TABLE = 1, META_TYPE_NON_TABLE = 1,
META_TYPE_CTABLE, META_TYPE_CTABLE,
META_TYPE_TABLE, META_TYPE_TABLE,
META_TYPE_BOTH_TABLE, META_TYPE_BOTH_TABLE
}; };
typedef struct STableMetaOutput { typedef struct STableMetaOutput {
int32_t metaType; int32_t metaType;
char ctbFname[TSDB_TABLE_FNAME_LEN]; char ctbFname[TSDB_TABLE_FNAME_LEN];
......
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
#include "catalog.h" #include "catalog.h"
typedef struct SSchedulerCfg { typedef struct SSchedulerCfg {
int32_t maxJobNum; uint32_t maxJobNum;
} SSchedulerCfg; } SSchedulerCfg;
typedef struct SQueryProfileSummary { typedef struct SQueryProfileSummary {
......
...@@ -51,6 +51,7 @@ void taosFreeQitem(void *pItem); ...@@ -51,6 +51,7 @@ void taosFreeQitem(void *pItem);
int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(STaosQueue *queue); bool taosQueueEmpty(STaosQueue *queue);
int32_t taosQueueSize(STaosQueue *queue);
STaosQall *taosAllocateQall(); STaosQall *taosAllocateQall();
void taosFreeQall(STaosQall *qall); void taosFreeQall(STaosQall *qall);
......
...@@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S ...@@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
tNameGetFullDbName(pTableName, db); tNameGetFullDbName(pTableName, db);
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup)); CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
// REMOEV THIS ....
if (0 == tbMeta->vgId) {
SVgroupInfo vgroup = {0};
catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup);
tbMeta->vgId = vgroup.vgId;
}
// REMOVE THIS ....
if (tbMeta->tableType == TSDB_SUPER_TABLE) { if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList)); CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
} else { } else {
......
...@@ -26,17 +26,26 @@ extern "C" { ...@@ -26,17 +26,26 @@ extern "C" {
struct SDataSink; struct SDataSink;
struct SDataSinkHandle; struct SDataSinkHandle;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SDataResult* pRes); typedef struct SDataSinkManager {
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, char* pData, int32_t* pLen); SDataSinkMgtCfg cfg;
pthread_mutex_t mutex;
} SDataSinkManager;
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef struct SDataSinkHandle { typedef struct SDataSinkHandle {
FPutDataBlock fPut; FPutDataBlock fPut;
FGetDataBlock fGet; FEndPut fEndPut;
FGetDataLength fGetLen;
FGetDataBlock fGetData;
FDestroyDataSinker fDestroy; FDestroyDataSinker fDestroy;
} SDataSinkHandle; } SDataSinkHandle;
int32_t createDataDispatcher(const struct SDataSink* pDataSink, DataSinkHandle* pHandle); int32_t createDataDispatcher(SDataSinkManager* pManager, const struct SDataSink* pDataSink, DataSinkHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,6 +26,8 @@ extern "C" { ...@@ -26,6 +26,8 @@ extern "C" {
#define DS_CAPACITY_ENOUGH 1 #define DS_CAPACITY_ENOUGH 1
#define DS_CAPACITY_FULL 2 #define DS_CAPACITY_FULL 2
#define DS_NEED_SCHEDULE 3 #define DS_NEED_SCHEDULE 3
#define DS_END 4
#define DS_IN_PROCESS 5
struct SDataSink; struct SDataSink;
struct SSDataBlock; struct SSDataBlock;
...@@ -39,11 +41,16 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); ...@@ -39,11 +41,16 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg);
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
typedef struct SDataResult { typedef struct SInputData {
SQueryCostInfo profile;
const SSDataBlock* pData; const SSDataBlock* pData;
SHashObj* pTableRetrieveTsMap; SHashObj* pTableRetrieveTsMap;
} SDataResult; } SInputData;
typedef struct SOutPutData {
int32_t numOfRows;
int8_t compressed;
char* pData;
} SOutPutData;
/** /**
* Create a subplan's datasinker handle for all later operations. * Create a subplan's datasinker handle for all later operations.
...@@ -59,30 +66,25 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH ...@@ -59,30 +66,25 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
* @param pRes * @param pRes
* @return error code * @return error code
*/ */
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes); int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus);
void dsEndPut(DataSinkHandle handle);
/** /**
* Get the length of the data returned by the next call to dsGetDataBlock. * Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle * @param handle
* @return data length * @return data length
*/ */
int32_t dsGetDataLength(DataSinkHandle handle); int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus);
/** /**
* Get data, the caller needs to allocate data memory. * Get data, the caller needs to allocate data memory.
* @param handle * @param handle
* @param pData output * @param pOutput output
* @param pLen output * @param pStatus output
* @return error code * @return error code
*/ */
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen); int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus);
/**
* Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call.
* @param handle
* @return datasinker status
*/
int32_t dsGetStatus(DataSinkHandle handle);
/** /**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
......
...@@ -20,21 +20,29 @@ ...@@ -20,21 +20,29 @@
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos #define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp))
#define GET_BUF_REMAIN(buf) (buf)->remain
typedef struct SBuf { typedef struct SDataDispatchBuf {
int32_t size; int32_t useSize;
int32_t pos; int32_t allocSize;
int32_t remain;
char* pData; char* pData;
} SBuf; } SDataDispatchBuf;
typedef struct SDataCacheEntry {
int32_t dataLen;
int32_t numOfRows;
int8_t compressed;
char data[];
} SDataCacheEntry;
typedef struct SDataDispatchHandle { typedef struct SDataDispatchHandle {
SDataSinkHandle sink; SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataBlockSchema schema; SDataBlockSchema schema;
STaosQueue* pDataBlocks; STaosQueue* pDataBlocks;
SBuf buf; SDataDispatchBuf nextOutput;
int32_t status;
pthread_mutex_t mutex;
} SDataDispatchHandle; } SDataDispatchHandle;
static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) { static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSchema) {
...@@ -53,87 +61,156 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSche ...@@ -53,87 +61,156 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockSchema* pSche
return false; return false;
} }
static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) { static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = pColRes->info.bytes * numOfRows; int32_t colSize = pColRes->info.bytes * numOfRows;
return (*(tDataTypes[pColRes->info.type].compFunc))( return (*(tDataTypes[pColRes->info.type].compFunc))(
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0); pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
} }
static void doCopyQueryResultToMsg(const SDataResult* pRes, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) { static void copyData(const SInputData* pInput, const SDataBlockSchema* pSchema, char* data, int8_t compressed, int32_t *compLen) {
int32_t *compSizes = (int32_t*)data; int32_t *compSizes = (int32_t*)data;
if (compressed) { if (compressed) {
data += pSchema->numOfCols * sizeof(int32_t); data += pSchema->numOfCols * sizeof(int32_t);
} }
for (int32_t col = 0; col < pSchema->numOfCols; ++col) { for (int32_t col = 0; col < pSchema->numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pData->pDataBlock, col); SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
if (compressed) { if (compressed) {
compSizes[col] = compressQueryColData(pColRes, pRes->pData->info.rows, data, compressed); compSizes[col] = compressColData(pColRes, pInput->pData->info.rows, data, compressed);
data += compSizes[col]; data += compSizes[col];
*compLen += compSizes[col]; *compLen += compSizes[col];
compSizes[col] = htonl(compSizes[col]); compSizes[col] = htonl(compSizes[col]);
} else { } else {
memmove(data, pColRes->pData, pColRes->info.bytes * pRes->pData->info.rows); memmove(data, pColRes->pData, pColRes->info.bytes * pInput->pData->info.rows);
data += pColRes->info.bytes * pRes->pData->info.rows; data += pColRes->info.bytes * pInput->pData->info.rows;
} }
} }
int32_t numOfTables = (int32_t) taosHashGetSize(pRes->pTableRetrieveTsMap); int32_t numOfTables = (int32_t) taosHashGetSize(pInput->pTableRetrieveTsMap);
*(int32_t*)data = htonl(numOfTables); *(int32_t*)data = htonl(numOfTables);
data += sizeof(int32_t); data += sizeof(int32_t);
STableIdInfo* item = taosHashIterate(pRes->pTableRetrieveTsMap, NULL); STableIdInfo* item = taosHashIterate(pInput->pTableRetrieveTsMap, NULL);
while (item) { while (item) {
STableIdInfo* pDst = (STableIdInfo*)data; STableIdInfo* pDst = (STableIdInfo*)data;
pDst->uid = htobe64(item->uid); pDst->uid = htobe64(item->uid);
pDst->key = htobe64(item->key); pDst->key = htobe64(item->key);
data += sizeof(STableIdInfo); data += sizeof(STableIdInfo);
item = taosHashIterate(pRes->pTableRetrieveTsMap, item); item = taosHashIterate(pInput->pTableRetrieveTsMap, item);
} }
} }
static void toRetrieveResult(SDataDispatchHandle* pHandle, const SDataResult* pRes, char* pData, int32_t* pContLen) { // data format with compress: SDataCacheEntry | cols_data_offset | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pData; // data format: SDataCacheEntry | col1_data col2_data ... | numOfTables | STableIdInfo STableIdInfo ...
pRsp->useconds = htobe64(pRes->profile.elapsedTime); static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
pRsp->precision = htons(pHandle->schema.precision); SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pRsp->compressed = (int8_t)needCompress(pRes->pData, &(pHandle->schema)); pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema));
pRsp->numOfRows = htonl(pRes->pData->info.rows); pEntry->numOfRows = pInput->pData->info.rows;
*pContLen = sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(pRes->pTableRetrieveTsMap) + sizeof(SRetrieveTableRsp);
doCopyQueryResultToMsg(pRes, &pHandle->schema, pRsp->data, pRsp->compressed, &pRsp->compLen);
*pContLen += (pRsp->compressed ? pRsp->compLen : pHandle->schema.resultRowSize * pRes->pData->info.rows);
pRsp->compLen = htonl(pRsp->compLen); pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap);
copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows);
// todo completed // todo completed
} }
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SDataResult* pRes) { static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
if (taosQueueSize(pDispatcher->pDataBlocks) >= pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
return false;
}
pBuf->allocSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap) + pDispatcher->schema.resultRowSize * pInput->pData->info.rows;
pBuf->pData = malloc(pBuf->allocSize);
return NULL != pBuf->pData;
}
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
pthread_mutex_lock(&pDispatcher->mutex);
int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL;
pDispatcher->status = status;
pthread_mutex_unlock(&pDispatcher->mutex);
return status;
}
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
pthread_mutex_lock(&pDispatcher->mutex);
int32_t status = pDispatcher->status;
pthread_mutex_unlock(&pDispatcher->mutex);
return status;
}
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
int32_t useSize = 0; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
toRetrieveResult(pDispatcher, pRes, GET_BUF_DATA(&pDispatcher->buf), &useSize); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
toDataCacheEntry(pDispatcher, pInput, pBuf);
taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
*pStatus = updateStatus(pDispatcher);
return TSDB_CODE_SUCCESS;
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, char* pData, int32_t* pLen) { static void endPut(struct SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
pthread_mutex_lock(&pDispatcher->mutex);
pDispatcher->status = DS_END;
pthread_mutex_unlock(&pDispatcher->mutex);
}
static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS;
return 0;
}
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
} }
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
pOutput->numOfRows = pEntry->numOfRows;
pOutput->compressed = pEntry->compressed;
tfree(pDispatcher->nextOutput.pData); // todo persistent
*pStatus = updateStatus(pDispatcher);
return TSDB_CODE_SUCCESS;
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
tfree(pDispatcher->nextOutput.pData);
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
SDataDispatchBuf* pBuf = NULL;
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
tfree(pBuf->pData);
taosFreeQitem(pBuf);
}
taosCloseQueue(pDispatcher->pDataBlocks);
pthread_mutex_destroy(&pDispatcher->mutex);
} }
int32_t createDataDispatcher(const SDataSink* pDataSink, DataSinkHandle* pHandle) { int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataSink, DataSinkHandle* pHandle) {
SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle)); SDataDispatchHandle* dispatcher = calloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) { if (NULL == dispatcher) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
dispatcher->sink.fPut = putDataBlock; dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fGet = getDataBlock; dispatcher->sink.fGetLen = getDataLength;
dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->pManager = pManager;
dispatcher->schema = pDataSink->schema;
dispatcher->status = DS_CAPACITY_ENOUGH;
dispatcher->pDataBlocks = taosOpenQueue(); dispatcher->pDataBlocks = taosOpenQueue();
pthread_mutex_init(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) { if (NULL == dispatcher->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
*pHandle = dispatcher; *pHandle = dispatcher;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -17,33 +17,38 @@ ...@@ -17,33 +17,38 @@
#include "dataSinkInt.h" #include "dataSinkInt.h"
#include "planner.h" #include "planner.h"
static SDataSinkManager gDataSinkManager = {0};
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
// todo gDataSinkManager.cfg = *cfg;
pthread_mutex_init(&gDataSinkManager.mutex, NULL);
} }
int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) { int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pHandle) {
if (DSINK_Dispatch == pDataSink->info.type) { if (DSINK_Dispatch == pDataSink->info.type) {
return createDataDispatcher(pDataSink, pHandle); return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
} }
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
int32_t dsPutDataBlock(DataSinkHandle handle, const SDataResult* pRes) { int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fPut(pHandleImpl, pRes); return pHandleImpl->fPut(pHandleImpl, pInput, pStatus);
} }
int32_t dsGetDataLength(DataSinkHandle handle) { void dsEndPut(DataSinkHandle handle) {
// todo SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fEndPut(pHandleImpl);
} }
int32_t dsGetDataBlock(DataSinkHandle handle, char* pData, int32_t* pLen) { int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGet(pHandleImpl, pData, pLen); return pHandleImpl->fGetLen(pHandleImpl, pStatus);
} }
int32_t dsGetStatus(DataSinkHandle handle) { int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) {
// todo SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus);
} }
void dsScheduleProcess(void* ahandle, void* pItem) { void dsScheduleProcess(void* ahandle, void* pItem) {
......
...@@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf) ...@@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t setTableVgroupList(SParseBasicCtx *pCtx, SName* name, SVgroupsInfo **pVgList) {
SArray* vgroupList = NULL;
int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t vgroupNum = taosArrayGetSize(vgroupList);
SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum);
vgList->numOfVgroups = vgroupNum;
for (int32_t i = 0; i < vgroupNum; ++i) {
SVgroupInfo *vg = taosArrayGet(vgroupList, i);
vgList->vgroups[i].vgId = vg->vgId;
vgList->vgroups[i].numOfEps = vg->numOfEps;
memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr));
}
*pVgList = vgList;
taosArrayDestroy(vgroupList);
return TSDB_CODE_SUCCESS;
}
int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) { int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) {
assert(pCtx != NULL && pInfo != NULL); assert(pCtx != NULL && pInfo != NULL);
int32_t code = 0; int32_t code = 0;
...@@ -3916,7 +3943,7 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt ...@@ -3916,7 +3943,7 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
data.pTableMeta = taosArrayInit(1, POINTER_BYTES); data.pTableMeta = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(data.pTableMeta, &pmt); taosArrayPush(data.pTableMeta, &pmt);
...@@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt ...@@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
pQueryInfo->pTableMetaInfo[0]->name = *name; pQueryInfo->pTableMetaInfo[0]->name = *name;
pQueryInfo->numOfTables = 1; pQueryInfo->numOfTables = 1;
code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(data.pTableMeta);
return code;
}
// evaluate the sqlnode // evaluate the sqlnode
STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0); STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
assert(pTableMeta != NULL); assert(pTableMeta != NULL);
......
...@@ -237,6 +237,9 @@ void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaReq) { ...@@ -237,6 +237,9 @@ void qParserCleanupMetaRequestInfo(SCatalogReq* pMetaReq) {
} }
void qDestroyQuery(SQueryNode* pQueryNode) { void qDestroyQuery(SQueryNode* pQueryNode) {
if (NULL == pQueryNode) {
return;
}
if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) { if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) {
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode; SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
taosArrayDestroy(pModifInfo->pDataBlocks); taosArrayDestroy(pModifInfo->pDataBlocks);
......
...@@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI ...@@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
return (SPhyNode*)node; return (SPhyNode*)node;
} }
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}
static bool isSystemTable(SQueryTableInfo* pTable) { static bool isSystemTable(SQueryTableInfo* pTable) {
// todo // todo
...@@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { ...@@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType); return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
} }
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
if (needMultiNodeScan(pTable)) { if (needMultiNodeScan(pTable)) {
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
} }
return createSingleTableScanNode(pPlanNode, pTable); return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
} }
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
...@@ -322,12 +327,12 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { ...@@ -322,12 +327,12 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_MODIFY == pRoot->info.type) { if (QNODE_MODIFY == pRoot->info.type) {
splitModificationOpSubPlan(pCxt, pRoot); splitModificationOpSubPlan(pCxt, pRoot);
} else { } else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
++(pCxt->nextId.templateId); ++(pCxt->nextId.templateId);
subplan->msgType = TDMT_VND_QUERY; subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot); subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot); subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
} }
// todo deal subquery // todo deal subquery
} }
......
...@@ -398,7 +398,7 @@ static bool exprNodeFromJson(const cJSON* json, void* obj) { ...@@ -398,7 +398,7 @@ static bool exprNodeFromJson(const cJSON* json, void* obj) {
case TEXPR_FUNCTION_NODE: case TEXPR_FUNCTION_NODE:
return fromObject(json, jkExprNodeFunction, functionFromJson, exprInfo, false); return fromObject(json, jkExprNodeFunction, functionFromJson, exprInfo, false);
case TEXPR_COL_NODE: case TEXPR_COL_NODE:
return fromObject(json, jkExprNodeColumn, schemaFromJson, exprInfo->pSchema, false); return fromObjectWithAlloc(json, jkExprNodeColumn, schemaFromJson, (void**)&exprInfo->pSchema, sizeof(SSchema), false);
case TEXPR_VALUE_NODE: case TEXPR_VALUE_NODE:
return fromObject(json, jkExprNodeValue, variantFromJson, exprInfo->pVal, false); return fromObject(json, jkExprNodeValue, variantFromJson, exprInfo->pVal, false);
default: default:
......
...@@ -37,10 +37,10 @@ enum { ...@@ -37,10 +37,10 @@ enum {
}; };
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; uint64_t taskId; // sequential taksId
uint64_t sId; uint64_t sId; // schedulerId
SSchedulerCfg cfg; SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob* SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SSchCallbackParam { typedef struct SSchCallbackParam {
...@@ -83,52 +83,61 @@ typedef struct SSchJobAttr { ...@@ -83,52 +83,61 @@ typedef struct SSchJobAttr {
typedef struct SSchJob { typedef struct SSchJob {
uint64_t queryId; uint64_t queryId;
int32_t levelNum;
int32_t levelIdx;
int8_t status;
SSchJobAttr attr; SSchJobAttr attr;
SEpSet dataSrcEps; int32_t levelNum;
SEpAddr resEp;
void *transport; void *transport;
SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr
SArray *levels; // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
SArray *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
int32_t levelIdx;
SEpSet dataSrcEps;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
int8_t status;
SQueryNodeAddr resNode;
tsem_t rspSem; tsem_t rspSem;
int32_t userFetch; int32_t userFetch;
int32_t remoteFetch; int32_t remoteFetch;
SSchTask *fetchTask; SSchTask *fetchTask;
int32_t errCode; int32_t errCode;
void *res; void *res;
int32_t resNumOfRows; int32_t resNumOfRows;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SArray *levels; // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray<void*>
SQueryProfileSummary summary; SQueryProfileSummary summary;
} SSchJob; } SSchJob;
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE #define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -112,6 +112,13 @@ bool taosQueueEmpty(STaosQueue *queue) { ...@@ -112,6 +112,13 @@ bool taosQueueEmpty(STaosQueue *queue) {
return empty; return empty;
} }
int32_t taosQueueSize(STaosQueue *queue) {
pthread_mutex_lock(&queue->mutex);
int32_t numOfItems = queue->numOfItems;
pthread_mutex_unlock(&queue->mutex);
return numOfItems;
}
void *taosAllocateQitem(int32_t size) { void *taosAllocateQitem(int32_t size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册