diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d677cacc059fffcccdaec7eaf0eea2f04614d7c5..b7fe073a54fac6c90b9c634d12fdd1ca7a69ddcd 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -486,6 +486,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void *param, TAOS **taos); TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res); TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param); +// get taos connection unused session number +int32_t taos_unused_session(TAOS* taos); void waitForQueryRsp(void *param, TAOS_RES *tres, int code); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 18145f5cf83fb248dfe92d2c6ae3a79e0f3d22a4..eb02d00988a8d7ef4eac56a83a9c3212a2c1912f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -28,6 +28,7 @@ #include "tutil.h" #include "ttimer.h" #include "tscProfile.h" +#include "tidpool.h" static bool validImpl(const char* str, size_t maxsize) { if (str == NULL) { @@ -306,6 +307,25 @@ void taos_close(TAOS *taos) { taosRemoveRef(tscRefId, pObj->rid); } +// get taos connection unused session number +int32_t taos_unused_session(TAOS* taos) { + // param valid check + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL || pObj->signature != pObj) { + tscError("pObj:%p is NULL or freed", pObj); + terrno = TSDB_CODE_TSC_DISCONNECTED; + return 0; + } + if(pObj->pRpcObj == NULL ) { + tscError("pObj:%p pRpcObj is NULL.", pObj); + terrno = TSDB_CODE_TSC_DISCONNECTED; + return 0; + } + + // get number + return rpcUnusedSession(pObj->pRpcObj->pDnodeConn, false); +} + void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { assert(tres != NULL); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 421af24ceef21767065ee52bf8334666e430ea09..88c72a91a6ab7a3b6b0a741dcfb2215662c63326 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -266,9 +266,15 @@ static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) { } // callback send values +int32_t ok_cnt = 0; +int32_t err_cnt = 0; void cbSendValues(void *param, TAOS_RES *res, int code) { if(code < 0) { - tscError("CQ=%p insert another table error 0x%x", param, code); + err_cnt ++; + tscError("CQ Send Failed. code=0x%x ok_cnt=%d err_cnt=%d", code, ok_cnt,err_cnt); + } else { + ok_cnt ++; + tscInfo("CQ Send OK. row=%d ok_cnt=%d err_cnt=%d", code, ok_cnt, err_cnt); } } @@ -303,36 +309,62 @@ size_t appendValues(TAOS_FIELD* fields, int32_t numCols, TAOS_ROW row, char* pBu return rowLen; } +bool sqlBufSend(TAOS *taos, char *sqlBuf) { + // if no enough free session, wait max 10s + int32_t sleepCnt = 0; + do { + int32_t session = taos_unused_session(taos); + if(session > 1000) { + break; + } + taosMsleep(500); + tscInfo("CQ session < 1000. session=%d Wait 0.5s cnt=%d", session, sleepCnt); + } while(++sleepCnt < 20); + + strcat(sqlBuf, ";"); + taos_query_ra(taos, sqlBuf, cbSendValues, NULL); + return true; +} + +#define STR_SQL_INSERT "insert into " // send one table all rows for once -bool sendChildTalbe(STscObj *pTscObj, char *superName, char *tableName, TAOS_FIELD *fields, int32_t numCols, SArray *arr) { - int32_t bufLen = TSDB_MAX_SQL_LEN; +bool sendChildTalbe(TAOS *taos, char *superName, char *tableName, TAOS_FIELD *fields, int32_t numCols, + SArray *arr, char* sqlBuf, int32_t bufLen) { + + char dbName[TSDB_DB_NAME_LEN] = ""; + char dbTable[TSDB_TABLE_FNAME_LEN]; size_t numRows = taosArrayGetSize(arr); if(numRows == 0) return false; - if(numRows < 50) - bufLen /= 20; - else if(numRows < 500) - bufLen /= 5; - else - bufLen /= 2; - - char dbName[TSDB_DB_NAME_LEN] = ""; - char fullTable[TSDB_TABLE_FNAME_LEN]; // obtain dbname char * p = strstr(superName, "."); if(p) { // if have db prefix , under this db create table int32_t len = p - superName; strncpy(dbName, superName, len); dbName[len] = 0; // append str end - sprintf(fullTable, "%s.%s", dbName, tableName); + sprintf(dbTable, "%s.%s", dbName, tableName); } else { // no db prefix - strcpy(fullTable, tableName); + strcpy(dbTable, tableName); } - char *pBuf = (char *)tmalloc(bufLen); - sprintf(pBuf, "insert into %s using %s tags(0) values ", fullTable, superName); - size_t curLen = strlen(pBuf); + // first enter + if(sqlBuf[0] == 0) { + strcpy(sqlBuf, STR_SQL_INSERT); + } else { // check need send + if( bufLen - strlen(sqlBuf) < 300) { + sqlBufSend(taos, sqlBuf); + strcpy(sqlBuf, STR_SQL_INSERT); + } + } + + // init + int32_t preLen = strlen(sqlBuf); + char *subBuf = sqlBuf + preLen; + int32_t subLen = bufLen - preLen; + sprintf(subBuf, " %s using %s tags(0) values ", dbTable, superName); + size_t curLen = strlen(subBuf); + TAOS_ROW row ; bool full = false; @@ -340,30 +372,42 @@ bool sendChildTalbe(STscObj *pTscObj, char *superName, char *tableName, TAOS_FIE row = (TAOS_ROW)taosArrayGetP(arr, i); if(row == NULL) continue; - curLen += appendValues(fields, numCols, row, pBuf, bufLen - 100, curLen, &full); - if(full || i == numRows - 1) { // need send + if(subLen > 200) + curLen += appendValues(fields, numCols, row, subBuf, subLen - 100, curLen, &full); + else + full = true; + if(full) { // need send // send current - strcat(pBuf, ";"); - taos_query_a(pTscObj, pBuf, cbSendValues, NULL); - - // reset for next - if(full) { - sprintf(pBuf, "insert into %s.%s using %s tags(0) values ", dbName, tableName, superName); - curLen = strlen(pBuf); - // retry append. if full is true again, ignore this row - curLen += appendValues(fields, numCols, row, pBuf, bufLen - 100, curLen, &full); - full = false; // reset to false - } + sqlBufSend(taos, sqlBuf); + + // init reset + strcpy(sqlBuf, STR_SQL_INSERT); + preLen = strlen(sqlBuf); + subBuf = sqlBuf + preLen; + subLen = bufLen - preLen; + sprintf(subBuf, " %s using %s tags(0) values ", dbTable, superName); + curLen = strlen(subBuf); + + // retry append. if full is true again, ignore this row + curLen += appendValues(fields, numCols, row, subBuf, subLen - 100, curLen, &full); + full = false; // reset to false } tfree(row); } - tfree(pBuf); return true; } // write cq result to another table -bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32_t numCols, SHashObj *tbHash) { +bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32_t numCols, SHashObj *tbHash, int32_t numRows) { + + int32_t bufLen = TSDB_MAX_SQL_LEN/2 - 128; + char * sqlBuf = tmalloc(bufLen); + sqlBuf[0] = 0; // init + + ok_cnt = 0; + err_cnt = 0; + int cnt_table = 0; void *pIter = taosHashIterate(tbHash, NULL); while(pIter) { SArray *arr = *(SArray**)pIter; @@ -377,14 +421,22 @@ bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32 key[len] = 0; // string end '\0' // send all this table rows - sendChildTalbe(pTscObj, superName, key, fields, numCols, arr); + sendChildTalbe(pTscObj, superName, key, fields, numCols, arr, sqlBuf, bufLen); // release SArray taosArrayDestroy(arr); tfree(key); + cnt_table ++; } pIter = taosHashIterate(tbHash, pIter); } + if(sqlBuf[0]) { + sqlBufSend(pTscObj, sqlBuf); + } + + tscInfo("CQ ===== stream %d rows write to %d tables ===== \n", numRows, cnt_table); + tfree(sqlBuf); + return true; } @@ -483,7 +535,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf // write Another if(toAnother) { - toAnotherTable(pSql->pTscObj, pStream->to, fields, dstColsNum, tbHash); + toAnotherTable(pSql->pTscObj, pStream->to, fields, dstColsNum, tbHash, numOfRows); taosHashCleanup(tbHash); } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 0ce2e3da14d1cec204fc755db13da53f08295bff..5e3fbd571ef4b6425f2e5a58c308c8fc9da0b12e 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +int32_t rpcUnusedSession(void * rpcInfo, bool bLock); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 2c5bf953a4be1b83cdcfa0d366e49d3b17124dc4..6fe96f5b07c9ec26ef4f51a9cabc8a438cd627a2 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1659,3 +1659,9 @@ static void rpcDecRef(SRpcInfo *pRpc) } } +int32_t rpcUnusedSession(void * rpcInfo, bool bLock) { + SRpcInfo *info = (SRpcInfo *)rpcInfo; + if(info == NULL) + return 0; + return taosIdPoolNumOfFree(info->idPool, bLock); +} \ No newline at end of file diff --git a/src/util/inc/tidpool.h b/src/util/inc/tidpool.h index e4439439ced6522e26c8db4a560c50f5b0cb8a16..fb56acb298fbc8a899f3119f6054a7aec43f1d46 100644 --- a/src/util/inc/tidpool.h +++ b/src/util/inc/tidpool.h @@ -36,6 +36,8 @@ int taosIdPoolNumOfUsed(void *handle); bool taosIdPoolMarkStatus(void *handle, int id); +// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly +int taosIdPoolNumOfFree(void *handle, bool bLock); #ifdef __cplusplus } #endif diff --git a/src/util/src/tidpool.c b/src/util/src/tidpool.c index 61cecf54c0e14a4435ba72d317715cdec902068b..00e4dca87236a21e3ffeac69e6bb2753b4addc4b 100644 --- a/src/util/src/tidpool.c +++ b/src/util/src/tidpool.c @@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) { pthread_mutex_unlock(&pIdPool->mutex); return ret; -} \ No newline at end of file +} + +// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly +int taosIdPoolNumOfFree(void *handle, bool bLock) { + id_pool_t *pIdPool = handle; + if(bLock) + pthread_mutex_lock(&pIdPool->mutex); + + int ret = pIdPool->numOfFree; + if(bLock) + pthread_mutex_unlock(&pIdPool->mutex); + return ret; +}