dataInserter.c 7.2 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"

// Data-sink statistics object defined outside this file. destroyDataSinker()
// subtracts a handle's cachedSize from its cachedSize counter.
extern SDataSinkStat gDataSinkStat;

D
dapan1121 已提交
27 28 29 30 31
// Outcome of submit requests, as recorded by inserterCallback().
typedef struct SSubmitRes {
  int64_t     affectedRows;  // running total: each successful response adds its own affectedRows
  int32_t     code;          // result code stored by the most recent callback invocation
  SSubmitRsp *pRsp;          // response object allocated and released inside inserterCallback()
} SSubmitRes;
D
dapan1121 已提交
32 33 34 35

typedef struct SDataInserterHandle {
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
D
dapan1121 已提交
36 37 38 39 40
  STSchema*           pSchema;
  SQueryInserterNode* pNode;
  SSubmitRes          submitRes;
  SInserterParam*     pParam;
  SArray*             pDataBlocks;
D
dapan1121 已提交
41 42 43 44 45
  int32_t             status;
  bool                queryEnd;
  uint64_t            useconds;
  uint64_t            cachedSize;
  TdThreadMutex       mutex;
D
dapan1121 已提交
46
  tsem_t              ready;  
D
dapan1121 已提交
47 48
} SDataInserterHandle;

D
dapan1121 已提交
49 50 51
// Context attached to an outgoing submit request by sendSubmitRequest() and
// handed back to inserterCallback(), which frees it.
typedef struct SSubmitRspParam {
  SDataInserterHandle* pInserter;  // handle whose submitRes/ready the callback updates
} SSubmitRspParam;
D
dapan1121 已提交
52

D
dapan1121 已提交
53 54 55 56 57 58 59 60 61
/*
 * Recompute the buffer status from the number of pending data blocks, store
 * it in the handle and return it.
 *
 *   0 blocks                                   -> DS_BUF_EMPTY
 *   fewer than cfg.maxDataBlockNumPerQuery     -> DS_BUF_LOW
 *   otherwise                                  -> DS_BUF_FULL
 *
 * The computation and the store are done while holding pInserter->mutex.
 */
static int32_t updateStatus(SDataInserterHandle* pInserter) {
  taosThreadMutexLock(&pInserter->mutex);

  // pDataBlocks is an SArray (created with taosArrayInit), so it has to be
  // measured with the array API rather than the queue API.
  int32_t blockNums = (int32_t)taosArrayGetSize(pInserter->pDataBlocks);

  int32_t status;
  if (0 == blockNums) {
    status = DS_BUF_EMPTY;
  } else if (blockNums < pInserter->pManager->cfg.maxDataBlockNumPerQuery) {
    status = DS_BUF_LOW;
  } else {
    status = DS_BUF_FULL;
  }
  pInserter->status = status;

  taosThreadMutexUnlock(&pInserter->mutex);
  return status;
}

D
dapan1121 已提交
64 65 66 67 68 69
// Return the handle's current buffer status, read while holding its mutex.
static int32_t getStatus(SDataInserterHandle* pInserter) {
  int32_t snapshot;

  taosThreadMutexLock(&pInserter->mutex);
  snapshot = pInserter->status;
  taosThreadMutexUnlock(&pInserter->mutex);

  return snapshot;
}
D
dapan1121 已提交
70

