/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "functionMgt.h"
#include "os.h"
#include "query.h"
#include "qworker.h"
#include "scheduler.h"
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"

#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED    0

// Process-wide client state shared by all connections.
SAppInfo appInfo;
// Reference pools created in taos_init_imp(); -1 until the client is initialized.
int32_t  clientReqRefPool = -1;
int32_t  clientConnRefPool = -1;

int32_t timestampDeltaLimit = 900;  // s

static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
// Result of the one-time initialization: 0 on success, -1 on failure.
volatile int32_t    tscInitRes = 0;

/**
 * Put a freshly created request into the request ref pool and bump the
 * per-connection and per-app-instance request counters.
 *
 * @param pRequest request to register; pRequest->pTscObj must already be set (it is read for logging)
 * @param pTscObj  owning connection object
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  // NOTE(review): the return value of taosAddRef is stored as-is and not validated here.
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);

  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;

    // keep the 64-bit counters at full width; narrowing them to int32_t would corrupt the logged values
    int64_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int64_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
    tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%" PRId64
             ", total:%" PRId64 ", reqId:0x%" PRIx64,
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Undo registerRequest(): decrement the request counters, record the elapsed
 * time in the app-instance summary and release the connection reference that
 * was acquired in createRequest().
 *
 * @param pRequest request being destroyed, must not be NULL
 */
static void deregisterRequest(SRequestObj *pRequest) {
  // threshold in microseconds (duration below is measured with taosGetTimestampUs)
  const static int64_t SLOW_QUERY_INTERVAL = 3000000L;  // todo configurable
  assert(pRequest != NULL);

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  int64_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%.2f ms, "
           "current:%d, app current:%" PRId64,
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) {
    tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
            "us, exec:%" PRId64 "us",
            duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
            pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
            pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
            pRequest->metric.execEnd - pRequest->metric.semanticEnd);
    atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
  } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
    tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64
            "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64,
            duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart,
            pRequest->metric.ctgEnd - pRequest->metric.ctgStart,
            pRequest->metric.semanticEnd - pRequest->metric.ctgEnd,
            pRequest->metric.planEnd - pRequest->metric.semanticEnd,
            pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
    atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
  }

  if (duration >= SLOW_QUERY_INTERVAL) {
    atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
  }

  // pairs with acquireTscObj() in createRequest()
  releaseTscObj(pTscObj->id);
}

/**
 * Close the RPC transporter of an app instance. A NULL instance or a NULL
 * transporter is ignored.
 */
// todo close the transporter properly
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) {
    return;
  }

  tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
  rpcClose(pAppInfo->pTransporter);
}

/**
 * Callback installed as SRpcInit.rfp by openTransporter().
 *
 * @return true when code satisfies NEED_REDIRECT_ERROR and msgType is not one
 *         of the scheduler query/fetch/heartbeat/drop-task messages; false otherwise
 */
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (NEED_REDIRECT_ERROR(code)) {
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK) {
      return false;
    }
    return true;
  } else {
    return false;
  }
}

// start timer for particular msgType
// Currently not installed: the tfp assignment in openTransporter() is commented out.
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) {
    return true;
  }
  return false;
}

/**
 * Open a client-side RPC transporter.
 *
 * @param user        user name stored in the RPC init struct (not copied here)
 * @param auth        unused in this function
 * @param numOfThread number of RPC threads
 * @return handle returned by rpcOpen(), or NULL on failure
 */
// TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
  rpcInit.label = "TSC";
  rpcInit.numOfThreads = numOfThread;
  rpcInit.cfp = processMsgFromServer;
  rpcInit.rfp = clientRpcRfp;
  // rpcInit.tfp = clientRpcTfp;
  rpcInit.sessions = 1024;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = (char *)user;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.compressSize = tsCompressMsgSize;

  void *pDnodeConn = rpcOpen(&rpcInit);
  if (pDnodeConn == NULL) {
    tscError("failed to init connection to server");
    return NULL;
  }

  return pDnodeConn;
}

/**
 * Destroy every request whose ref id is stored in the given hash.
 * Ids that can no longer be acquired from the request pool are skipped.
 */
void destroyAllRequests(SHashObj *pRequests) {
  void *pIter = taosHashIterate(pRequests, NULL);
  while (pIter != NULL) {
    int64_t *rid = pIter;

    SRequestObj *pRequest = acquireRequest(*rid);
    if (pRequest) {
      destroyRequest(pRequest);
      releaseRequest(*rid);
    }

    pIter = taosHashIterate(pRequests, pIter);
  }
}

