提交 6af088ad 编写于 作者: A Alex Duan

combine multi-table write to one sql

上级 194e4f0f
......@@ -486,6 +486,8 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, TAOS **taos);
TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, int64_t* res);
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param);
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
......
......@@ -28,6 +28,7 @@
#include "tutil.h"
#include "ttimer.h"
#include "tscProfile.h"
#include "tidpool.h"
static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) {
......@@ -306,6 +307,25 @@ void taos_close(TAOS *taos) {
taosRemoveRef(tscRefId, pObj->rid);
}
// get taos connection unused session number
int32_t taos_unused_session(TAOS* taos) {
// param valid check
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("pObj:%p is NULL or freed", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
if(pObj->pRpcObj == NULL ) {
tscError("pObj:%p pRpcObj is NULL.", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
return 0;
}
// get number
return rpcUnusedSession(pObj->pRpcObj->pDnodeConn, false);
}
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(tres != NULL);
......
......@@ -266,9 +266,15 @@ static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
}
// callback send values
int32_t ok_cnt = 0;
int32_t err_cnt = 0;
void cbSendValues(void *param, TAOS_RES *res, int code) {
if(code < 0) {
tscError("CQ=%p insert another table error 0x%x", param, code);
err_cnt ++;
tscError("CQ Send Failed. code=0x%x ok_cnt=%d err_cnt=%d", code, ok_cnt,err_cnt);
} else {
ok_cnt ++;
tscInfo("CQ Send OK. row=%d ok_cnt=%d err_cnt=%d", code, ok_cnt, err_cnt);
}
}
......@@ -303,36 +309,62 @@ size_t appendValues(TAOS_FIELD* fields, int32_t numCols, TAOS_ROW row, char* pBu
return rowLen;
}
bool sqlBufSend(TAOS *taos, char *sqlBuf) {
// if no enough free session, wait max 10s
int32_t sleepCnt = 0;
do {
int32_t session = taos_unused_session(taos);
if(session > 1000) {
break;
}
taosMsleep(500);
tscInfo("CQ session < 1000. session=%d Wait 0.5s cnt=%d", session, sleepCnt);
} while(++sleepCnt < 20);
strcat(sqlBuf, ";");
taos_query_ra(taos, sqlBuf, cbSendValues, NULL);
return true;
}
#define STR_SQL_INSERT "insert into "
// send one table all rows for once
bool sendChildTalbe(STscObj *pTscObj, char *superName, char *tableName, TAOS_FIELD *fields, int32_t numCols, SArray *arr) {
int32_t bufLen = TSDB_MAX_SQL_LEN;
bool sendChildTalbe(TAOS *taos, char *superName, char *tableName, TAOS_FIELD *fields, int32_t numCols,
SArray *arr, char* sqlBuf, int32_t bufLen) {
char dbName[TSDB_DB_NAME_LEN] = "";
char dbTable[TSDB_TABLE_FNAME_LEN];
size_t numRows = taosArrayGetSize(arr);
if(numRows == 0)
return false;
if(numRows < 50)
bufLen /= 20;
else if(numRows < 500)
bufLen /= 5;
else
bufLen /= 2;
char dbName[TSDB_DB_NAME_LEN] = "";
char fullTable[TSDB_TABLE_FNAME_LEN];
// obtain dbname
char * p = strstr(superName, ".");
if(p) { // if have db prefix , under this db create table
int32_t len = p - superName;
strncpy(dbName, superName, len);
dbName[len] = 0; // append str end
sprintf(fullTable, "%s.%s", dbName, tableName);
sprintf(dbTable, "%s.%s", dbName, tableName);
} else {
// no db prefix
strcpy(fullTable, tableName);
strcpy(dbTable, tableName);
}
char *pBuf = (char *)tmalloc(bufLen);
sprintf(pBuf, "insert into %s using %s tags(0) values ", fullTable, superName);
size_t curLen = strlen(pBuf);
// first enter
if(sqlBuf[0] == 0) {
strcpy(sqlBuf, STR_SQL_INSERT);
} else { // check need send
if( bufLen - strlen(sqlBuf) < 300) {
sqlBufSend(taos, sqlBuf);
strcpy(sqlBuf, STR_SQL_INSERT);
}
}
// init
int32_t preLen = strlen(sqlBuf);
char *subBuf = sqlBuf + preLen;
int32_t subLen = bufLen - preLen;
sprintf(subBuf, " %s using %s tags(0) values ", dbTable, superName);
size_t curLen = strlen(subBuf);
TAOS_ROW row ;
bool full = false;
......@@ -340,30 +372,42 @@ bool sendChildTalbe(STscObj *pTscObj, char *superName, char *tableName, TAOS_FIE
row = (TAOS_ROW)taosArrayGetP(arr, i);
if(row == NULL)
continue;
curLen += appendValues(fields, numCols, row, pBuf, bufLen - 100, curLen, &full);
if(full || i == numRows - 1) { // need send
if(subLen > 200)
curLen += appendValues(fields, numCols, row, subBuf, subLen - 100, curLen, &full);
else
full = true;
if(full) { // need send
// send current
strcat(pBuf, ";");
taos_query_a(pTscObj, pBuf, cbSendValues, NULL);
// reset for next
if(full) {
sprintf(pBuf, "insert into %s.%s using %s tags(0) values ", dbName, tableName, superName);
curLen = strlen(pBuf);
// retry append. if full is true again, ignore this row
curLen += appendValues(fields, numCols, row, pBuf, bufLen - 100, curLen, &full);
full = false; // reset to false
}
sqlBufSend(taos, sqlBuf);
// init reset
strcpy(sqlBuf, STR_SQL_INSERT);
preLen = strlen(sqlBuf);
subBuf = sqlBuf + preLen;
subLen = bufLen - preLen;
sprintf(subBuf, " %s using %s tags(0) values ", dbTable, superName);
curLen = strlen(subBuf);
// retry append. if full is true again, ignore this row
curLen += appendValues(fields, numCols, row, subBuf, subLen - 100, curLen, &full);
full = false; // reset to false
}
tfree(row);
}
tfree(pBuf);
return true;
}
// write cq result to another table
bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32_t numCols, SHashObj *tbHash) {
bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32_t numCols, SHashObj *tbHash, int32_t numRows) {
int32_t bufLen = TSDB_MAX_SQL_LEN/2 - 128;
char * sqlBuf = tmalloc(bufLen);
sqlBuf[0] = 0; // init
ok_cnt = 0;
err_cnt = 0;
int cnt_table = 0;
void *pIter = taosHashIterate(tbHash, NULL);
while(pIter) {
SArray *arr = *(SArray**)pIter;
......@@ -377,14 +421,22 @@ bool toAnotherTable(STscObj *pTscObj, char *superName, TAOS_FIELD *fields, int32
key[len] = 0; // string end '\0'
// send all this table rows
sendChildTalbe(pTscObj, superName, key, fields, numCols, arr);
sendChildTalbe(pTscObj, superName, key, fields, numCols, arr, sqlBuf, bufLen);
// release SArray
taosArrayDestroy(arr);
tfree(key);
cnt_table ++;
}
pIter = taosHashIterate(tbHash, pIter);
}
if(sqlBuf[0]) {
sqlBufSend(pTscObj, sqlBuf);
}
tscInfo("CQ ===== stream %d rows write to %d tables ===== \n", numRows, cnt_table);
tfree(sqlBuf);
return true;
}
......@@ -483,7 +535,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
// write Another
if(toAnother) {
toAnotherTable(pSql->pTscObj, pStream->to, fields, dstColsNum, tbHash);
toAnotherTable(pSql->pTscObj, pStream->to, fields, dstColsNum, tbHash, numOfRows);
taosHashCleanup(tbHash);
}
......
......@@ -92,6 +92,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
#ifdef __cplusplus
}
......
......@@ -1659,3 +1659,9 @@ static void rpcDecRef(SRpcInfo *pRpc)
}
}
int32_t rpcUnusedSession(void * rpcInfo, bool bLock) {
SRpcInfo *info = (SRpcInfo *)rpcInfo;
if(info == NULL)
return 0;
return taosIdPoolNumOfFree(info->idPool, bLock);
}
\ No newline at end of file
......@@ -36,6 +36,8 @@ int taosIdPoolNumOfUsed(void *handle);
bool taosIdPoolMarkStatus(void *handle, int id);
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock);
#ifdef __cplusplus
}
#endif
......
......@@ -163,4 +163,16 @@ int taosIdPoolMaxSize(void *handle) {
pthread_mutex_unlock(&pIdPool->mutex);
return ret;
}
\ No newline at end of file
}
// get free count from pool , if bLock is true, locked pool than get free count, accuracy but slowly
int taosIdPoolNumOfFree(void *handle, bool bLock) {
id_pool_t *pIdPool = handle;
if(bLock)
pthread_mutex_lock(&pIdPool->mutex);
int ret = pIdPool->numOfFree;
if(bLock)
pthread_mutex_unlock(&pIdPool->mutex);
return ret;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册