未验证 提交 5618e7e6 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #9449 from taosdata/feature/qnode

Feature/qnode
...@@ -1018,7 +1018,7 @@ typedef struct { ...@@ -1018,7 +1018,7 @@ typedef struct {
} SUpdateTagValRsp; } SUpdateTagValRsp;
typedef struct SSubQueryMsg { typedef struct SSubQueryMsg {
uint64_t schedulerId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
uint32_t contentLen; uint32_t contentLen;
...@@ -1026,7 +1026,7 @@ typedef struct SSubQueryMsg { ...@@ -1026,7 +1026,7 @@ typedef struct SSubQueryMsg {
} SSubQueryMsg; } SSubQueryMsg;
typedef struct SResReadyMsg { typedef struct SResReadyMsg {
uint64_t schedulerId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SResReadyMsg; } SResReadyMsg;
...@@ -1036,13 +1036,13 @@ typedef struct SResReadyRsp { ...@@ -1036,13 +1036,13 @@ typedef struct SResReadyRsp {
} SResReadyRsp; } SResReadyRsp;
typedef struct SResFetchMsg { typedef struct SResFetchMsg {
uint64_t schedulerId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SResFetchMsg; } SResFetchMsg;
typedef struct SSchTasksStatusMsg { typedef struct SSchTasksStatusMsg {
uint64_t schedulerId; uint64_t sId;
} SSchTasksStatusMsg; } SSchTasksStatusMsg;
typedef struct STaskStatus { typedef struct STaskStatus {
...@@ -1057,7 +1057,7 @@ typedef struct SSchedulerStatusRsp { ...@@ -1057,7 +1057,7 @@ typedef struct SSchedulerStatusRsp {
} SSchedulerStatusRsp; } SSchedulerStatusRsp;
typedef struct STaskCancelMsg { typedef struct STaskCancelMsg {
uint64_t schedulerId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} STaskCancelMsg; } STaskCancelMsg;
...@@ -1067,7 +1067,7 @@ typedef struct STaskCancelRsp { ...@@ -1067,7 +1067,7 @@ typedef struct STaskCancelRsp {
} STaskCancelRsp; } STaskCancelRsp;
typedef struct STaskDropMsg { typedef struct STaskDropMsg {
uint64_t schedulerId; uint64_t sId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} STaskDropMsg; } STaskDropMsg;
......
...@@ -227,7 +227,7 @@ void taos_init_imp(void) { ...@@ -227,7 +227,7 @@ void taos_init_imp(void) {
rpcInit(); rpcInit();
SCatalogCfg cfg = {.enableVgroupCache = true, .maxDBCacheNum = 100, .maxTblCacheNum = 100}; SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
catalogInit(&cfg); catalogInit(&cfg);
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp); tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
......
...@@ -73,7 +73,7 @@ struct SVnode { ...@@ -73,7 +73,7 @@ struct SVnode {
SVnodeSync* pSync; SVnodeSync* pSync;
SVnodeFS* pFs; SVnodeFS* pFs;
tsem_t canCommit; tsem_t canCommit;
void* pQuery; SQHandle* pQuery;
}; };
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
......
...@@ -22,6 +22,9 @@ extern "C" { ...@@ -22,6 +22,9 @@ extern "C" {
#include "vnodeInt.h" #include "vnodeInt.h"
#include "qworker.h" #include "qworker.h"
typedef struct SQWorkerMgmt SQHandle;
int vnodeQueryOpen(SVnode *pVnode); int vnodeQueryOpen(SVnode *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,13 +22,27 @@ int vnodeQueryOpen(SVnode *pVnode) { ...@@ -22,13 +22,27 @@ int vnodeQueryOpen(SVnode *pVnode) {
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("query message is processed"); vInfo("query message is processed");
qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
return 0;
} }
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("fetch message is processed"); vInfo("fetch message is processed");
qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); switch (pMsg->msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_RES_READY:
return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_TASKS_STATUS:
return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
break;
}
return 0; return 0;
} }
......
...@@ -46,7 +46,6 @@ typedef struct STableMetaCache { ...@@ -46,7 +46,6 @@ typedef struct STableMetaCache {
} STableMetaCache; } STableMetaCache;
typedef struct SCatalog { typedef struct SCatalog {
SVgroupListCache vgroupCache;
SDBVgroupCache dbCache; SDBVgroupCache dbCache;
STableMetaCache tableCache; STableMetaCache tableCache;
} SCatalog; } SCatalog;
...@@ -67,6 +66,7 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); ...@@ -67,6 +66,7 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_CACHE_ENABLED() (ctgMgmt.cfg.maxDBCacheNum > 0 || ctgMgmt.cfg.maxTblCacheNum > 0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
...@@ -146,8 +146,44 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) { ...@@ -146,8 +146,44 @@ void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
} }
} }
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
char tbFullName[TSDB_TABLE_FNAME_LEN];
snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);
SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
CTG_ERR_RET(queryBuildMsg[TDMT_MND_STB_META](&bInput, &msg, 0, &msgLen));
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STB_META,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
ctgError("error rsp for table meta, code:%x", rpcRsp.code);
CTG_ERR_RET(rpcRsp.code);
}
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) { CTG_ERR_RET(queryProcessMsgRsp[TDMT_MND_STB_META](output, rpcRsp.pCont, rpcRsp.contLen));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
...@@ -307,7 +343,9 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ...@@ -307,7 +343,9 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
}
if (NULL == pCatalog->tableCache.stableCache) {
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.stableCache) { if (NULL == pCatalog->tableCache.stableCache) {
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
...@@ -318,55 +356,51 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ...@@ -318,55 +356,51 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
if (output->metaNum == 2) { if (output->metaNum == 2) {
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("push ctable[%s] to table cache failed", output->ctbFname); ctgError("push ctable[%s] to table cache failed", output->ctbFname);
goto error_exit; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
goto error_exit; CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
} }
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
ctgError("push table[%s] to table cache failed", output->tbFname); ctgError("push table[%s] to table cache failed", output->tbFname);
goto error_exit; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) { if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) {
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
goto error_exit; CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
error_exit:
if (pCatalog->vgroupCache.cache) {
taosHashCleanup(pCatalog->vgroupCache.cache);
pCatalog->vgroupCache.cache = NULL;
}
pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
int32_t catalogInit(SCatalogCfg *cfg) { int32_t catalogInit(SCatalogCfg *cfg) {
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (ctgMgmt.pCluster) {
if (NULL == ctgMgmt.pCluster) { ctgError("catalog already init");
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
if (cfg) { if (cfg) {
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg)); memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
} else { } else {
ctgMgmt.cfg.enableVgroupCache = true;
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
} }
if (CTG_CACHE_ENABLED()) {
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -381,21 +415,19 @@ int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle ...@@ -381,21 +415,19 @@ int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle
} }
size_t clen = strlen(clusterId); size_t clen = strlen(clusterId);
SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen); SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, clusterId, clen);
if (clusterCtg) { if (ctg && (*ctg)) {
*catalogHandle = clusterCtg; *catalogHandle = *ctg;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
clusterCtg = calloc(1, sizeof(*clusterCtg)); SCatalog *clusterCtg = calloc(1, sizeof(SCatalog));
if (NULL == clusterCtg) { if (NULL == clusterCtg) {
ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg)); ctgError("calloc %d failed", (int32_t)sizeof(SCatalog));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) { if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
ctgError("put cluster %s cache to hash failed", clusterId); ctgError("put cluster %s cache to hash failed", clusterId);
tfree(clusterCtg); tfree(clusterCtg);
...@@ -443,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName ...@@ -443,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
} }
if (NULL == pCatalog->dbCache.cache) { if (NULL == pCatalog->dbCache.cache) {
pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->dbCache.cache) { if (NULL == pCatalog->dbCache.cache) {
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER); ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
...@@ -515,7 +547,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe ...@@ -515,7 +547,9 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe
STableMetaOutput output = {0}; STableMetaOutput output = {0};
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output)); //CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output)); CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output));
......
...@@ -313,7 +313,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -313,7 +313,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
return code; return code;
} }
code = tNameGetTableName(&name, pCreateTableInfo->tagdata.name); const char* pSTableName = tNameGetTableName(&name);
SArray* pValList = pCreateTableInfo->pTagVals; SArray* pValList = pCreateTableInfo->pTagVals;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -326,7 +326,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p ...@@ -326,7 +326,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
char dbName[TSDB_DB_FNAME_LEN] = {0}; char dbName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbName); tNameGetFullDbName(&name, dbName);
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, pCreateTableInfo->tagdata.name, &pSuperTableMeta); catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbName, pSTableName, &pSuperTableMeta);
// too long tag values will return invalid sql, not be truncated automatically // too long tag values will return invalid sql, not be truncated automatically
SSchema *pTagSchema = getTableTagSchema(pSuperTableMeta); SSchema *pTagSchema = getTableTagSchema(pSuperTableMeta);
......
...@@ -266,9 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { ...@@ -266,9 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
void initQueryModuleMsgHandle() { void initQueryModuleMsgHandle() {
queryBuildMsg[TDMT_VND_TABLE_META] = queryBuildTableMetaReqMsg; queryBuildMsg[TDMT_VND_TABLE_META] = queryBuildTableMetaReqMsg;
queryBuildMsg[TDMT_MND_STB_META] = queryBuildTableMetaReqMsg;
queryBuildMsg[TDMT_MND_USE_DB] = queryBuildUseDbMsg; queryBuildMsg[TDMT_MND_USE_DB] = queryBuildUseDbMsg;
queryProcessMsgRsp[TDMT_VND_TABLE_META] = queryProcessTableMetaRsp; queryProcessMsgRsp[TDMT_VND_TABLE_META] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TDMT_MND_STB_META] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TDMT_MND_USE_DB] = queryProcessUseDBRsp; queryProcessMsgRsp[TDMT_MND_USE_DB] = queryProcessUseDBRsp;
} }
......
...@@ -42,25 +42,41 @@ enum { ...@@ -42,25 +42,41 @@ enum {
QW_WRITE, QW_WRITE,
}; };
typedef struct SQWorkerTaskStatus { enum {
QW_EXIST_ACQUIRE = 1,
QW_EXIST_RET_ERR,
};
enum {
QW_NOT_EXIST_RET_ERR = 1,
QW_NOT_EXIST_ADD,
};
enum {
QW_ADD_RET_ERR = 1,
QW_ADD_ACQUIRE,
};
typedef struct SQWTaskStatus {
SRWLatch lock; SRWLatch lock;
int32_t code; int32_t code;
int8_t status; int8_t status;
int8_t ready; int8_t ready;
bool cancel; bool cancel;
bool drop; bool drop;
} SQWorkerTaskStatus; } SQWTaskStatus;
typedef struct SQWorkerResCache { typedef struct SQWorkerResCache {
SRWLatch lock; SRWLatch lock;
void *data; void *data;
} SQWorkerResCache; } SQWorkerResCache;
typedef struct SQWorkerSchStatus { typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second int32_t lastAccessTs; // timestamp in second
SRWLatch tasksLock; SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchStatus; } SQWSchStatus;
// Qnode/Vnode level task management // Qnode/Vnode level task management
typedef struct SQWorkerMgmt { typedef struct SQWorkerMgmt {
...@@ -71,7 +87,7 @@ typedef struct SQWorkerMgmt { ...@@ -71,7 +87,7 @@ typedef struct SQWorkerMgmt {
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_GOT_RES_DATA(data) (false) #define QW_GOT_RES_DATA(data) (true)
#define QW_LOW_RES_DATA(data) (false) #define QW_LOW_RES_DATA(data) (false)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
...@@ -86,8 +102,31 @@ typedef struct SQWorkerMgmt { ...@@ -86,8 +102,31 @@ typedef struct SQWorkerMgmt {
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) #define QW_LOCK(type, _lock) do { \
#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) if (QW_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
taosRLockLatch(_lock); \
qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) < 0) assert(0); \
taosWLockLatch(_lock); \
qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
taosRUnLockLatch(_lock); \
qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
taosWUnLockLatch(_lock); \
qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -4,38 +4,42 @@ ...@@ -4,38 +4,42 @@
#include "qworkerInt.h" #include "qworkerInt.h"
#include "planner.h" #include "planner.h"
int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_CANCELLING) {
return TSDB_CODE_SUCCESS;
}
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
switch (oriStatus) { switch (oriStatus) {
case JOB_TASK_STATUS_NULL: case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED ) { if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_NOT_START) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
break; break;
case JOB_TASK_STATUS_NOT_START: case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED) { if (newStatus != JOB_TASK_STATUS_CANCELLED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
break; break;
case JOB_TASK_STATUS_EXECUTING: case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING) { if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_CANCELLING
&& newStatus != JOB_TASK_STATUS_CANCELLED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
break; break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED: case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_CANCELLING) { if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_CANCELLED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -49,6 +53,10 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { ...@@ -49,6 +53,10 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
break; break;
case JOB_TASK_STATUS_CANCELLED: case JOB_TASK_STATUS_CANCELLED:
case JOB_TASK_STATUS_DROPPING:
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
break;
default: default:
qError("invalid task status:%d", oriStatus); qError("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
...@@ -58,17 +66,17 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { ...@@ -58,17 +66,17 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
_return: _return:
qError("invalid task status:%d", oriStatus); qError("invalid task status, from %d to %d", oriStatus, newStatus);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) { int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) {
int32_t code = 0; int32_t code = 0;
switch (type) { switch (type) {
case QW_TASK_INFO_STATUS: { case QW_TASK_INFO_STATUS: {
int8_t newStatus = *(int8_t *)data; int8_t newStatus = *(int8_t *)data;
QW_ERR_RET(qwCheckStatusSwitch(task->status, newStatus)); QW_ERR_RET(qwValidateStatus(task->status, newStatus));
task->status = newStatus; task->status = newStatus;
break; break;
} }
...@@ -80,9 +88,9 @@ int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) { ...@@ -80,9 +88,9 @@ int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) { int32_t qwAddTaskResCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, void *data) {
char id[sizeof(queryId) + sizeof(taskId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, queryId, taskId); QW_SET_QTID(id, qId, tId);
SQWorkerResCache resCache = {0}; SQWorkerResCache resCache = {0};
resCache.data = data; resCache.data = data;
...@@ -90,7 +98,7 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v ...@@ -90,7 +98,7 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
QW_LOCK(QW_WRITE, &mgmt->resLock); QW_LOCK(QW_WRITE, &mgmt->resLock);
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) { if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) {
QW_UNLOCK(QW_WRITE, &mgmt->resLock); QW_UNLOCK(QW_WRITE, &mgmt->resLock);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId); qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -99,37 +107,8 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v ...@@ -99,37 +107,8 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) { SQWSchStatus newSch = {0};
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id));
if (NULL == resCache) {
qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
*data = resCache->data;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &schedulerId, sizeof(schedulerId));
if (NULL == (*sch)) {
QW_LOCK(rwType, &mgmt->schLock);
return TSDB_CODE_QRY_SCH_NOT_EXIST;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) {
SQWorkerSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) { if (NULL == newSch.tasksHash) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
...@@ -138,19 +117,18 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker ...@@ -138,19 +117,18 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker
while (true) { while (true) {
QW_LOCK(QW_WRITE, &mgmt->schLock); QW_LOCK(QW_WRITE, &mgmt->schLock);
int32_t code = taosHashPut(mgmt->schHash, &schedulerId, sizeof(schedulerId), &newSch, sizeof(newSch)); int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
if (0 != code) { if (0 != code) {
if (!HASH_NODE_EXIST(code)) { if (!HASH_NODE_EXIST(code)) {
QW_UNLOCK(QW_WRITE, &mgmt->schLock); QW_UNLOCK(QW_WRITE, &mgmt->schLock);
qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId); qError("taosHashPut sId[%"PRIx64"] to scheduleHash failed", sId);
taosHashCleanup(newSch.tasksHash); taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} }
QW_UNLOCK(QW_WRITE, &mgmt->schLock); QW_UNLOCK(QW_WRITE, &mgmt->schLock);
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, schedulerId, sch)) { if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD)) {
taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -159,63 +137,122 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker ...@@ -159,63 +137,122 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker
} }
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
if (NULL == (*sch)) {
QW_UNLOCK(rwType, &mgmt->schLock);
if (QW_NOT_EXIST_ADD == nOpt) {
return qwAddScheduler(rwType, mgmt, sId, sch);
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
return TSDB_CODE_QRY_SCH_NOT_EXIST;
} else {
assert(0);
}
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock); QW_UNLOCK(rwType, &mgmt->schLock);
} }
static FORCE_INLINE int32_t qwAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **task) { static int32_t qwAcquireTaskImpl(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
char id[sizeof(queryId) + sizeof(taskId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, queryId, taskId); QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &sch->tasksLock); QW_LOCK(rwType, &sch->tasksLock);
*task = taosHashGet(sch->tasksHash, id, sizeof(id)); *task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) { if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock); QW_UNLOCK(rwType, &sch->tasksLock);
return TSDB_CODE_QRY_TASK_NOT_EXIST; return TSDB_CODE_QRY_TASK_NOT_EXIST;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t qwInsertAndAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, int8_t status, bool *inserted, SQWorkerTaskStatus **task) { static int32_t qwAcquireTask(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
char id[sizeof(queryId) + sizeof(taskId)] = {0}; return qwAcquireTaskImpl(rwType, sch, qId, tId, task);
QW_SET_QTID(id, queryId, taskId); }
while (true) {
*inserted = false;
static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
}
int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int8_t status, int32_t eOpt, SQWTaskStatus **task) {
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskStatus ntask = {0};
ntask.status = status;
while (true) {
QW_LOCK(QW_WRITE, &sch->tasksLock); QW_LOCK(QW_WRITE, &sch->tasksLock);
int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &status, sizeof(status)); int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
if (0 != code) { if (0 != code) {
QW_UNLOCK(QW_WRITE, &sch->tasksLock); QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
if (qwAcquireTask(rwType, sch, queryId, taskId, task)) { if (QW_EXIST_ACQUIRE == eOpt && rwType && task) {
if (qwAcquireTask(rwType, sch, qId, tId, task)) {
continue; continue;
} }
} else if (QW_EXIST_RET_ERR == eOpt) {
return TSDB_CODE_QRY_TASK_ALREADY_EXIST;
} else {
assert(0);
}
break; break;
} else { } else {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", queryId, taskId); qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} }
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
*inserted = true; QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, queryId, taskId, task)) { if (rwType && task) {
if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, qId, tId, task)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
break;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWorkerSchStatus *sch) { static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) {
QW_UNLOCK(rwType, &sch->tasksLock); SQWSchStatus *tsch = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &tsch, QW_NOT_EXIST_ADD));
int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task);
if (code) {
qwReleaseScheduler(QW_WRITE, mgmt);
}
if (NULL == task) {
qwReleaseScheduler(QW_READ, mgmt);
} else if (sch) {
*sch = tsch;
}
QW_RET(code);
} }
static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) { static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) {
char id[sizeof(queryId) + sizeof(taskId)] = {0}; char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId); QW_SET_QTID(id, queryId, taskId);
...@@ -235,27 +272,24 @@ static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgm ...@@ -235,27 +272,24 @@ static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgm
} }
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) { int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
SQWorkerSchStatus *schStatus = NULL; SQWSchStatus *sch = NULL;
int32_t taskNum = 0; int32_t taskNum = 0;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)) { QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
qWarn("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
} else {
schStatus->lastAccessTs = taosGetTimestampSec();
QW_LOCK(QW_READ, &schStatus->tasksLock); sch->lastAccessTs = taosGetTimestampSec();
taskNum = taosHashGetSize(schStatus->tasksHash);
} QW_LOCK(QW_READ, &sch->tasksLock);
taskNum = taosHashGetSize(sch->tasksHash);
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum; int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size); *rsp = calloc(1, size);
if (NULL == *rsp) { if (NULL == *rsp) {
qError("calloc %d failed", size); qError("calloc %d failed", size);
if (schStatus) { QW_UNLOCK(QW_READ, &sch->tasksLock);
QW_UNLOCK(QW_READ, &schStatus->tasksLock);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
}
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
...@@ -264,23 +298,19 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler ...@@ -264,23 +298,19 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler
size_t keyLen = 0; size_t keyLen = 0;
int32_t i = 0; int32_t i = 0;
if (schStatus) { void *pIter = taosHashIterate(sch->tasksHash, NULL);
void *pIter = taosHashIterate(schStatus->tasksHash, NULL);
while (pIter) { while (pIter) {
SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter; SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
taosHashGetKey(pIter, &key, &keyLen); taosHashGetKey(pIter, &key, &keyLen);
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status; (*rsp)->status[i].status = taskStatus->status;
pIter = taosHashIterate(schStatus->tasksHash, pIter); pIter = taosHashIterate(sch->tasksHash, pIter);
}
} }
if (schStatus) { QW_UNLOCK(QW_READ, &sch->tasksLock);
QW_UNLOCK(QW_READ, &schStatus->tasksLock);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
}
(*rsp)->num = taskNum; (*rsp)->num = taskNum;
...@@ -289,88 +319,58 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler ...@@ -289,88 +319,58 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) { int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) {
SQWorkerSchStatus *schStatus = NULL; SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
schStatus->lastAccessTs = taosGetTimestampSec(); sch->lastAccessTs = taosGetTimestampSec();
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) {
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) { SQWSchStatus *sch = NULL;
SQWorkerSchStatus *sch = NULL; SQWTaskStatus *task = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task));
*taskStatus = task->status; QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
QW_UNLOCK(QW_WRITE, &task->lock);
_return: _return:
if (task) {
qwReleaseTask(QW_READ, sch);
}
if (sch) { qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
}
QW_RET(code); QW_RET(code);
} }
int32_t qwSwitchTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) { int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
bool inserted = false;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
if (qwCheckStatusSwitch(JOB_TASK_STATUS_NULL, taskStatus)) {
qError("switch status error, not start to %d", taskStatus);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_RET(qwInsertAndAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
if (qwCheckStatusSwitch(JOB_TASK_STATUS_NOT_START, taskStatus)) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
qError("switch status error, not start to %d", taskStatus);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwInsertAndAcquireTask(QW_READ, sch, queryId, taskId, taskStatus, &inserted, &task)); *taskStatus = JOB_TASK_STATUS_NULL;
if (inserted) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
QW_LOCK(QW_WRITE, &task->lock); *taskStatus = task->status;
code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
QW_LOCK(QW_WRITE, &task->lock);
code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
_return:
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -379,25 +379,21 @@ _return: ...@@ -379,25 +379,21 @@ _return:
} }
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD));
QW_ERR_RET(qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
code = qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START); qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) { if (code) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
} }
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
...@@ -423,6 +419,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, ...@@ -423,6 +419,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId,
} }
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -449,9 +446,9 @@ _return: ...@@ -449,9 +446,9 @@ _return:
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
char id[sizeof(queryId) + sizeof(taskId)] = {0}; char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId); QW_SET_QTID(id, queryId, taskId);
...@@ -462,15 +459,15 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u ...@@ -462,15 +459,15 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u
} }
QW_UNLOCK(QW_WRITE, &mgmt->resLock); QW_UNLOCK(QW_WRITE, &mgmt->resLock);
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, schedulerId, &sch)) { if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
qWarn("scheduler %"PRIx64" doesn't exist", schedulerId); qWarn("scheduler %"PRIx64" doesn't exist", sId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) { if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt); qwReleaseScheduler(QW_WRITE, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", sId, queryId, taskId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -483,21 +480,21 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u ...@@ -483,21 +480,21 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u
} }
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD));
qWarn("scheduler %"PRIx64" doesn't exist", schedulerId);
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
return TSDB_CODE_SUCCESS; if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
} }
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
...@@ -508,7 +505,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer ...@@ -508,7 +505,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer
int8_t newStatus = 0; int8_t newStatus = 0;
if (task->status == JOB_TASK_STATUS_EXECUTING) { if (task->status == JOB_TASK_STATUS_EXECUTING) {
newStatus = JOB_TASK_STATUS_CANCELLING; newStatus = JOB_TASK_STATUS_DROPPING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
} else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) { } else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
...@@ -521,7 +518,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer ...@@ -521,7 +518,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwDropTask(mgmt, schedulerId, queryId, taskId)); QW_ERR_RET(qwDropTask(mgmt, sId, queryId, taskId));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -604,6 +601,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { ...@@ -604,6 +601,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = pMsg->msgType + 1,
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pRsp, .pCont = pRsp,
...@@ -673,12 +671,12 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -673,12 +671,12 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
...@@ -706,10 +704,8 @@ _return: ...@@ -706,10 +704,8 @@ _return:
if (task) { if (task) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
}
if (sch) {
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -717,12 +713,12 @@ _return: ...@@ -717,12 +713,12 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
...@@ -745,9 +741,6 @@ _return: ...@@ -745,9 +741,6 @@ _return:
if (task) { if (task) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
}
if (sch) {
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
...@@ -756,15 +749,15 @@ _return: ...@@ -756,15 +749,15 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, bool *needStop) { int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, bool *needStop) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
int8_t status = JOB_TASK_STATUS_CANCELLED; int8_t status = JOB_TASK_STATUS_CANCELLED;
*needStop = false; *needStop = false;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -776,11 +769,13 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_ ...@@ -776,11 +769,13 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_
QW_LOCK(QW_READ, &task->lock); QW_LOCK(QW_READ, &task->lock);
if ((!task->cancel) && (!task->drop)) { if ((!task->cancel) && (!task->drop)) {
qError("no cancel or drop, but task:%"PRIx64" exists", taskId);
QW_UNLOCK(QW_READ, &task->lock); QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; QW_RET(TSDB_CODE_QRY_APP_ERROR);
} }
QW_UNLOCK(QW_READ, &task->lock); QW_UNLOCK(QW_READ, &task->lock);
...@@ -791,30 +786,40 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_ ...@@ -791,30 +786,40 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
} else if (task->drop) { }
if (task->drop) {
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, schedulerId, queryId, taskId); return qwDropTask(mgmt, sId, queryId, taskId);
} }
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
int32_t needRsp = true; int32_t needRsp = true;
void *data = NULL; void *data = NULL;
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock); QW_LOCK(QW_READ, &task->lock);
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED && task->status != JOB_TASK_STATUS_SUCCEED) { if (task->cancel || task->drop) {
qError("task is already cancelled or dropped");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
qError("invalid status %d for fetch", task->status); qError("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -840,10 +845,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu ...@@ -840,10 +845,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
_return: _return:
if (task) { if (task) {
QW_UNLOCK(QW_READ, &task->lock); QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
} }
if (sch) { if (sch) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
} }
...@@ -854,37 +859,46 @@ _return: ...@@ -854,37 +859,46 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t status, int32_t errCode) { int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) {
SQWorkerSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED; int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch); code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD);
if (code) { if (code) {
qError("schedulerId:%"PRIx64" not in cache", schedulerId); qError("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
code = qwAcquireTask(QW_READ, sch, queryId, taskId, &task); code = qwAcquireTask(QW_READ, sch, qId, tId, &task);
if (code) { if (code) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
qError("schedulerId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", schedulerId, queryId, taskId);
if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) {
qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task));
}
if (task->cancel) { if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus);
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
} else if (task->drop) { }
if (task->drop) {
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, schedulerId, queryId, taskId); qwDropTask(mgmt, sId, qId, tId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { }
if (!(task->cancel || task->drop)) {
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
task->code = errCode; task->code = errCode;
...@@ -938,24 +952,24 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -938,24 +952,24 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont; SSubQueryMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid query msg"); qError("invalid query msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->schedulerId = htobe64(msg->schedulerId); msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId); msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId); msg->taskId = htobe64(msg->taskId);
msg->contentLen = ntohl(msg->contentLen); msg->contentLen = ntohl(msg->contentLen);
bool queryDone = false; bool queryDone = false;
bool queryRsp = false; bool queryRsped = false;
bool needStop = false; bool needStop = false;
SSubplan *plan = NULL; SSubplan *plan = NULL;
int32_t code = 0;
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop)); QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop));
if (needStop) { if (needStop) {
qWarn("task need stop"); qWarn("task need stop");
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
...@@ -963,7 +977,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -963,7 +977,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
code = qStringToSubplan(msg->msg, &plan); code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code); qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->sId, msg->queryId, msg->taskId, code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
...@@ -974,12 +988,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -974,12 +988,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (code) { if (code) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} else { } else {
QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING)); QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL));
} }
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsp = true; queryRsped = true;
//TODO call executer to execute subquery //TODO call executer to execute subquery
code = 0; code = 0;
...@@ -990,29 +1004,29 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -990,29 +1004,29 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (code) { if (code) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} else { } else {
QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data)); QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, data));
QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
} }
_return: _return:
if (queryRsp) { if (queryRsped) {
code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg, code); code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code);
} else { } else {
code = qwBuildAndSendQueryRsp(pMsg, code); code = qwBuildAndSendQueryRsp(pMsg, code);
} }
int8_t status = 0; int8_t status = 0;
if (TSDB_CODE_SUCCESS != code || queryDone) { if (TSDB_CODE_SUCCESS != code) {
if (code) { status = JOB_TASK_STATUS_FAILED;
status = JOB_TASK_STATUS_FAILED; //TODO set CANCELLED from code } else if (queryDone) {
} else {
status = JOB_TASK_STATUS_SUCCEED; status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
} }
qwQueryPostProcess(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, status, code); qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code);
}
QW_RET(code); QW_RET(code);
} }
...@@ -1023,12 +1037,16 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ ...@@ -1023,12 +1037,16 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
} }
SResReadyMsg *msg = pMsg->pCont; SResReadyMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg"); qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1040,14 +1058,16 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1040,14 +1058,16 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SSchTasksStatusMsg *msg = pMsg->pCont; SSchTasksStatusMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg"); qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = htobe64(msg->sId);
SSchedulerStatusRsp *sStatus = NULL; SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return: _return:
...@@ -1062,11 +1082,15 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1062,11 +1082,15 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
} }
SResFetchMsg *msg = pMsg->pCont; SResFetchMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId)); msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId));
void *data = NULL; void *data = NULL;
SQWorkerResCache *res = NULL; SQWorkerResCache *res = NULL;
...@@ -1074,7 +1098,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1074,7 +1098,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res)); QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res));
QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
_return: _return:
...@@ -1090,12 +1114,16 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1090,12 +1114,16 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
STaskCancelMsg *msg = pMsg->pCont; STaskCancelMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg"); qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return: _return:
...@@ -1111,12 +1139,16 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1111,12 +1139,16 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
STaskDropMsg *msg = pMsg->pCont; STaskDropMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task drop msg"); qError("invalid task drop msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return: _return:
...@@ -1125,6 +1157,31 @@ _return: ...@@ -1125,6 +1157,31 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
uint64_t sId, qId, tId;
//TODO call executer to continue execute subquery
code = 0;
void *data = NULL;
queryDone = false;
//TODO call executer to continue execute subquery
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code);
QW_RET(code);
}
void qWorkerDestroy(void **qWorkerMgmt) { void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
......
...@@ -36,10 +36,25 @@ ...@@ -36,10 +36,25 @@
namespace { namespace {
bool testStop = false;
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0; return 0;
} }
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
printf("task num:%d\n", rsp->num);
for (int32_t i = 0; i < rsp->num; ++i) {
STaskStatus *task = &rsp->status[i];
printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status);
}
}
return;
}
void stubSetStringToPlan() { void stubSetStringToPlan() {
static Stub stub; static Stub stub;
...@@ -54,11 +69,148 @@ void stubSetStringToPlan() { ...@@ -54,11 +69,148 @@ void stubSetStringToPlan() {
} }
} }
void stubSetRpcSendResponse() {
static Stub stub;
stub.set(rpcSendResponse, qwtRpcSendResponse);
{
AddrAny any("libplanner.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
for (const auto& f : result) {
stub.set(f.second, qwtRpcSendResponse);
}
}
}
void *queryThread(void *param) {
SRpcMsg queryRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
while (!testStop) {
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("query:%d\n", n);
}
}
return NULL;
}
void *readyThread(void *param) {
SRpcMsg readyRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResReadyMsg readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
while (!testStop) {
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("ready:%d\n", n);
}
}
return NULL;
} }
void *fetchThread(void *param) {
SRpcMsg fetchRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResFetchMsg fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
TEST(testCase, normalCase) { while (!testStop) {
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("fetch:%d\n", n);
}
}
return NULL;
}
void *dropThread(void *param) {
SRpcMsg dropRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
STaskDropMsg dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
while (!testStop) {
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("drop:%d\n", n);
}
}
return NULL;
}
void *statusThread(void *param) {
SRpcMsg statusRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
while (!testStop) {
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("status:%d\n", n);
}
}
return NULL;
}
}
TEST(seqTest, normalCase) {
void *mgmt = NULL; void *mgmt = NULL;
int32_t code = 0; int32_t code = 0;
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
...@@ -66,48 +218,254 @@ TEST(testCase, normalCase) { ...@@ -66,48 +218,254 @@ TEST(testCase, normalCase) {
SRpcMsg readyRpc = {0}; SRpcMsg readyRpc = {0};
SRpcMsg fetchRpc = {0}; SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0}; SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1); queryMsg->queryId = htobe64(1);
queryMsg->schedulerId = htobe64(1); queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1); queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100); queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg; queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0}; SResReadyMsg readyMsg = {0};
readyMsg.schedulerId = htobe64(1); readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1); readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1); readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg; readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
SResFetchMsg fetchMsg = {0}; SResFetchMsg fetchMsg = {0};
fetchMsg.schedulerId = htobe64(1); fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1); fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1); fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg; fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
STaskDropMsg dropMsg = {0}; STaskDropMsg dropMsg = {0};
dropMsg.schedulerId = htobe64(1); dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1); dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1); dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg; dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan(); stubSetStringToPlan();
stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt); code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
qWorkerDestroy(&mgmt);
}
TEST(seqTest, cancelFirst) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
SRpcMsg queryRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
STaskDropMsg dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
qWorkerDestroy(&mgmt);
}
TEST(seqTest, randCase) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
SRpcMsg queryRpc = {0};
SRpcMsg readyRpc = {0};
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
SResFetchMsg fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
STaskDropMsg dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
stubSetRpcSendResponse();
srand(time(NULL));
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
int32_t t = 0;
int32_t maxr = 10001;
while (true) {
int32_t r = rand() % maxr;
if (r >= 0 && r < maxr/5) {
printf("Query,%d\n", t++);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
} else if (r >= maxr/5 && r < maxr * 2/5) {
printf("Ready,%d\n", t++);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
} else if (r >= maxr * 4/5 && r < maxr-1) {
printf("Status,%d\n", t++);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
} else {
printf("QUIT RAND NOW");
break;
}
}
qWorkerDestroy(&mgmt);
}
TEST(seqTest, multithreadRand) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
stubSetStringToPlan();
stubSetRpcSendResponse();
srand(time(NULL));
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, queryThread, mgmt);
pthread_create(&(t2), &thattr, readyThread, NULL);
pthread_create(&(t3), &thattr, fetchThread, NULL);
pthread_create(&(t4), &thattr, dropThread, NULL);
pthread_create(&(t5), &thattr, statusThread, NULL);
int32_t t = 0;
int32_t maxr = 10001;
sleep(300);
testStop = true;
sleep(1);
qWorkerDestroy(&mgmt);
} }
......
...@@ -38,11 +38,16 @@ enum { ...@@ -38,11 +38,16 @@ enum {
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; uint64_t taskId;
uint64_t schedulerId; uint64_t sId;
SSchedulerCfg cfg; SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob* SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SSchCallbackParam {
uint64_t queryId;
uint64_t taskId;
} SSchCallbackParam;
typedef struct SSchLevel { typedef struct SSchLevel {
int32_t level; int32_t level;
int8_t status; int8_t status;
...@@ -120,6 +125,7 @@ typedef struct SSchJob { ...@@ -120,6 +125,7 @@ typedef struct SSchJob {
extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,36 +21,6 @@ ...@@ -21,36 +21,6 @@
SSchedulerMgmt schMgmt = {0}; SSchedulerMgmt schMgmt = {0};
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
/*
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
if (pRequest == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SRequestMsgBody body = {0};
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
*/
}
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t i = 0; i < job->levelNum; ++i) { for (int32_t i = 0; i < job->levelNum; ++i) {
SSchLevel *level = taosArrayGet(job->levels, i); SSchLevel *level = taosArrayGet(job->levels, i);
...@@ -312,100 +282,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { ...@@ -312,100 +282,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
int32_t msgSize = 0;
void *msg = NULL;
switch (msgType) {
case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = task->msgLen;
msg = task->msg;
break;
}
case TDMT_VND_QUERY: {
if (NULL == task->msg) {
qError("query msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = sizeof(SSubQueryMsg) + task->msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSubQueryMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(task->msgLen);
memcpy(pMsg->msg, task->msg, task->msgLen);
break;
}
case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_FETCH: {
if (NULL == task) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
default:
qError("unknown msg type:%d", msgType);
break;
}
//TODO SEND MSG
//taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
return TSDB_CODE_SUCCESS;
}
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info // TODO if needRetry, set task retry info
...@@ -424,7 +300,7 @@ int32_t schFetchFromRemote(SSchJob *job) { ...@@ -424,7 +300,7 @@ int32_t schFetchFromRemote(SSchJob *job) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -577,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { ...@@ -577,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
switch (msgType) { switch (msgType) {
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT_RSP: {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) { if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
...@@ -595,20 +471,20 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg ...@@ -595,20 +471,20 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
} }
break; break;
} }
case TDMT_VND_QUERY: { case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg; SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) { if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else { } else {
code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY); code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
if (code) { if (code) {
goto _task_error; goto _task_error;
} }
} }
break; break;
} }
case TDMT_VND_RES_READY: { case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg; SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) { if (rsp->code != TSDB_CODE_SUCCESS) {
...@@ -621,7 +497,7 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg ...@@ -621,7 +497,7 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
} }
break; break;
} }
case TDMT_VND_FETCH: { case TDMT_VND_FETCH_RSP: {
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
...@@ -631,6 +507,9 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg ...@@ -631,6 +507,9 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
SCH_ERR_JRET(schProcessOnDataFetched(job)); SCH_ERR_JRET(schProcessOnDataFetched(job));
break; break;
} }
case TDMT_VND_DROP_TASK: {
}
default: default:
qError("unknown msg type:%d received", msgType); qError("unknown msg type:%d received", msgType);
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
...@@ -648,6 +527,211 @@ _return: ...@@ -648,6 +527,211 @@ _return:
} }
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) {
qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode);
_return:
tfree(param);
SCH_RET(code);
}
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
}
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
case TDMT_VND_QUERY:
*fp = schHandleQueryCallback;
break;
case TDMT_VND_RES_READY:
*fp = schHandleReadyCallback;
break;
case TDMT_VND_FETCH:
*fp = schHandleFetchCallback;
break;
case TDMT_VND_DROP_TASK:
*fp = schHandleDropCallback;
break;
default:
qError("unknown msg type:%d", msgType);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->queryId = qId;
param->taskId = tId;
pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo));
return TSDB_CODE_SUCCESS;
_return:
tfree(param);
tfree(pMsgSendInfo);
SCH_RET(code);
}
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
switch (msgType) {
case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = task->msgLen;
msg = task->msg;
break;
}
case TDMT_VND_QUERY: {
if (NULL == task->msg) {
qError("query msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = sizeof(SSubQueryMsg) + task->msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSubQueryMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(task->msgLen);
memcpy(pMsg->msg, task->msg, task->msgLen);
break;
}
case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_FETCH: {
if (NULL == task) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
default:
qError("unknown msg type:%d", msgType);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize));
return TSDB_CODE_SUCCESS;
_return:
tfree(msg);
SCH_RET(code);
}
int32_t schLaunchTask(SSchJob *job, SSchTask *task) { int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
...@@ -664,7 +748,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { ...@@ -664,7 +748,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY; int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
SCH_ERR_RET(schAsyncSendMsg(job, task, msgType)); SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType));
SCH_ERR_RET(schPushTaskToExecList(job, task)); SCH_ERR_RET(schPushTaskToExecList(job, task));
...@@ -673,6 +757,8 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { ...@@ -673,6 +757,8 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schLaunchJob(SSchJob *job) { int32_t schLaunchJob(SSchJob *job) {
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) { for (int32_t i = 0; i < level->taskNum; ++i) {
...@@ -690,7 +776,7 @@ void schDropJobAllTasks(SSchJob *job) { ...@@ -690,7 +776,7 @@ void schDropJobAllTasks(SSchJob *job) {
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter); pIter = taosHashIterate(job->succTasks, pIter);
} }
...@@ -699,7 +785,7 @@ void schDropJobAllTasks(SSchJob *job) { ...@@ -699,7 +785,7 @@ void schDropJobAllTasks(SSchJob *job) {
while (pIter) { while (pIter) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter); pIter = taosHashIterate(job->succTasks, pIter);
} }
...@@ -717,7 +803,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -717,7 +803,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
} }
schMgmt.schedulerId = 1; //TODO GENERATE A UUID schMgmt.sId = 1; //TODO GENERATE A UUID
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
namespace { namespace {
extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); extern "C" int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
void schtBuildQueryDag(SQueryDag *dag) { void schtBuildQueryDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000001; uint64_t qId = 0x0000000000000001;
...@@ -182,7 +182,7 @@ void *schtSendRsp(void *param) { ...@@ -182,7 +182,7 @@ void *schtSendRsp(void *param) {
SShellSubmitRspMsg rsp = {0}; SShellSubmitRspMsg rsp = {0};
rsp.affectedRows = 10; rsp.affectedRows = 10;
schHandleRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
} }
...@@ -227,7 +227,7 @@ TEST(queryTest, normalCase) { ...@@ -227,7 +227,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -238,7 +238,7 @@ TEST(queryTest, normalCase) { ...@@ -238,7 +238,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -249,7 +249,7 @@ TEST(queryTest, normalCase) { ...@@ -249,7 +249,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0}; SQueryTableRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -260,7 +260,7 @@ TEST(queryTest, normalCase) { ...@@ -260,7 +260,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter; SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0}; SResReadyRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter); pIter = taosHashIterate(job->execTasks, pIter);
...@@ -269,7 +269,7 @@ TEST(queryTest, normalCase) { ...@@ -269,7 +269,7 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp rsp = {0}; SRetrieveTableRsp rsp = {0};
rsp.completed = 1; rsp.completed = 1;
rsp.numOfRows = 10; rsp.numOfRows = 10;
code = schHandleRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); code = schProcessRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册