/**
 * Call taos_stop_query() on every request whose ref id is stored in the given hash.
 * Ids that can no longer be acquired from the request pool are skipped.
 */
void stopAllRequests(SHashObj *pRequests) {
  void *pIter = taosHashIterate(pRequests, NULL);
  while (pIter != NULL) {
    int64_t *rid = pIter;

    SRequestObj *pRequest = acquireRequest(*rid);
    if (pRequest) {
      taos_stop_query(pRequest);
      releaseRequest(*rid);
    }

    pIter = taosHashIterate(pRequests, pIter);
  }
}

/**
 * Tear down an app instance: remove its heartbeat manager and its entry in
 * appInfo.pInstMap (under appInfo.mutex), close the transporter, destroy the
 * qnode list and free the instance itself.
 */
void destroyAppInst(SAppInstInfo *pAppInfo) {
  tscDebug("destroy app inst mgr %p", pAppInfo);

  taosThreadMutexLock(&appInfo.mutex);

  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
  taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));

  taosThreadMutexUnlock(&appInfo.mutex);

  taosMemoryFreeClear(pAppInfo->instKey);
  closeTransporter(pAppInfo);

  taosThreadMutexLock(&pAppInfo->qnodeMutex);
  taosArrayDestroy(pAppInfo->pQnodeList);
  taosThreadMutexUnlock(&pAppInfo->qnodeMutex);

  taosMemoryFree(pAppInfo);
}

/**
 * Destructor registered for clientConnRefPool (see taos_init_imp).
 * Deregisters the connection from the heartbeat manager, destroys all of its
 * requests, and destroys the owning app instance when this was its last
 * connection.
 *
 * @param pObj STscObj to destroy; NULL is ignored
 */
void destroyTscObj(void *pObj) {
  if (NULL == pObj) {
    return;
  }

  STscObj *pTscObj = pObj;
  int64_t  tscId = pTscObj->id;
  tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);

  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
  hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);

  destroyAllRequests(pTscObj->pRequests);
  taosHashCleanup(pTscObj->pRequests);

  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);

  // decrement first so that the log line reports the number of connections that really remain
  int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
  tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, connNum);

  if (0 == connNum) {
    destroyAppInst(pTscObj->pAppInfo);
  }

  taosThreadMutexDestroy(&pTscObj->mutex);
  taosMemoryFree(pTscObj);

  // only the pointer value is printed below, the object itself is gone
  tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
}

/**
 * Allocate and initialize a connection object and add it to clientConnRefPool.
 *
 * @param user     user name, copied into the object
 * @param auth     TSDB_PASSWORD_LEN bytes are copied from this buffer
 * @param db       optional default database name, may be NULL
 * @param connType connection type stored in the object
 * @param pAppInfo owning app instance; its numOfConns counter is incremented
 * @return the new STscObj, or NULL with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY
 */
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
  STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
  if (NULL == pObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pObj->pRequests) {
    taosMemoryFree(pObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pObj->connType = connType;
  pObj->pAppInfo = pAppInfo;
  tstrncpy(pObj->user, user, sizeof(pObj->user));
  memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);

  if (db != NULL) {
    tstrncpy(pObj->db, db, tListLen(pObj->db));
  }

  taosThreadMutexInit(&pObj->mutex, NULL);
  pObj->id = taosAddRef(clientConnRefPool, pObj);
  pObj->schemalessType = 1;

  atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);

  tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj);
  return pObj;
}

/** Acquire a reference to the connection with the given ref id from clientConnRefPool. */
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }

/** Release a reference previously obtained through acquireTscObj(). */
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }

/**
 * Create a request bound to the connection identified by connId.
 *
 * The connection reference acquired here is released in deregisterRequest()
 * when the request is destroyed.
 *
 * @param connId ref id of the connection in clientConnRefPool
 * @param type   request type stored in the request
 * @return the new SRequestObj, or NULL with terrno set to
 *         TSDB_CODE_TSC_OUT_OF_MEMORY (allocation failure) or
 *         TSDB_CODE_TSC_DISCONNECTED (connection cannot be acquired)
 */