D
dapan1121 已提交
71 72 73
/*
 * Response callback installed by sendSubmitRequest().
 *
 * Stores the outcome of the submit in pInserter->submitRes and then posts the
 * `ready` semaphore that putDataBlock() is blocked on.
 *
 * param  heap-allocated SSubmitRspParam; freed here.
 * pMsg   response buffer; only read when `code` is TSDB_CODE_SUCCESS.
 * code   status handed to the callback by its invoker.
 *
 * Always returns TSDB_CODE_SUCCESS; any failure (incoming code, allocation
 * failure, decode error, per-block error) is reported via submitRes.code.
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
  SDataInserterHandle* pInserter = pParam->pInserter;
  SSubmitRes*          pRes = &pInserter->submitRes;
  SDecoder             coder = {0};

  pRes->code = code;
  if (TSDB_CODE_SUCCESS != code) {
    goto _return;
  }

  pRes->pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp));
  if (NULL == pRes->pRsp) {
    // Without this check the decoder below would write through a NULL pointer.
    pRes->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _return;
  }

  // NOTE(review): the decoder is never released here; confirm whether
  // tDecoderInit() requires a matching teardown call.
  tDecoderInit(&coder, pMsg->pData, pMsg->len);
  code = tDecodeSSubmitRsp(&coder, pRes->pRsp);

  // The first failing block decides the overall result. The loop is skipped
  // entirely when decoding failed, so nBlocks/pBlocks are not touched then.
  for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pRes->pRsp->nBlocks; ++i) {
    code = pRes->pRsp->pBlocks[i].code;
  }

  if (TSDB_CODE_SUCCESS == code) {
    pRes->affectedRows += pRes->pRsp->affectedRows;
    // The running total is an int64_t, so it must not be printed with %d.
    qDebug("submit rsp received, affectedRows:%d, total:%lld", pRes->pRsp->affectedRows,
           (long long)pRes->affectedRows);
  } else {
    pRes->code = code;
  }

  // Single release point for the response; clear the pointer so the handle
  // does not keep referring to an object that has been handed to the free routine.
  tFreeSSubmitRsp(pRes->pRsp);
  pRes->pRsp = NULL;

_return:

  // NOTE(review): pMsg->pData is not released here; confirm who owns the
  // response buffer.
  tsem_post(&pInserter->ready);

  taosMemoryFree(param);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
116 117 118 119 120 121 122
/*
 * Send a submit request asynchronously; the response is delivered to
 * inserterCallback().
 *
 * pInserter     handle passed to the callback via an SSubmitRspParam.
 * pMsg          request buffer; freed here if the send bookkeeping cannot be
 *               allocated, otherwise attached to the send info.
 * pTransporter  transport object forwarded to asyncSendMsgToServer().
 * pEpset        endpoint set forwarded to asyncSendMsgToServer().
 *
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in terrno) on allocation
 * failure, otherwise the result of asyncSendMsgToServer().
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (NULL == pParam) {
    // Mirror the cleanup of the first failure path and also drop the send
    // info allocated above, instead of dereferencing a NULL pParam.
    taosMemoryFree(pMsgSendInfo);
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }
  pParam->pInserter = pInserter;

  pMsgSendInfo->param = pParam;
  pMsgSendInfo->msgInfo.pData = pMsg;
  // NOTE(review): this is the size of the request header struct only; if the
  // request carries block data after the header, the real message length
  // should be used here. Confirm against dataBlockToSubmit().
  pMsgSendInfo->msgInfo.len = sizeof(SSubmitReq);
  pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
  pMsgSendInfo->fp = inserterCallback;

  int64_t transporterId = 0;
  return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
}


D
dapan1121 已提交
139 140 141 142
/*
 * fPut implementation: convert the incoming data block into a submit request,
 * send it, and block until inserterCallback() reports the result.
 *
 * pHandle    the inserter handle (an SDataInserterHandle).
 * pInput     input descriptor; pInput->pData is the data block to insert.
 * pContinue  set to true only when the submit succeeded.
 *
 * Returns TSDB_CODE_SUCCESS, an allocation/send error, or the code recorded
 * by the callback in submitRes.code.
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;

  // pDataBlocks holds POINTER_BYTES-sized elements, so the element to copy is
  // the block pointer itself: pass the address of the pointer, not the block.
  if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  SSubmitReq* pMsg = dataBlockToSubmit(pInserter->pDataBlocks, pInserter->pSchema, pInserter->pNode->tableId,
                                       pInserter->pNode->suid, pInserter->pNode->vgId);

  // The request has been built from the queued block; drop the queued pointer
  // so the next call neither resubmits it nor reads a block pointer that is
  // no longer guaranteed to be valid after this call returns.
  // NOTE(review): assumes the returned request does not reference the array.
  taosArrayClear(pInserter->pDataBlocks);

  if (NULL == pMsg) {
    // NOTE(review): dataBlockToSubmit() is assumed to return NULL on failure;
    // confirm whether it reports a more specific code.
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
  if (code) {
    return code;
  }

  // Wait for inserterCallback() to post the result of this submit.
  tsem_wait(&pInserter->ready);

  if (pInserter->submitRes.code) {
    return pInserter->submitRes.code;
  }

  *pContinue = true;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
160 161 162 163 164 165 166 167
// fEndPut implementation: flag the query as ended and remember the useconds
// value supplied by the caller. Both fields are written under the handle mutex.
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  taosThreadMutexLock(&pSelf->mutex);
  pSelf->useconds = useconds;
  pSelf->queryEnd = true;
  taosThreadMutexUnlock(&pSelf->mutex);
}

D
dapan1121 已提交
168
/*
 * fDestroy implementation: release everything the handle owns.
 *
 * Subtracts the handle's cachedSize from the global sink statistics, then
 * destroys the block array, the schema, the mutex and the `ready` semaphore.
 * The handle structure itself is not freed here.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
  taosArrayDestroy(pInserter->pDataBlocks);
  taosMemoryFree(pInserter->pSchema);
  taosThreadMutexDestroy(&pInserter->mutex);
  // createDataInserter() initialises this semaphore; tear it down with the
  // rest of the handle's synchronisation objects.
  tsem_destroy(&pInserter->ready);
  return TSDB_CODE_SUCCESS;
}

static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
D
dapan1121 已提交
178
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
D
dapan1121 已提交
179 180 181 182 183 184 185 186 187 188 189 190

  *size = atomic_load_64(&pDispatcher->cachedSize);
  return TSDB_CODE_SUCCESS;
}

/*
 * Create a data inserter sink handle.
 *
 * pManager   sink manager stored in the handle.
 * pDataSink  sink node; treated as an SQueryInserterNode.
 * pHandle    receives the new handle on success; left untouched on failure.
 * pParam     stored as the handle's SInserterParam; its readHandle->vnode is
 *            used to look up the table schema.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY on allocation
 * failure, TSDB_CODE_TDB_INVALID_TABLE_ID when the node's suid does not match
 * the suid reported by tsdbGetTableSchema(), or the error returned by
 * tsdbGetTableSchema(). On every failure the partially built handle is freed.
 */
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  // The node is an inserter node (it is stored in inserter->pNode, which is an
  // SQueryInserterNode*), so declare the local with that type.
  SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = NULL;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->pManager = pManager;
  inserter->pNode = pInserterNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;

  int64_t suid = 0;
  int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
  if (code) {
    goto _error;
  }

  if (pInserterNode->suid != suid) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    code = terrno;
    goto _error;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  // Synchronisation objects are created last so that the error path above
  // never has to tear them down.
  taosThreadMutexInit(&inserter->mutex, NULL);
  tsem_init(&inserter->ready, 0, 0);

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_error:
  // pSchema is NULL (from calloc) unless the schema lookup filled it in.
  if (NULL != inserter->pSchema) {
    taosMemoryFree(inserter->pSchema);
  }
  taosMemoryFree(inserter);
  return code;
}