diff --git a/CMakeLists.txt b/CMakeLists.txt index cda71fb3bf01ba1018a77c2741709a9d28a836c8..99e48006b19b0d12d9faa3ce2d51987d1cf4bfe7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,7 @@ set(CMAKE_CONTRIB_DIR "${CMAKE_SOURCE_DIR}/contrib") include(${CMAKE_SUPPORT_DIR}/cmake.options) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma -g3") +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC -gdwarf-2 -msse4.2 -mfma -g3") # contrib add_subdirectory(contrib) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 60c688d95bbd869984248532a6c0c0f2d671a82b..4d3370630370117c8b125ec656cfabaa051aaed6 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -110,7 +110,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList); /** * Get a table's vgroup from its name's hash value. @@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList); diff --git a/include/libs/index/index.h b/include/libs/index/index.h index f93c46da0c412e3529dd0c016ef7c42dc8de1686..d2b157542f7449013869fbca60a812aeebcf5730 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -85,6 +85,18 @@ SIndexTerm* indexTermCreate(int64_t suid, int32_t nColVal); void indexTermDestroy(SIndexTerm* p); +/* + * init index + * + */ +int32_t indexInit(); +/* + * destory index + * + */ + +void indexCleanUp(); + #ifdef __cplusplus } #endif diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 975b10353871f269abecb70d4974836d3e4a44b2..fa1483de873632f6a32148e27b945a92a0e74035 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -59,7 +59,15 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param qnodeList Qnode address list, element is SEpAddr * @return */ -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows); + +/** + * Process the query job, generated according to the query physical plan. + * This is a asynchronized API, and is also thread-safety. + * @param qnodeList Qnode address list, element is SEpAddr + * @return + */ +int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); @@ -79,4 +87,4 @@ void schedulerDestroy(void); } #endif -#endif /*_TD_SCHEDULER_H_*/ \ No newline at end of file +#endif /*_TD_SCHEDULER_H_*/ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 00fe769299812376b18ec18bf438fe3befb46048..c7ea3c2e09ed0fa0548c2f366b601fc8655cbd6e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -209,7 +209,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { - return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); + return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 7ca2822332a906a8854f3bdb9ab36ec34aede535..b882e1a42c39b11e55cb6a8a32c6cc1387a3122d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -197,15 +197,21 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray* vgroupList) { +int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) { SHashObj *vgroupHash = NULL; SVgroupInfo *vgInfo = NULL; + *vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); + if (NULL == *vgroupList) { + ctgError("taosArrayInit failed"); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); while (pIter) { vgInfo = pIter; - if (NULL == taosArrayPush(vgroupList, vgInfo)) { + if (NULL == taosArrayPush(*vgroupList, vgInfo)) { ctgError("taosArrayPush failed"); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -295,14 +301,6 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - if (NULL == pCatalog->tableCache.cache) { - pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == pCatalog->tableCache.cache) { - ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - } - if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { @@ -329,7 +327,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out } } - if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, sizeof(*output->tbMeta)) != 0) { + int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); + if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { ctgError("push table[%s] to table cache failed", output->tbFname); goto error_exit; } @@ -529,7 +528,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); } -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -549,17 +548,29 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S int32_t vgId = tbMeta->vgId; if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { ctgError("vgId[%d] not found in vgroup list", vgId); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo)); + if (NULL == *pVgroupList) { + ctgError("taosArrayInit failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { + if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) { ctgError("push vgroupInfo to array failed"); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } } + tfree(tbMeta); + + return TSDB_CODE_SUCCESS; + _return: tfree(tbMeta); + + taosArrayDestroy(*pVgroupList); CTG_RET(code); } @@ -634,8 +645,8 @@ _return: CTG_RET(code); } -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) { +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index aa5a8e2b3e1aae9f75df30f0b2fc083bd9934c0f..62279b9e1f435e25e7ed34c0d49873dc806b20ef 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -32,27 +32,28 @@ #include "stub.h" #include "addr_any.h" -typedef struct SAppInstInfo { - int64_t numOfConns; - SCorEpSet mgmtEp; -} SAppInstInfo; - -typedef struct STscObj { - char user[TSDB_USER_LEN]; - char pass[TSDB_PASSWORD_LEN]; - char acctId[TSDB_ACCT_ID_LEN]; - char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; - uint32_t connId; - uint64_t id; // ref ID returned by taosAddRef -// struct SSqlObj *sqlList; - void *pTransporter; - pthread_mutex_t mutex; // used to protect the operation on db - int32_t numOfReqs; // number of sqlObj from this tscObj - SAppInstInfo *pAppInfo; -} STscObj; + namespace { +void ctgTestSetPrepareTableMeta(); +void ctgTestSetPrepareCTableMeta(); +void ctgTestSetPrepareSTableMeta(); + + +int32_t ctgTestVgNum = 10; +int32_t ctgTestColNum = 2; +int32_t ctgTestTagNum = 1; +int32_t ctgTestSVersion = 1; +int32_t ctgTestTVersion = 1; + +char *ctgTestClusterId = "cluster1"; +char *ctgTestDbname = "1.db1"; +char *ctgTestTablename = "table1"; +char *ctgTestCTablename = "ctable1"; +char *ctgTestSTablename = "stable1"; + + void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); strcpy(pReq->db, "1.db1"); @@ -88,22 +89,281 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { ASSERT_EQ(rpcRsp.code, 0); } -void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SUseDbRsp *rspMsg = NULL; //todo + pRsp->code =0; + pRsp->contLen = sizeof(SUseDbRsp) + ctgTestVgNum * sizeof(SVgroupInfo); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (SUseDbRsp *)pRsp->pCont; + strcpy(rspMsg->db, ctgTestDbname); + rspMsg->vgVersion = htonl(1); + rspMsg->vgNum = htonl(ctgTestVgNum); + rspMsg->hashMethod = 0; + + SVgroupInfo *vg = NULL; + uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; + for (int32_t i = 0; i < ctgTestVgNum; ++i) { + vg = &rspMsg->vgroupInfo[i]; + + vg->vgId = htonl(i + 1); + vg->hashBegin = htonl(i * hashUnit); + vg->hashEnd = htonl(hashUnit * (i + 1) - 1); + vg->numOfEps = i % TSDB_MAX_REPLICA + 1; + vg->inUse = i % vg->numOfEps; + for (int32_t n = 0; n < vg->numOfEps; ++n) { + SEpAddrMsg *addr = &vg->epAddr[n]; + strcpy(addr->fqdn, "a0"); + addr->port = htons(n + 22); + } + } + + vg->hashEnd = htonl(UINT32_MAX); + + return; +} + + + + +void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + STableMetaMsg *rspMsg = NULL; //todo + + pRsp->code =0; + pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (STableMetaMsg *)pRsp->pCont; + sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename); + rspMsg->numOfTags = 0; + rspMsg->numOfColumns = htonl(ctgTestColNum); + rspMsg->precision = 1; + rspMsg->tableType = TSDB_NORMAL_TABLE; + rspMsg->update = 1; + rspMsg->sversion = htonl(ctgTestSVersion); + rspMsg->tversion = htonl(ctgTestTVersion); + rspMsg->suid = 0; + rspMsg->tuid = htobe64(0x0000000000000001); + rspMsg->vgId = htonl(8); + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = htonl(0); + s->bytes = htonl(8); + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = htonl(1); + s->bytes = htonl(4); + strcpy(s->name, "col1"); + + return; +} + + +void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + STableMetaMsg *rspMsg = NULL; //todo + + pRsp->code =0; + pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (STableMetaMsg *)pRsp->pCont; + sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename); + sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); + rspMsg->numOfTags = htonl(ctgTestTagNum); + rspMsg->numOfColumns = htonl(ctgTestColNum); + rspMsg->precision = 1; + rspMsg->tableType = TSDB_CHILD_TABLE; + rspMsg->update = 1; + rspMsg->sversion = htonl(ctgTestSVersion); + rspMsg->tversion = htonl(ctgTestTVersion); + rspMsg->suid = htobe64(0x0000000000000002); + rspMsg->tuid = htobe64(0x0000000000000003); + rspMsg->vgId = htonl(9); + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = htonl(0); + s->bytes = htonl(8); + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = htonl(1); + s->bytes = htonl(4); + strcpy(s->name, "col1s"); + + s = &rspMsg->pSchema[2]; + s->type = TSDB_DATA_TYPE_BINARY; + s->colId = htonl(2); + s->bytes = htonl(12); + strcpy(s->name, "tag1s"); + + + return; +} + + +void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + STableMetaMsg *rspMsg = NULL; //todo + + pRsp->code =0; + pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (STableMetaMsg *)pRsp->pCont; + sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); + sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); + rspMsg->numOfTags = htonl(ctgTestTagNum); + rspMsg->numOfColumns = htonl(ctgTestColNum); + rspMsg->precision = 1; + rspMsg->tableType = TSDB_SUPER_TABLE; + rspMsg->update = 1; + rspMsg->sversion = htonl(ctgTestSVersion); + rspMsg->tversion = htonl(ctgTestTVersion); + rspMsg->suid = htobe64(0x0000000000000002); + rspMsg->tuid = htobe64(0x0000000000000003); + rspMsg->vgId = 0; + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = htonl(0); + s->bytes = htonl(8); + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = htonl(1); + s->bytes = htonl(4); + strcpy(s->name, "col1s"); + + s = &rspMsg->pSchema[2]; + s->type = TSDB_DATA_TYPE_BINARY; + s->colId = htonl(2); + s->bytes = htonl(12); + strcpy(s->name, "tag1s"); + + return; } +void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareTableMeta(); + + return; +} + + +void ctgTestPrepareDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareCTableMeta(); + + return; +} + +void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareSTableMeta(); + + return; +} + + + +void ctgTestSetPrepareDbVgroups() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroups); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroups); + } + } +} + +void ctgTestSetPrepareTableMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareTableMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareTableMeta); + } + } +} + +void ctgTestSetPrepareCTableMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareCTableMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareCTableMeta); + } + } +} + +void ctgTestSetPrepareSTableMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareSTableMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareSTableMeta); + } + } +} + +void ctgTestSetPrepareDbVgroupsAndNormalMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroupsAndNormalMeta); + } + } +} + + +void ctgTestSetPrepareDbVgroupsAndChildMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndChildMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroupsAndChildMeta); + } + } +} -void initTestEnv() { +void ctgTestSetPrepareDbVgroupsAndSuperMeta() { static Stub stub; - stub.set(rpcSendRecv, __rpcSendRecv); + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndSuperMeta); { AddrAny any("libtransport.so"); std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto& f : result) { - stub.set(f.second, __rpcSendRecv); + stub.set(f.second, ctgTestPrepareDbVgroupsAndSuperMeta); } } } @@ -111,33 +371,267 @@ void initTestEnv() { } -TEST(testCase, normalCase) { - STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); - assert(pConn != NULL); +TEST(tableMeta, normalTable) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + ctgTestSetPrepareDbVgroups(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgInfo); + ASSERT_EQ(code, 0); + ASSERT_EQ(vgInfo.vgId, 8); + ASSERT_EQ(vgInfo.numOfEps, 3); + + ctgTestSetPrepareTableMeta(); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 8); + ASSERT_EQ(tableMeta->tableType, TSDB_NORMAL_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, 0); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + catalogDestroy(); +} - char *clusterId = "cluster1"; - char *dbname = "1.db1"; - char *tablename = "table1"; +TEST(tableMeta, childTableCase) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; + ctgTestSetPrepareDbVgroupsAndChildMeta(); + initQueryModuleMsgHandle(); - sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); int32_t code = catalogInit(NULL); ASSERT_EQ(code, 0); - code = catalogGetHandle(clusterId, &pCtg); + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); - code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo); + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &tableMeta); ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); - taos_close(pConn); + catalogDestroy(); } +TEST(tableMeta, superTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + ctgTestSetPrepareDbVgroupsAndSuperMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + ctgTestSetPrepareCTableMeta(); + + tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tableMeta = NULL; + code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 9); + ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + + + catalogDestroy(); +} + +TEST(tableDistVgroup, normalTable) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndNormalMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 8); + ASSERT_EQ(vgInfo->numOfEps, 3); + + catalogDestroy(); +} + +TEST(tableDistVgroup, childTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndChildMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 9); + ASSERT_EQ(vgInfo->numOfEps, 4); + + + catalogDestroy(); +} + +TEST(tableDistVgroup, superTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndSuperMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 1); + ASSERT_EQ(vgInfo->numOfEps, 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1); + ASSERT_EQ(vgInfo->vgId, 2); + ASSERT_EQ(vgInfo->numOfEps, 2); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2); + ASSERT_EQ(vgInfo->vgId, 3); + ASSERT_EQ(vgInfo->numOfEps, 3); + + + catalogDestroy(); +} + + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/source/libs/index/CMakeLists.txt b/source/libs/index/CMakeLists.txt index 4805bd3b77ced7b51d76d8e47c83e2b01dc66a59..50e76abd3f4dc7c0d255baf08fce3d2a34a95606 100644 --- a/source/libs/index/CMakeLists.txt +++ b/source/libs/index/CMakeLists.txt @@ -3,7 +3,9 @@ add_library(index ${INDEX_SRC}) target_include_directories( index PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" + PUBLIC "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + ) target_link_libraries( index diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 2584a847ff2343db27a2f2acaf00a3f68c7aad68..a8f231da0a95a3ce5f1aaa33b48c5f61ee70eb05 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -49,7 +49,6 @@ struct SIndex { SHashObj* colObj; // < field name, field id> int64_t suid; // current super table id, -1 is normal table - int colId; // field id allocated to cache int32_t cVersion; // current version allocated to cache SIndexStat stat; @@ -88,41 +87,39 @@ typedef struct SIndexTermQuery { EIndexQueryType qType; } SIndexTermQuery; -#define indexFatal(...) \ - do { \ - if (sDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("index FATAL ", 255, __VA_ARGS__); \ - } \ +typedef struct Iterate { + void* iter; + int8_t type; + char* colVal; + SArray* val; +} Iterate; +extern void* indexQhandle; + +int indexFlushCacheTFile(SIndex* sIdx, void*); + +#define indexFatal(...) \ + do { \ + if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ } while (0) -#define indexError(...) \ - do { \ - if (sDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("index ERROR ", 255, __VA_ARGS__); \ - } \ +#define indexError(...) \ + do { \ + if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \ } while (0) -#define indexWarn(...) \ - do { \ - if (sDebugFlag & DEBUG_WARN) { \ - taosPrintLog("index WARN ", 255, __VA_ARGS__); \ - } \ +#define indexWarn(...) \ + do { \ + if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \ } while (0) -#define indexInfo(...) \ - do { \ - if (sDebugFlag & DEBUG_INFO) { \ - taosPrintLog("index ", 255, __VA_ARGS__); \ - } \ +#define indexInfo(...) \ + do { \ + if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \ } while (0) -#define indexDebug(...) \ - do { \ - if (sDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \ - } \ +#define indexDebug(...) \ + do { \ + if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ } while (0) -#define indexTrace(...) \ - do { \ - if (sDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \ - } \ +#define indexTrace(...) \ + do { \ + if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ } while (0) #ifdef __cplusplus diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 692edcc064ca42d8443e3863e2072d14665a9595..07b5b8d564392ee6e4f6e08dfba80198e88ef8d9 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -22,10 +22,8 @@ // ----------------- key structure in skiplist --------------------- /* A data row, the format is like below: - * content: |<--totalLen-->|<-- fieldid-->|<--field type-->|<-- value len--->| - * |<-- value -->|<--uid -->|<--version--->|<-- itermType -->| - * len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->| - * <--valuelen->|<--uint64_t->| * <-- int32_t-->|<-- int8_t --->| + * content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->| + * len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| */ #ifdef __cplusplus @@ -34,12 +32,17 @@ extern "C" { typedef struct IndexCache { T_REF_DECLARE() - SSkipList* skiplist; + SSkipList *mem, *imm; + SIndex* index; + char* colName; + int32_t version; + int32_t nTerm; + int8_t type; + } IndexCache; typedef struct CacheTerm { // key - int32_t colId; int32_t nColVal; char* colVal; int32_t version; @@ -49,14 +52,18 @@ typedef struct CacheTerm { SIndexOperOnColumn operaType; } CacheTerm; // -IndexCache* indexCacheCreate(); + +IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type); void indexCacheDestroy(void* cache); -int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid); +int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); // int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s); +int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s); + +void indexCacheRef(IndexCache* cache); +void indexCacheUnRef(IndexCache* cache); void indexCacheDebug(IndexCache* cache); #ifdef __cplusplus diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index c7269eda0f30a845c5b9a1810a5a4aea4baa56eb..be056f38fa183e1460dac83c9fd90b70e69c40e9 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -23,7 +23,7 @@ extern "C" { typedef struct AutomationCtx AutomationCtx; -typedef enum AutomationType { AUTOMATION_PREFIX, AUTMMATION_MATCH } AutomationType; +typedef enum AutomationType { AUTOMATION_ALWAYS, AUTOMATION_PREFIX, AUTMMATION_MATCH } AutomationType; typedef struct StartWith { AutomationCtx* autoSelf; diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 416b10bd14c63ebea97b40d80175f033e0e3d7f8..550492ba5086401a5f5fe0bd1490168daead2eee 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -105,9 +105,13 @@ void tfileCacheDestroy(TFileCache* tcache); TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key); void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader); +TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName); + TFileReader* tfileReaderCreate(WriterCtx* ctx); void tfileReaderDestroy(TFileReader* reader); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result); +void tfileReaderRef(TFileReader* reader); +void tfileReaderUnRef(TFileReader* reader); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); void tfileWriterDestroy(TFileWriter* tw); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 1c65dd03d596309688e72a6986e1d8e6c6466a17..3f871af01db4d84cdf731b6fcd17d80caf644957 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -18,11 +18,26 @@ #include "index_cache.h" #include "index_tfile.h" #include "tdef.h" +#include "tsched.h" #ifdef USE_LUCENE #include "lucene++/Lucene_c.h" #endif +#define INDEX_NUM_OF_THREADS 4 +#define INDEX_QUEUE_SIZE 4 + +void* indexQhandle = NULL; + +int32_t indexInit() { + indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); + return indexQhandle == NULL ? -1 : 0; + // do nothing +} +void indexCleanUp() { + taosCleanUpScheduler(indexQhandle); +} + static int uidCompare(const void* a, const void* b) { uint64_t u1 = *(uint64_t*)a; uint64_t u2 = *(uint64_t*)b; @@ -38,16 +53,15 @@ typedef struct SIdxColInfo { } SIdxColInfo; static pthread_once_t isInit = PTHREAD_ONCE_INIT; -static void indexInit(); +// static void indexInit(); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); -static int indexFlushCacheTFile(SIndex* sIdx); static void indexInterResultsDestroy(SArray* results); static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult); int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { - pthread_once(&isInit, indexInit); + // pthread_once(&isInit, indexInit); SIndex* sIdx = calloc(1, sizeof(SIndex)); if (sIdx == NULL) { return -1; } @@ -57,10 +71,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { #endif #ifdef USE_INVERTED_INDEX - sIdx->cache = (void*)indexCacheCreate(); - sIdx->tindex = NULL; + // sIdx->cache = (void*)indexCacheCreate(sIdx); + sIdx->tindex = indexTFileCreate(path); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - sIdx->colId = 1; sIdx->cVersion = 1; pthread_mutex_init(&sIdx->mtx, NULL); @@ -80,6 +93,12 @@ void indexClose(SIndex* sIdx) { #ifdef USE_INVERTED_INDEX indexCacheDestroy(sIdx->cache); + void* iter = taosHashIterate(sIdx->colObj, NULL); + while (iter) { + IndexCache** pCache = iter; + if (*pCache) { indexCacheUnRef(*pCache); } + iter = taosHashIterate(sIdx->colObj, iter); + } taosHashCleanup(sIdx->colObj); pthread_mutex_destroy(&sIdx->mtx); #endif @@ -110,29 +129,24 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { pthread_mutex_lock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - SIdxColInfo* fi = taosHashGet(index->colObj, p->colName, p->nColName); - if (fi == NULL) { - SIdxColInfo tfi = {.colId = index->colId}; - index->cVersion++; - index->colId++; - taosHashPut(index->colObj, p->colName, p->nColName, &tfi, sizeof(tfi)); - } else { - // TODO, del + IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + if (*cache == NULL) { + IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType); + taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*)); } } pthread_mutex_unlock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - SIdxColInfo* fi = taosHashGet(index->colObj, p->colName, p->nColName); - assert(fi != NULL); - int32_t colId = fi->colId; - int32_t version = index->cVersion; - int ret = indexCachePut(index->cache, p, colId, version, uid); + IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + + assert(*cache != NULL); + int ret = indexCachePut(*cache, p, uid); if (ret != 0) { return ret; } } -#endif +#endif return 0; } int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { @@ -281,32 +295,26 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms) { taosArrayDestroy(terms); } -void indexInit() { - // do nothing -} static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) { - int32_t version = -1; - int16_t colId = -1; - SIdxColInfo* colInfo = NULL; - SIndexTerm* term = query->term; const char* colName = term->colName; int32_t nColName = term->nColName; + // Get col info + IndexCache* cache = NULL; pthread_mutex_lock(&sIdx->mtx); - colInfo = taosHashGet(sIdx->colObj, colName, nColName); - if (colInfo == NULL) { + IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName); + if (*pCache == NULL) { pthread_mutex_unlock(&sIdx->mtx); return -1; } - colId = colInfo->colId; - version = colInfo->cVersion; + cache = *pCache; pthread_mutex_unlock(&sIdx->mtx); *result = taosArrayInit(4, sizeof(uint64_t)); // TODO: iterator mem and tidex STermValueType s; - if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) { + if (0 == indexCacheSearch(cache, query, *result, &s)) { if (s == kTypeDeletion) { indexInfo("col: %s already drop by other opera", term->colName); // coloum already drop by other oper, no need to query tindex @@ -353,10 +361,14 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType } return 0; } -static int indexFlushCacheTFile(SIndex* sIdx) { +int indexFlushCacheTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } - indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + IndexCache* pCache = (IndexCache*)cache; + + TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); + tfileReaderUnRef(pReader); + indexCacheUnRef(pCache); return 0; } diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 9c922030885d40020627c55666f648ddea8eb02b..8181c1750531ef3c6d7a75459dfe7bf075fe0406 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -16,9 +16,11 @@ #include "index_cache.h" #include "index_util.h" #include "tcompare.h" +#include "tsched.h" #define MAX_INDEX_KEY_LEN 256 // test only, change later +#define CACH_LIMIT 1000000 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) @@ -38,9 +40,6 @@ static int32_t compareKey(const void* l, const void* r) { CacheTerm* lt = (CacheTerm*)l; CacheTerm* rt = (CacheTerm*)r; - // compare colId - if (lt->colId != rt->colId) { return lt->colId - rt->colId; } - // compare colVal int i, j; for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) { @@ -56,71 +55,40 @@ static int32_t compareKey(const void* l, const void* r) { return -1; } // compare version - return rt->version - lt->version; +} - // char* lp = (char*)l; - // char* rp = (char*)r; - - //// compare col id - // int16_t lf, rf; // cold id - // memcpy(&lf, lp, sizeof(lf)); - // memcpy(&rf, rp, sizeof(rf)); - // if (lf != rf) { return lf < rf ? -1 : 1; } - - // lp += sizeof(lf); - // rp += sizeof(rf); - - //// skip value len - // int32_t lfl, rfl; - // memcpy(&lfl, lp, sizeof(lfl)); - // memcpy(&rfl, rp, sizeof(rfl)); - // lp += sizeof(lfl); - // rp += sizeof(rfl); - - //// compare value - // int32_t i, j; - // for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { - // if (lp[i] == rp[j]) { - // continue; - // } else { - // return lp[i] < rp[j] ? -1 : 1; - // } - //} - // if (i < lfl) { - // return 1; - //} else if (j < rfl) { - // return -1; - //} - // lp += lfl; - // rp += rfl; - - //// compare version, desc order - // int32_t lv, rv; - // memcpy(&lv, lp, sizeof(lv)); - // memcpy(&rv, rp, sizeof(rv)); - // if (lv != rv) { return lv < rv ? 1 : -1; } - - // return 0; +static SSkipList* indexInternalCacheCreate(int8_t type) { + if (type == TSDB_DATA_TYPE_BINARY) { + return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + } } -IndexCache* indexCacheCreate() { + +IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { IndexCache* cache = calloc(1, sizeof(IndexCache)); if (cache == NULL) { indexError("failed to create index cache"); return NULL; - } - cache->skiplist = - tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + }; + cache->mem = indexInternalCacheCreate(type); + + cache->colName = calloc(1, strlen(colName) + 1); + memcpy(cache->colName, colName, strlen(colName)); + cache->type = type; + cache->index = idx; + cache->version = 0; + + indexCacheRef(cache); return cache; } void indexCacheDebug(IndexCache* cache) { - SSkipListIterator* iter = tSkipListCreateIter(cache->skiplist); + SSkipListIterator* iter = tSkipListCreateIter(cache->mem); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); if (ct != NULL) { // TODO, add more debug info - indexInfo("{colId:%d, colVal: %s, version: %d} \t", ct->colId, ct->colVal, ct->version); + indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version); } } tSkipListDestroyIter(iter); @@ -129,37 +97,71 @@ void indexCacheDebug(IndexCache* cache) { void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; if (pCache == NULL) { return; } - tSkipListDestroy(pCache->skiplist); + tSkipListDestroy(pCache->mem); + tSkipListDestroy(pCache->imm); + free(pCache->colName); free(pCache); } -int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { +static void doMergeWork(SSchedMsg* msg) { + IndexCache* pCache = msg->ahandle; + SIndex* sidx = (SIndex*)pCache->index; + indexFlushCacheTFile(sidx, pCache); +} + +int indexCacheSchedToMerge(IndexCache* pCache) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doMergeWork; + schedMsg.ahandle = pCache; + schedMsg.thandle = NULL; + schedMsg.msg = NULL; + + taosScheduleTask(indexQhandle, &schedMsg); +} +int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { if (cache == NULL) { return -1; } IndexCache* pCache = cache; + indexCacheRef(pCache); // encode data CacheTerm* ct = calloc(1, sizeof(CacheTerm)); if (cache == NULL) { return -1; } // set up key - ct->colId = colId; ct->colType = term->colType; ct->nColVal = term->nColVal; ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1)); memcpy(ct->colVal, term->colVal, ct->nColVal); - ct->version = version; - + ct->version = atomic_add_fetch_32(&pCache->version, 1); + // set value ct->uid = uid; ct->operaType = term->operType; - tSkipListPut(pCache->skiplist, (char*)ct); + tSkipListPut(pCache->mem, (char*)ct); + pCache->nTerm += 1; + + if (pCache->nTerm >= CACH_LIMIT) { + pCache->nTerm = 0; + + while (pCache->imm != NULL) { + // do nothong + } + + pCache->imm = pCache->mem; + pCache->mem = indexInternalCacheCreate(pCache->type); + + // sched to merge + // unref cache int bgwork + indexCacheSchedToMerge(pCache); + } + indexCacheUnRef(pCache); return 0; // encode end } -int indexCacheDel(void* cache, int32_t fieldId, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { +int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { IndexCache* pCache = cache; return 0; } -int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { +int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { if (cache == NULL) { return -1; } IndexCache* pCache = cache; SIndexTerm* term = query->term; @@ -167,15 +169,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t CacheTerm* ct = calloc(1, sizeof(CacheTerm)); if (ct == NULL) { return -1; } - ct->colId = colId; ct->nColVal = term->nColVal; ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1)); memcpy(ct->colVal, term->colVal, ct->nColVal); - ct->version = version; + ct->version = atomic_load_32(&pCache->version); char* key = getIndexKey(ct); // TODO handle multi situation later, and refactor - SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->skiplist, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { @@ -209,3 +210,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t } return 0; } + +void indexCacheRef(IndexCache* cache) { + int ref = T_REF_INC(cache); + UNUSED(ref); +} +void indexCacheUnRef(IndexCache* cache) { + int ref = T_REF_DEC(cache); + if (ref == 0) { indexCacheDestroy(cache); } +} diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 18514cd0d5498eba9a31d8d28b197557bdf23bd8..18024fa39110c3a6acb3052e76e84c332ef425bf 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1083,7 +1083,7 @@ bool fstBoundWithDataExceededBy(FstBoundWithData* bound, FstSlice* slice) { } else if (bound->type == Excluded) { return comp >= 0 ? true : false; } else { - return true; + return false; } } bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) { @@ -1224,7 +1224,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb void* start = automFuncs[aut->type].start(aut); if (automFuncs[aut->type].isMatch(aut, start)) { FstSlice s = fstSliceCreate(NULL, 0); - return swsResultCreate(&s, output, callback(start)); + return swsResultCreate(&s, output, callback == NULL ? NULL : callback(start)); } } SArray* nodes = taosArrayInit(8, sizeof(FstNode*)); @@ -1237,10 +1237,12 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb } FstTransition trn; fstNodeGetTransitionAt(p->node, p->trans, &trn); - Output out = p->out.out + trn.out; - void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp); - void* tState = callback(nextState); - bool isMatch = automFuncs[aut->type].isMatch(aut, nextState); + + Output out = p->out.out + trn.out; + void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp); + void* tState = (callback == NULL) ? NULL : callback(nextState); + bool isMatch = automFuncs[aut->type].isMatch(aut, nextState); + FstNode* nextNode = fstGetNode(sws->fst, trn.addr); taosArrayPush(nodes, &nextNode); taosArrayPush(sws->inp, &(trn.inp)); diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index eb4c479c9406d4be367df458235c47f7b87ff9f8..72983809d132810f70c0eadf3832286ce8915e3c 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -64,6 +64,25 @@ StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { return nsv; } +// iterate fst +static void* alwaysMatchStart(AutomationCtx* ctx) { + return NULL; +} +static bool alwaysMatchIsMatch(AutomationCtx* ctx, void* state) { + return true; +} +static bool alwaysMatchCanMatch(AutomationCtx* ctx, void* state) { + return true; +} +static bool alwaysMatchWillAlwaysMatch(AutomationCtx* ctx, void* state) { + return true; +} +static void* alwaysMatchAccpet(AutomationCtx* ctx, void* state, uint8_t byte) { + return NULL; +} +static void* alwaysMatchAccpetEof(AutomationCtx* ctx, void* state) { + return NULL; +} // prefix query, impl later static void* prefixStart(AutomationCtx* ctx) { @@ -127,6 +146,7 @@ static void* patternAcceptEof(AutomationCtx* ctx, void* state) { } AutomationFunc automFuncs[] = { + {alwaysMatchStart, alwaysMatchIsMatch, alwaysMatchCanMatch, alwaysMatchWillAlwaysMatch, alwaysMatchAccpet, alwaysMatchAccpetEof}, {prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof}, {patternStart, patternIsMatch, patternCanMatch, patternWillAlwaysMatch, patternAccept, patternAcceptEof} // add more search type @@ -137,7 +157,11 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) { if (ctx == NULL) { return NULL; } StartWithStateValue* sv = NULL; - if (atype == AUTOMATION_PREFIX) { + if (atype == AUTOMATION_ALWAYS) { + int val = 0; + sv = startWithStateValueCreate(Running, FST_INT, &val); + ctx->stdata = (void*)sv; + } else if (atype == AUTOMATION_PREFIX) { int val = 0; sv = startWithStateValueCreate(Running, FST_INT, &val); ctx->stdata = (void*)sv; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 003ae86c6a9d1030f4b1586b7fd66491b7867459..0dfb14cc8d1d3cb3827a70121ff738b1ff0cb5fd 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -33,11 +33,9 @@ static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteData(TFileWriter* write, TFileValue* tval); -static int tfileReaderLoadHeader(TFileReader* reader); -static int tfileReaderLoadFst(TFileReader* reader); -static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); -static void tfileReaderRef(TFileReader* reader); -static void tfileReaderUnRef(TFileReader* reader); +static int tfileReaderLoadHeader(TFileReader* reader); +static int tfileReaderLoadFst(TFileReader* reader); +static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileGetFileList(const char* path, SArray* result); static int tfileRmExpireFile(SArray* result); @@ -131,7 +129,6 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); return; } - TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* reader = calloc(1, sizeof(TFileReader)); if (reader == NULL) { return NULL; } @@ -317,6 +314,11 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { return 0; } +TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { + if (tf == NULL) { return NULL; } + TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; + return tfileCacheGet(tf->cache, &key); +} static int tfileStrCompare(const void* a, const void* b) { int ret = strcmp((char*)a, (char*)b); @@ -423,12 +425,12 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* free(buf); return 0; } -static void tfileReaderRef(TFileReader* reader) { +void tfileReaderRef(TFileReader* reader) { int ref = T_REF_INC(reader); UNUSED(ref); } -static void tfileReaderUnRef(TFileReader* reader) { +void tfileReaderUnRef(TFileReader* reader) { int ref = T_REF_DEC(reader); if (ref == 0) { tfileReaderDestroy(reader); } } @@ -479,9 +481,9 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, return -1; } static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { - SERIALIZE_MEM_TO_BUF(buf, key, suid); - SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_MEM_TO_BUF(buf, key, colType); - SERIALIZE_VAR_TO_BUF(buf, '_', char); + // SERIALIZE_MEM_TO_BUF(buf, key, suid); + // SERIALIZE_VAR_TO_BUF(buf, '_', char); + // SERIALIZE_MEM_TO_BUF(buf, key, colType); + // SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); } diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index 6eb532b41e5e619aa3169ebaea37c09924df4ec1..3957554748dd7797822313524141da0486d4e14d 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -1,13 +1,23 @@ add_executable(indexTest "") +add_executable(fstTest "") target_sources(indexTest PRIVATE "indexTests.cc" ) +target_sources(fstTest + PRIVATE + "fstTest.cc" +) target_include_directories ( indexTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories ( fstTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/index" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries (indexTest os util @@ -15,8 +25,16 @@ target_link_libraries (indexTest gtest_main index ) - -add_test( - NAME index_test - COMMAND indexTest +target_link_libraries (fstTest + os + util + common + gtest_main + index ) + + +#add_test( +# NAME index_test +# COMMAND indexTest +#) diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc new file mode 100644 index 0000000000000000000000000000000000000000..85bb5e2b152da7bf550d9a5e79aeb837def4059d --- /dev/null +++ b/source/libs/index/test/fstTest.cc @@ -0,0 +1,174 @@ + +#include +#include +#include +#include "index.h" +#include "indexInt.h" +#include "index_cache.h" +#include "index_fst.h" +#include "index_fst_counting_writer.h" +#include "index_fst_util.h" +#include "index_tfile.h" +#include "tskiplist.h" +#include "tutil.h" + +void* callback(void* s) { + return s; +} + +static std::string fileName = "/tmp/tindex.tindex"; +class FstWriter { + public: + FstWriter() { + remove(fileName.c_str()); + _wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024); + _b = fstBuilderCreate(_wc, 0); + } + bool Put(const std::string& key, uint64_t val) { + FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); + bool ok = fstBuilderInsert(_b, skey, val); + fstSliceDestroy(&skey); + return ok; + } + ~FstWriter() { + fstBuilderFinish(_b); + fstBuilderDestroy(_b); + + writerCtxDestroy(_wc); + } + + private: + FstBuilder* _b; + WriterCtx* _wc; +}; + +class FstReadMemory { + public: + FstReadMemory(size_t size) { + _wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); + _w = fstCountingWriterCreate(_wc); + _size = size; + memset((void*)&_s, 0, sizeof(_s)); + } + bool init() { + char* buf = (char*)calloc(1, sizeof(char) * _size); + int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); + if (nRead <= 0) { return false; } + _size = nRead; + _s = fstSliceCreate((uint8_t*)buf, _size); + _fst = fstCreate(&_s); + free(buf); + return _fst != NULL; + } + bool Get(const std::string& key, uint64_t* val) { + FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); + bool ok = fstGet(_fst, &skey, val); + fstSliceDestroy(&skey); + return ok; + } + bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Get(key, val); + int64_t e = taosGetTimestampUs(); + *elapse = e - s; + return ok; + } + // add later + bool Search(AutomationCtx* ctx, std::vector& result) { + FstStreamBuilder* sb = fstSearch(_fst, ctx); + StreamWithState* st = streamBuilderIntoStream(sb); + StreamWithStateResult* rt = NULL; + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + // result.push_back((uint64_t)(rt->out.out)); + FstSlice* s = &rt->data; + int32_t sz = 0; + char* ch = (char*)fstSliceData(s, &sz); + std::string key(ch, sz); + printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out)); + swsResultDestroy(rt); + } + for (size_t i = 0; i < result.size(); i++) {} + std::cout << std::endl; + return true; + } + bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector& result) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Search(ctx, result); + int64_t e = taosGetTimestampUs(); + return ok; + } + + ~FstReadMemory() { + fstCountingWriterDestroy(_w); + fstDestroy(_fst); + fstSliceDestroy(&_s); + writerCtxDestroy(_wc); + } + + private: + FstCountingWriter* _w; + Fst* _fst; + FstSlice _s; + WriterCtx* _wc; + size_t _size; +}; + +#define L 100 +#define M 100 +#define N 100 + +int Performance_fstWriteRecords(FstWriter* b) { + std::string str("aa"); + for (int i = 0; i < L; i++) { + str[0] = 'a' + i; + str.resize(2); + for (int j = 0; j < M; j++) { + str[1] = 'a' + j; + str.resize(2); + for (int k = 0; k < N; k++) { + str.push_back('a'); + b->Put(str, k); + printf("(%d, %d, %d, %s)\n", i, j, k, str.c_str()); + } + } + } + return L * M * N; +} +void checkFstCheckIterator() { + tfInit(); + FstWriter* fw = new FstWriter; + int64_t s = taosGetTimestampUs(); + int count = 2; + Performance_fstWriteRecords(fw); + int64_t e = taosGetTimestampUs(); + + std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; + delete fw; + + FstReadMemory* m = new FstReadMemory(1024 * 64); + if (m->init() == false) { + std::cout << "init readMemory failed" << std::endl; + delete m; + return; + } + + // prefix search + std::vector result; + + AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS); + m->Search(ctx, result); + std::cout << "size: " << result.size() << std::endl; + // assert(result.size() == count); + for (int i = 0; i < result.size(); i++) { + // assert(result[i] == i); // check result + } + + free(ctx); + delete m; + tfCleanup(); +} +int main() { + checkFstCheckIterator(); + // checkFstPrefixSearch(); + return 1; +} diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index bed2b82daa97050548cd3d9d876a5359c4682572..17733dd2842389527f26618aac701ade9ee30f87 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -471,18 +471,22 @@ class CacheObj { public: CacheObj() { // TODO - cache = indexCacheCreate(); + cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY); } int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { - int ret = indexCachePut(cache, term, colId, version, uid); + int ret = indexCachePut(cache, term, uid); if (ret != 0) { // std::cout << "failed to put into cache: " << ret << std::endl; } return ret; } + void Debug() { + // + indexCacheDebug(cache); + } int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { - int ret = indexCacheSearch(cache, query, colId, version, result, s); + int ret = indexCacheSearch(cache, query, result, s); if (ret != 0) { // std::cout << "failed to get from cache:" << ret << std::endl; @@ -515,6 +519,7 @@ class IndexCacheEnv : public ::testing::Test { TEST_F(IndexCacheEnv, cache_test) { int version = 0; int16_t colId = 0; + int16_t othColId = 10; uint64_t suid = 0; std::string colName("voltage"); @@ -544,15 +549,27 @@ TEST_F(IndexCacheEnv, cache_test) { coj->Put(term, colId, version++, suid++); } + { + std::string colVal("v3"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, othColId, version++, suid++); + } + { + std::string colVal("v4"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, othColId, version++, suid++); + } { std::string colVal("v4"); - for (size_t i = 0; i < 100; i++) { + for (size_t i = 0; i < 10; i++) { colVal[colVal.size() - 1] = 'a' + i; SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); } } + coj->Debug(); + // begin query { std::string colVal("v3"); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); @@ -561,7 +578,8 @@ TEST_F(IndexCacheEnv, cache_test) { STermValueType valType; coj->Get(&query, colId, 10000, ret, &valType); - assert(taosArrayGetSize(ret) == 3); + // std::cout << "size : " << taosArrayGetSize(ret) << std::endl; + assert(taosArrayGetSize(ret) == 4); } { std::string colVal("v2"); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 9507b9322241c143acae92dd801e3a51d3e59381..34db262d5dbb6a894f920d3be0341af36d4d92dc 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -92,8 +92,8 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; } - pRsp->vgVersion = htonl(pRsp->vgVersion); - pRsp->vgNum = htonl(pRsp->vgNum); + pRsp->vgVersion = ntohl(pRsp->vgVersion); + pRsp->vgNum = ntohl(pRsp->vgNum); if (pRsp->vgNum < 0) { qError("invalid db[%s] vgroup number[%d]", pRsp->db, pRsp->vgNum); @@ -115,12 +115,12 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { } for (int32_t i = 0; i < pRsp->vgNum; ++i) { - pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId); - pRsp->vgroupInfo[i].hashBegin = htonl(pRsp->vgroupInfo[i].hashBegin); - pRsp->vgroupInfo[i].hashEnd = htonl(pRsp->vgroupInfo[i].hashEnd); + pRsp->vgroupInfo[i].vgId = ntohl(pRsp->vgroupInfo[i].vgId); + pRsp->vgroupInfo[i].hashBegin = ntohl(pRsp->vgroupInfo[i].hashBegin); + pRsp->vgroupInfo[i].hashEnd = ntohl(pRsp->vgroupInfo[i].hashEnd); for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) { - pRsp->vgroupInfo[i].epAddr[n].port = htons(pRsp->vgroupInfo[i].epAddr[n].port); + pRsp->vgroupInfo[i].epAddr[n].port = ntohs(pRsp->vgroupInfo[i].epAddr[n].port); } if (0 != taosHashPut(pOut->dbVgroup.vgInfo, &pRsp->vgroupInfo[i].vgId, sizeof(pRsp->vgroupInfo[i].vgId), &pRsp->vgroupInfo[i], sizeof(pRsp->vgroupInfo[i]))) { @@ -142,13 +142,13 @@ _return: } static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { - pMetaMsg->numOfTags = htonl(pMetaMsg->numOfTags); - pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns); - pMetaMsg->sversion = htonl(pMetaMsg->sversion); - pMetaMsg->tversion = htonl(pMetaMsg->tversion); + pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags); + pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns); + pMetaMsg->sversion = ntohl(pMetaMsg->sversion); + pMetaMsg->tversion = ntohl(pMetaMsg->tversion); pMetaMsg->tuid = htobe64(pMetaMsg->tuid); pMetaMsg->suid = htobe64(pMetaMsg->suid); - pMetaMsg->vgId = htonl(pMetaMsg->vgId); + pMetaMsg->vgId = ntohl(pMetaMsg->vgId); if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) { qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags); @@ -179,8 +179,8 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { int32_t numOfTotalCols = pMetaMsg->numOfColumns + pMetaMsg->numOfTags; for (int i = 0; i < numOfTotalCols; ++i) { - pSchema->bytes = htonl(pSchema->bytes); - pSchema->colId = htonl(pSchema->colId); + pSchema->bytes = ntohl(pSchema->bytes); + pSchema->colId = ntohl(pSchema->colId); pSchema++; } @@ -202,7 +202,8 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl qError("calloc size[%d] failed", metaSize); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - + + pTableMeta->vgId = isSuperTable ? 0 : msg->vgId; pTableMeta->tableType = isSuperTable ? TSDB_SUPER_TABLE : msg->tableType; pTableMeta->uid = msg->suid; pTableMeta->suid = msg->suid; @@ -213,12 +214,12 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl pTableMeta->tableInfo.precision = msg->precision; pTableMeta->tableInfo.numOfColumns = msg->numOfColumns; + memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total); + for(int32_t i = 0; i < msg->numOfColumns; ++i) { pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; } - memcpy(pTableMeta->schema, msg->pSchema, sizeof(SSchema) * total); - *pMeta = pTableMeta; return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt index 4eafa50bdc200615391cfc2f364fc9c9c20430bd..001756d7c3f74656a8e5d0a41267a3b9d5071f0b 100644 --- a/source/libs/qworker/CMakeLists.txt +++ b/source/libs/qworker/CMakeLists.txt @@ -10,3 +10,5 @@ target_link_libraries( qworker PRIVATE os util transport planner qcom ) + +ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 994b46c5c4e0b5c619e79863d87adc9d6380dc1a..149f46273c575e30f8b5190b3cf004ebae5a810a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -943,6 +943,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qError("invalid query msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + + msg->schedulerId = htobe64(msg->schedulerId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + msg->contentLen = ntohl(msg->contentLen); bool queryDone = false; bool queryRsp = false; diff --git a/source/libs/qworker/test/CMakeLists.txt b/source/libs/qworker/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6d755ad487ca64959ad92b2b4964f821785de922 --- /dev/null +++ b/source/libs/qworker/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build qworker unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST}) +TARGET_LINK_LIBRARIES( + qworkerTest + PUBLIC os util common transport gtest qcom planner qworker +) + +TARGET_INCLUDE_DIRECTORIES( + qworkerTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/qworker/inc" +) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4b54b77544e26f6eb0bf22ac7eadf29569df86e5 --- /dev/null +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#pragma GCC diagnostic ignored "-Wwrite-strings" + +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "tep.h" +#include "trpc.h" +#include "planner.h" +#include "qworker.h" +#include "stub.h" +#include "addr_any.h" + + +namespace { + +int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { + return 0; +} + + +void stubSetStringToPlan() { + static Stub stub; + stub.set(qStringToSubplan, qwtStringToPlan); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qStringToSubplan$", result); + for (const auto& f : result) { + stub.set(f.second, qwtStringToPlan); + } + } +} + + +} + + +TEST(testCase, normalCase) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg readyRpc = {0}; + SRpcMsg fetchRpc = {0}; + SRpcMsg dropRpc = {0}; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->schedulerId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + + SResReadyMsg readyMsg = {0}; + readyMsg.schedulerId = htobe64(1); + readyMsg.queryId = htobe64(1); + readyMsg.taskId = htobe64(1); + readyRpc.pCont = &readyMsg; + + SResFetchMsg fetchMsg = {0}; + fetchMsg.schedulerId = htobe64(1); + fetchMsg.queryId = htobe64(1); + fetchMsg.taskId = htobe64(1); + fetchRpc.pCont = &fetchMsg; + + STaskDropMsg dropMsg = {0}; + dropMsg.schedulerId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + + stubSetStringToPlan(); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + ASSERT_EQ(code, 0); + +} + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + + diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index bc7bc44350245814cfb960ceee38720822cbdef3..2381a1dd49e5ca5ef61e3f5f0cdf4a91909d6bca 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt { SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; -typedef struct SQueryLevel { +typedef struct SSchLevel { int32_t level; int8_t status; SRWLatch lock; @@ -51,12 +51,12 @@ typedef struct SQueryLevel { int32_t taskSucceed; int32_t taskNum; SArray *subTasks; // Element is SQueryTask -} SQueryLevel; +} SSchLevel; -typedef struct SQueryTask { +typedef struct SSchTask { uint64_t taskId; // task id - SQueryLevel *level; // level + SSchLevel *level; // level SSubplan *plan; // subplan char *msg; // operator tree int32_t msgLen; // msg length @@ -66,13 +66,20 @@ typedef struct SQueryTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* -} SQueryTask; +} SSchTask; -typedef struct SQueryJob { +typedef struct SSchJobAttr { + bool needFetch; + bool syncSchedule; + bool queryJob; +} SSchJobAttr; + +typedef struct SSchJob { uint64_t queryId; int32_t levelNum; int32_t levelIdx; int8_t status; + SSchJobAttr attr; SQueryProfileSummary summary; SEpSet dataSrcEps; SEpAddr resEp; @@ -81,15 +88,19 @@ typedef struct SQueryJob { tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; - void *res; + SSchTask *fetchTask; + int32_t errCode; + void *res; + int32_t resNumOfRows; + SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SArray *levels; // Element is SQueryLevel, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. -} SQueryJob; +} SSchJob; #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE @@ -108,7 +119,7 @@ typedef struct SQueryJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); +extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 876a4ff4ae8ac3b59e46da86359df1a206299667..503383f4b14eca7c23a5efa8bae086ba117758c9 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -51,12 +51,12 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } -int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { +int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { - SQueryLevel *level = taosArrayGet(job->levels, i); + SSchLevel *level = taosArrayGet(job->levels, i); for (int32_t m = 0; m < level->taskNum; ++m) { - SQueryTask *task = taosArrayGet(level->subTasks, m); + SSchTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; @@ -70,14 +70,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } for (int32_t n = 0; n < childNum; ++n) { - SSubplan *child = taosArrayGet(plan->pChildern, n); - SQueryTask *childTask = taosHashGet(planToTask, &child, POINTER_BYTES); - if (childTask) { + SSubplan **child = taosArrayGet(plan->pChildern, n); + SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); + if (NULL == childTask || NULL == *childTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->children, &childTask)) { + if (NULL == taosArrayPush(task->children, childTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -92,14 +92,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan *parent = taosArrayGet(plan->pParents, n); - SQueryTask *parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); - if (parentTask) { + SSubplan **parent = taosArrayGet(plan->pParents, n); + SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); + if (NULL == parentTask || NULL == *parentTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->parents, &parentTask)) { + if (NULL == taosArrayPush(task->parents, parentTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -107,13 +107,13 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } } - SQueryLevel *level = taosArrayGet(job->levels, 0); - if (level->taskNum > 1) { + SSchLevel *level = taosArrayGet(job->levels, 0); + if (job->attr.queryJob && level->taskNum > 1) { qError("invalid plan info, level 0, taskNum:%d", level->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SQueryTask *task = taosArrayGet(level->subTasks, 0); + SSchTask *task = taosArrayGet(level->subTasks, 0); if (task->parents && taosArrayGetSize(task->parents) > 0) { qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -124,7 +124,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } -int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { +int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { int32_t code = 0; job->queryId = dag->queryId; @@ -146,21 +146,23 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); + job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); if (NULL == job->levels) { qError("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->attr.needFetch = true; + job->levelNum = levelNum; job->levelIdx = levelNum - 1; job->subPlans = dag->pSubplans; - SQueryLevel level = {0}; + SSchLevel level = {0}; SArray *levelPlans = NULL; int32_t levelPlanNum = 0; - SQueryLevel *pLevel = NULL; + SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; @@ -187,7 +189,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { pLevel->taskNum = levelPlanNum; - pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); + pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask)); if (NULL == pLevel->subTasks) { qError("taosArrayInit %d failed", levelPlanNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -195,7 +197,14 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGet(levelPlans, n); - SQueryTask task = {0}; + SSchTask task = {0}; + + if (plan->type == QUERY_TYPE_MODIFY) { + job->attr.needFetch = false; + } else { + job->attr.queryJob = true; + } + task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; @@ -236,7 +245,7 @@ _return: SCH_RET(code); } -int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { +int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) { if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } @@ -263,7 +272,7 @@ int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { } -int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { +int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -272,7 +281,7 @@ int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { +int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); return TSDB_CODE_SUCCESS; @@ -288,10 +297,9 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { +int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); - return TSDB_CODE_SUCCESS; + qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { @@ -305,7 +313,7 @@ int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { } -int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { +int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { int32_t msgSize = 0; void *msg = NULL; @@ -357,6 +365,9 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { break; } case TDMT_VND_FETCH: { + if (NULL == task) { + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } msgSize = sizeof(SResFetchMsg); msg = calloc(1, msgSize); if (NULL == msg) { @@ -395,7 +406,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { return TSDB_CODE_SUCCESS; } -int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -405,7 +416,7 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod } -int32_t schFetchFromRemote(SQueryJob *job) { +int32_t schFetchFromRemote(SSchJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { @@ -413,7 +424,7 @@ int32_t schFetchFromRemote(SQueryJob *job) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TDMT_VND_FETCH)); + SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); return TSDB_CODE_SUCCESS; @@ -424,8 +435,12 @@ _return: } -int32_t schProcessOnJobSuccess(SQueryJob *job) { - job->status = JOB_TASK_STATUS_SUCCEED; +int32_t schProcessOnJobPartialSuccess(SSchJob *job) { + job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + + if ((!job->attr.needFetch) && job->attr.syncSchedule) { + tsem_post(&job->rspSem); + } if (job->userFetch) { SCH_ERR_RET(schFetchFromRemote(job)); @@ -434,26 +449,27 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnJobFailure(SQueryJob *job) { +int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; + job->errCode = errCode; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); - if (job->userFetch) { + if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) { tsem_post(&job->rspSem); } return TSDB_CODE_SUCCESS; } -int32_t schProcessOnDataFetched(SQueryJob *job) { +int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } -int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { +int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { bool moved = false; SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); @@ -464,7 +480,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { task->status = JOB_TASK_STATUS_SUCCEED; - int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); + int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0; if (parentNum == 0) { if (task->plan->level != 0) { qError("level error"); @@ -475,7 +491,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { if (SCH_TASK_NEED_WAIT_ALL(task)) { SCH_LOCK(SCH_WRITE, &task->level->lock); - task->level->taskFailed++; + task->level->taskSucceed++; taskDone = task->level->taskSucceed + task->level->taskFailed; SCH_UNLOCK(SCH_WRITE, &task->level->lock); @@ -486,7 +502,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { if (task->level->taskFailed > 0) { job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job)); + SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR)); return TSDB_CODE_SUCCESS; } @@ -495,7 +511,9 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { job->resEp.port = task->execAddr.port; } - SCH_ERR_RET(schProcessOnJobSuccess(job)); + job->fetchTask = task; + + SCH_ERR_RET(schProcessOnJobPartialSuccess(job)); return TSDB_CODE_SUCCESS; } @@ -508,21 +526,21 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { } for (int32_t i = 0; i < parentNum; ++i) { - SQueryTask *par = taosArrayGet(task->parents, i); + SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); ++par->childReady; SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { - SCH_ERR_RET(schLaunchTask(job, task)); + SCH_ERR_RET(schLaunchTask(job, par)); } } return TSDB_CODE_SUCCESS; } -int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { +int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -534,7 +552,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); - return TSDB_CODE_SUCCESS; } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -550,7 +567,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod } job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job)); + SCH_ERR_RET(schProcessOnJobFailure(job, errCode)); return TSDB_CODE_SUCCESS; } @@ -560,34 +577,60 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { +int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { - case TDMT_VND_QUERY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY); - if (code) { - goto _task_error; + case TDMT_VND_SUBMIT: { + SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + job->resNumOfRows += rsp->affectedRows; + + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } } + break; } - break; - case TDMT_VND_RES_READY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; - } + case TDMT_VND_QUERY: { + SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY); + if (code) { + goto _task_error; + } + } + break; + } + case TDMT_VND_RES_READY: { + SResReadyRsp *rsp = (SResReadyRsp *)msg; + + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } + } + break; + } + case TDMT_VND_FETCH: { + SCH_ERR_JRET(rspCode); + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + + job->res = rsp; + job->resNumOfRows = rsp->numOfRows; + + SCH_ERR_JRET(schProcessOnDataFetched(job)); + break; } - break; - case TDMT_VND_FETCH: - SCH_ERR_JRET(rspCode); - SCH_ERR_JRET(schProcessOnDataFetched(job)); - break; default: qError("unknown msg type:%d received", msgType); return TSDB_CODE_QRY_INVALID_INPUT; @@ -600,14 +643,14 @@ _task_error: return TSDB_CODE_SUCCESS; _return: - code = schProcessOnJobFailure(job); + code = schProcessOnJobFailure(job, code); return code; } -int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { +int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); if (plan->execEpSet.numOfEps <= 0) { @@ -630,10 +673,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schLaunchJob(SQueryJob *job) { - SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx); +int32_t schLaunchJob(SSchJob *job) { + SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { - SQueryTask *task = taosArrayGet(level->subTasks, i); + SSchTask *task = taosArrayGet(level->subTasks, i); SCH_ERR_RET(schLaunchTask(job, task)); } @@ -642,10 +685,10 @@ int32_t schLaunchJob(SQueryJob *job) { return TSDB_CODE_SUCCESS; } -void schDropJobAllTasks(SQueryJob *job) { +void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); @@ -654,7 +697,7 @@ void schDropJobAllTasks(SQueryJob *job) { pIter = taosHashIterate(job->failTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); @@ -680,7 +723,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { +int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -690,11 +733,12 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi } int32_t code = 0; - SQueryJob *job = calloc(1, sizeof(SQueryJob)); + SSchJob *job = calloc(1, sizeof(SSchJob)); if (NULL == job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->attr.syncSchedule = syncSchedule; job->transport = transport; job->qnodeList = qnodeList; @@ -719,54 +763,104 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi } tsem_init(&job->rspSem, 0, 0); - - if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { - qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + + code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } else { + qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } } job->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(job)); - *(SQueryJob **)pJob = job; + *(SSchJob **)pJob = job; + + if (syncSchedule) { + tsem_wait(&job->rspSem); + } return TSDB_CODE_SUCCESS; _return: - *(SQueryJob **)pJob = NULL; + *(SSchJob **)pJob = NULL; scheduleFreeJob(job); SCH_RET(code); } +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) { + *numOfRows = 0; + + SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true)); + + SSchJob *job = *(SSchJob **)pJob; + + *numOfRows = job->resNumOfRows; + + return TSDB_CODE_SUCCESS; +} + +int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { + return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false); +} + + int32_t scheduleFetchRows(void *pJob, void **data) { if (NULL == pJob || NULL == data) { - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SQueryJob *job = pJob; + SSchJob *job = pJob; int32_t code = 0; + if (!job->attr.needFetch) { + qError("no need to fetch data"); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (job->status == JOB_TASK_STATUS_FAILED) { + job->res = NULL; + SCH_RET(job->errCode); + } + + if (job->status == JOB_TASK_STATUS_SUCCEED) { + job->res = NULL; + return TSDB_CODE_SUCCESS; + } + if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { qError("prior fetching not finished"); - return TSDB_CODE_QRY_APP_ERROR; + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - if (job->status == JOB_TASK_STATUS_SUCCEED) { + if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(job)); } tsem_wait(&job->rspSem); + if (job->status == JOB_TASK_STATUS_FAILED) { + code = job->errCode; + } + + if (job->res && ((SRetrieveTableRsp *)job->res)->completed) { + job->status = JOB_TASK_STATUS_SUCCEED; + } + *data = job->res; job->res = NULL; _return: atomic_val_compare_exchange_32(&job->userFetch, 1, 0); - return code; + SCH_RET(code); } int32_t scheduleCancelJob(void *pJob) { @@ -782,7 +876,7 @@ void scheduleFreeJob(void *pJob) { return; } - SQueryJob *job = pJob; + SSchJob *job = pJob; if (job->status > 0) { if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 9e94553058e923086808cdb462d27ffa95e9a96e..4732429d0bb62e30bf4ff02eb19c14ec66183b23 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -26,72 +26,310 @@ #include "taos.h" #include "tdef.h" #include "tvariant.h" -#include "catalog.h" -#include "scheduler.h" +#include "catalog.h" +#include "scheduler.h" #include "tep.h" #include "trpc.h" - +#include "schedulerInt.h" +#include "stub.h" +#include "addr_any.h" + namespace { -void mockBuildDag(SQueryDag *dag) { - uint64_t qId = 0x111111111111; - - dag->queryId = qId; - dag->numOfSubplans = 2; - dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); - SArray *scan = taosArrayInit(1, sizeof(SSubplan)); - SArray *merge = taosArrayInit(1, sizeof(SSubplan)); - - SSubplan scanPlan = {0}; - SSubplan mergePlan = {0}; - - scanPlan.id.queryId = qId; - scanPlan.id.templateId = 0x2222222222; - scanPlan.id.subplanId = 0x3333333333; - scanPlan.type = QUERY_TYPE_SCAN; - scanPlan.level = 1; - scanPlan.execEpSet.numOfEps = 1; - scanPlan.pChildern = NULL; - scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); - - mergePlan.id.queryId = qId; - mergePlan.id.templateId = 0x4444444444; - mergePlan.id.subplanId = 0x5555555555; - mergePlan.type = QUERY_TYPE_MERGE; - mergePlan.level = 0; - mergePlan.execEpSet.numOfEps = 1; - mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); - mergePlan.pParents = NULL; - - SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); - SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); - - taosArrayPush(mergePointer->pChildern, &scanPointer); - taosArrayPush(scanPointer->pParents, &mergePointer); - - taosArrayPush(dag->pSubplans, &merge); - taosArrayPush(dag->pSubplans, &scan); -} - + +extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); + +void schtBuildQueryDag(SQueryDag *dag) { + uint64_t qId = 0x0000000000000001; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); + SArray *scan = taosArrayInit(1, sizeof(SSubplan)); + SArray *merge = taosArrayInit(1, sizeof(SSubplan)); + + SSubplan scanPlan = {0}; + SSubplan mergePlan = {0}; + + scanPlan.id.queryId = qId; + scanPlan.id.templateId = 0x0000000000000002; + scanPlan.id.subplanId = 0x0000000000000003; + scanPlan.type = QUERY_TYPE_SCAN; + scanPlan.level = 1; + scanPlan.execEpSet.numOfEps = 1; + scanPlan.execEpSet.port[0] = 6030; + strcpy(scanPlan.execEpSet.fqdn[0], "ep0"); + scanPlan.pChildern = NULL; + scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); + scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + + mergePlan.id.queryId = qId; + mergePlan.id.templateId = 0x4444444444; + mergePlan.id.subplanId = 0x5555555555; + mergePlan.type = QUERY_TYPE_MERGE; + mergePlan.level = 0; + mergePlan.execEpSet.numOfEps = 0; + mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); + mergePlan.pParents = NULL; + mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); + + SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); + SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); + + taosArrayPush(mergePointer->pChildern, &scanPointer); + taosArrayPush(scanPointer->pParents, &mergePointer); + + taosArrayPush(dag->pSubplans, &merge); + taosArrayPush(dag->pSubplans, &scan); +} + +void schtBuildInsertDag(SQueryDag *dag) { + uint64_t qId = 0x0000000000000002; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(1, POINTER_BYTES); + SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan)); + + SSubplan insertPlan[2] = {0}; + + insertPlan[0].id.queryId = qId; + insertPlan[0].id.templateId = 0x0000000000000003; + insertPlan[0].id.subplanId = 0x0000000000000004; + insertPlan[0].type = QUERY_TYPE_MODIFY; + insertPlan[0].level = 0; + insertPlan[0].execEpSet.numOfEps = 1; + insertPlan[0].execEpSet.port[0] = 6030; + strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0"); + insertPlan[0].pChildern = NULL; + insertPlan[0].pParents = NULL; + insertPlan[0].pNode = NULL; + insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + + insertPlan[1].id.queryId = qId; + insertPlan[1].id.templateId = 0x0000000000000003; + insertPlan[1].id.subplanId = 0x0000000000000005; + insertPlan[1].type = QUERY_TYPE_MODIFY; + insertPlan[1].level = 0; + insertPlan[1].execEpSet.numOfEps = 1; + insertPlan[1].execEpSet.port[0] = 6030; + strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1"); + insertPlan[1].pChildern = NULL; + insertPlan[1].pParents = NULL; + insertPlan[1].pNode = NULL; + insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + + + taosArrayPush(inserta, &insertPlan[0]); + taosArrayPush(inserta, &insertPlan[1]); + + taosArrayPush(dag->pSubplans, &inserta); +} + + +int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { + *str = (char *)calloc(1, 20); + *len = 20; + return 0; +} + +int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { + return 0; +} + + +void schtSetPlanToString() { + static Stub stub; + stub.set(qSubPlanToString, schtPlanToString); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qSubPlanToString$", result); + for (const auto& f : result) { + stub.set(f.second, schtPlanToString); + } + } +} + +void schtSetExecNode() { + static Stub stub; + stub.set(qSetSubplanExecutionNode, schtExecNode); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result); + for (const auto& f : result) { + stub.set(f.second, schtExecNode); + } + } } -TEST(testCase, normalCase) { - void *mockPointer = (void *)0x1; +void *schtSendRsp(void *param) { + SSchJob *job = NULL; + int32_t code = 0; + + while (true) { + job = *(SSchJob **)param; + if (job) { + break; + } + + usleep(1000); + } + + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SShellSubmitRspMsg rsp = {0}; + rsp.affectedRows = 10; + schHandleRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + + pIter = taosHashIterate(job->execTasks, pIter); + } + + return NULL; +} + +void *pInsertJob = NULL; + + +} + +TEST(queryTest, normalCase) { + void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; char *dbname = "1.db1"; char *tablename = "table1"; - SVgroupInfo vgInfo = {0}; - void *pJob = NULL; - SQueryDag dag = {0}; - SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); - - int32_t code = schedulerInit(NULL); + SVgroupInfo vgInfo = {0}; + void *pJob = NULL; + SQueryDag dag = {0}; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); - - mockBuildDag(&dag); - - code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob); + + schtBuildQueryDag(&dag); + + schtSetPlanToString(); + schtSetExecNode(); + + code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); -} + + SSchJob *job = (SSchJob *)pJob; + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + + pIter = taosHashIterate(job->execTasks, pIter); + } + + SRetrieveTableRsp rsp = {0}; + rsp.completed = 1; + rsp.numOfRows = 10; + code = schHandleRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + + + void *data = NULL; + + code = scheduleFetchRows(job, &data); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + + data = NULL; + code = scheduleFetchRows(job, &data); + ASSERT_EQ(code, 0); + ASSERT_EQ(data, (void*)NULL); + + scheduleFreeJob(pJob); +} + + + + +TEST(insertTest, normalCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + SQueryDag dag = {0}; + uint64_t numOfRows = 0; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + schtBuildInsertDag(&dag); + + schtSetPlanToString(); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); + + code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &numOfRows); + ASSERT_EQ(code, 0); + ASSERT_EQ(numOfRows, 20); + + scheduleFreeJob(pInsertJob); +} + + int main(int argc, char** argv) { @@ -101,4 +339,4 @@ int main(int argc, char** argv) { - +