void *createRequest(uint64_t connId, int32_t type) {
  SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
  if (NULL == pRequest) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // Allocate the error message buffer before anything else is acquired, so an
  // allocation failure can be unwound without touching the connection.
  char *pMsgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
  if (NULL == pMsgBuf) {
    taosMemoryFree(pRequest);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  STscObj *pTscObj = acquireTscObj(connId);
  if (pTscObj == NULL) {
    taosMemoryFree(pMsgBuf);
    taosMemoryFree(pRequest);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  pRequest->resType = RES_TYPE__QUERY;
  pRequest->requestId = generateRequestId();
  pRequest->metric.start = taosGetTimestampUs();

  pRequest->body.resInfo.convertUcs4 = true;  // convert ucs4 by default

  pRequest->type = type;
  pRequest->allocatorRefId = -1;
  pRequest->pDb = getDbOfConnection(pTscObj);
  pRequest->pTscObj = pTscObj;
  pRequest->msgBuf = pMsgBuf;
  pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  tsem_init(&pRequest->body.rspSem, 0, 0);

  if (registerRequest(pRequest, pTscObj)) {
    doDestroyRequest(pRequest);
    return NULL;
  }

  return pRequest;
}

/**
 * Free every heap buffer owned by a result-info struct and reset the pointers.
 * The struct itself is not freed.
 */
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);

  if (pResInfo->convertBuf != NULL) {
    // one conversion buffer per column
    for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
      taosMemoryFreeClear(pResInfo->convertBuf[i]);
    }
    taosMemoryFreeClear(pResInfo->convertBuf);
  }
}

/** Acquire a reference to the request with the given ref id from clientReqRefPool. */
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }

/** Release a reference previously obtained through acquireRequest(). */
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }

/** Remove the request with the given ref id from clientReqRefPool. */
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }

/**
 * Destructor registered for clientReqRefPool (see taos_init_imp); also called
 * directly by createRequest() when registration fails.
 * Removes the request from its connection's request hash and frees everything
 * the request owns, then the request itself.
 *
 * @param p SRequestObj to destroy; NULL is ignored
 */
void doDestroyRequest(void *p) {
  if (NULL == p) {
    return;
  }

  SRequestObj *pRequest = (SRequestObj *)p;

  int64_t reqId = pRequest->self;
  tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);

  taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));

  schedulerFreeJob(&pRequest->body.queryJob, 0);

  taosMemoryFreeClear(pRequest->msgBuf);
  taosMemoryFreeClear(pRequest->pDb);

  doFreeReqResultInfo(&pRequest->body.resInfo);
  // pairs with tsem_init() in createRequest()
  tsem_destroy(&pRequest->body.rspSem);

  taosArrayDestroy(pRequest->tableList);
  taosArrayDestroy(pRequest->dbList);
  taosArrayDestroy(pRequest->targetTableList);
  qDestroyQuery(pRequest->pQuery);
  nodesDestroyAllocator(pRequest->allocatorRefId);

  destroyQueryExecRes(&pRequest->body.resInfo.execRes);

  if (pRequest->self) {
    deregisterRequest(pRequest);
  }

  if (pRequest->syncQuery) {
    taosMemoryFree(pRequest->body.param);
  }

  taosMemoryFreeClear(pRequest->sqlstr);
  taosMemoryFree(pRequest);

  // only the pointer value is printed below, the object itself is gone
  tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
}

/**
 * Stop the query of a request and remove the request from clientReqRefPool.
 * A NULL request is ignored.
 */
void destroyRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    return;
  }

  taos_stop_query(pRequest);
  removeRequest(pRequest->self);
}

/**
 * One-time client initialization, run through taosThreadOnce() from taos_init().
 * On failure of log or config initialization tscInitRes is set to -1 and the
 * remaining steps are skipped.
 */
void taos_init_imp(void) {
  // In the APIs of other program language, taos_cleanup is not available yet.
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
  atexit(taos_cleanup);
  errno = TSDB_CODE_SUCCESS;
  taosSeedRand(taosGetTimestampSec());

  deltaToUtcInitOnce();

  if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
    tscInitRes = -1;
    return;
  }

  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
    tscInitRes = -1;
    return;
  }

  initQueryModuleMsgHandle();

  taosConvInit();

  rpcInit();

  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
  catalogInit(&cfg);

  schedulerInit();
  tscDebug("starting to initialize TAOS driver");

#ifndef WINDOWS
  taosSetCoreDump(true);
#endif

  initTaskQueue();
  fmFuncMgtInit();
  nodesInitAllocatorSet();

  // the second argument is the destructor invoked by the ref pool for each object
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);

  // transDestroyBuffer(&conn->readBuf);
  taosGetAppName(appInfo.appName, NULL);
  taosThreadMutexInit(&appInfo.mutex, NULL);

  appInfo.pid = taosGetPId();
  appInfo.startTime = taosGetTimestampMs();
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  tscDebug("client is initialized successfully");
}

/**
 * Initialize the client exactly once.
 *
 * @return tscInitRes: 0 on success, -1 if log or config initialization failed
 */
int taos_init() {
  taosThreadOnce(&tscinit, taos_init_imp);
  return tscInitRes;
}

/**
 * Apply a single client option.
 *
 * TSDB_OPTION_CONFIGDIR only updates the global configDir and does not trigger
 * client initialization; every other option initializes the client first and
 * then updates the matching config item.
 *
 * @param option option identifier
 * @param str    new value, must not be NULL
 * @return 0 on success, -1 for a NULL value or an unknown option, otherwise
 *         the error code returned by cfgSetItem()
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (str == NULL) {
    // both tstrncpy() and cfgSetItem() below would otherwise be handed a NULL string
    tscError("Invalid option %d", option);
    return -1;
  }

  if (option != TSDB_OPTION_CONFIGDIR) {
    taos_init();  // initialize global config
  } else {
    tstrncpy(configDir, str, PATH_MAX);
    tscInfo("set cfg:%s to %s", "configDir", configDir);
    return 0;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;

  // TSDB_OPTION_CONFIGDIR has already been handled above and never reaches this switch
  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      pItem = cfgGetItem(pCfg, "shellActivityTimer");
      break;
    case TSDB_OPTION_LOCALE:
      pItem = cfgGetItem(pCfg, "locale");
      break;
    case TSDB_OPTION_CHARSET:
      pItem = cfgGetItem(pCfg, "charset");
      break;
    case TSDB_OPTION_TIMEZONE:
      pItem = cfgGetItem(pCfg, "timezone");
      break;
    default:
      break;
  }

  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, pItem->name, str, CFG_STYPE_TAOS_OPTIONS);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", pItem->name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", pItem->name, str);
  }

  return code;
}

/**
 * The request id is an unsigned integer format of 64bit.
 * +-------------+-------+-----------+---------------+
 * | uid|localIp |  PId  | timestamp | serial number |
 * +-------------+-------+-----------+---------------+
 * |    12bit    | 12bit |   24bit   |     16bit     |
 * +-------------+-------+-----------+---------------+
 *
 * The uid part is the low 12 bits of a MurmurHash3_32 hash of the system UUID,
 * the timestamp part is the low 24 bits of the current time in milliseconds.
 *
 * NOTE(review): when taosGetSystemUUID() fails, the error message mentions an
 * ip-address fallback, but no such fallback is visible here; the uid part stays
 * 0 and the UUID lookup is retried on the next call.
 *
 * @return a non-zero request id
 */
uint64_t generateRequestId() {
  static uint64_t hashId = 0;
  static uint32_t requestSerialId = 0;

  if (hashId == 0) {
    char    uid[64] = {0};
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (code != TSDB_CODE_SUCCESS) {
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
               tstrerror(TAOS_SYSTEM_ERROR(errno)));
    } else {
      hashId = MurmurHash3_32(uid, strlen(uid));
    }
  }

  uint64_t id = 0;

  // loop until the composed id is non-zero
  while (true) {
    int64_t  ts = taosGetTimestampMs();
    uint64_t pid = taosGetPId();
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
    // restart the serial number once it no longer fits into its 16-bit field
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);

    id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
    if (id) {
      break;
    }
  }
  return id;
}

#if 0
#include "cJSON.h"
static setConfRet taos_set_config_imp(const char *config){
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
  static bool setConfFlag = false;
  if (setConfFlag) {
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
    strcpy(ret.retMsg, "configuration can only set once");
    return ret;
  }
  taosInitGlobalCfg();
  cJSON *root = cJSON_Parse(config);
  if (root == NULL){
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
    strcpy(ret.retMsg, "parse json error");
    return ret;
  }

  int size = cJSON_GetArraySize(root);
  if(!cJSON_IsObject(root) || size == 0) {
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
    strcpy(ret.retMsg, "json content is invalid, must be not empty object");
    return ret;
  }

  if(size >= 1000) {
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
    strcpy(ret.retMsg, "json object size is too long");
    return ret;
  }

  for(int i = 0; i < size; i++){
    cJSON *item = cJSON_GetArrayItem(root, i);
    if(!item) {
      ret.retCode = SET_CONF_RET_ERR_INNER;
      strcpy(ret.retMsg, "inner error");
      return ret;
    }
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
      ret.retCode = SET_CONF_RET_ERR_PART;
      if (strlen(ret.retMsg) == 0){
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
      }else{
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        size_t leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, "|", leftSize);
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, item->string, leftSize);
      }
    }
  }
  cJSON_Delete(root);
  setConfFlag = true;
  return ret;
}

setConfRet taos_set_config(const char *config){
  taosThreadMutexLock(&setConfMutex);
  setConfRet ret = taos_set_config_imp(config);
  taosThreadMutexUnlock(&setConfMutex);
  return ret;
}
#endif