streamData.c 8.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamInc.h"

SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
  if (pData == NULL) {
    return NULL;
  }

  pData->type = blockType;
  pData->srcVgId = srcVg;

L
Liu Jicong 已提交
27
  int32_t blockNum = pReq->blockNum;
28
  SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
L
Liu Jicong 已提交
29
  if (pArray == NULL) {
H
Haojun Liao 已提交
30
    taosFreeQitem(pData);
31
    return NULL;
L
Liu Jicong 已提交
32 33
  }

34
  ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
L
Liu Jicong 已提交
35 36

  for (int32_t i = 0; i < blockNum; i++) {
5
54liuyao 已提交
37
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
L
Liu Jicong 已提交
38
    SSDataBlock*       pDataBlock = taosArrayGet(pArray, i);
39
    blockDecode(pDataBlock, pRetrieve->data);
40

L
Liu Jicong 已提交
41
    // TODO: refactor
42 43
    pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
    pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
44
    pDataBlock->info.version = be64toh(pRetrieve->version);
45
    pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
46
    memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
47

L
Liu Jicong 已提交
48
    pDataBlock->info.type = pRetrieve->streamBlockType;
L
Liu Jicong 已提交
49 50
    pDataBlock->info.childId = pReq->upstreamChildId;
  }
51

L
Liu Jicong 已提交
52
  pData->blocks = pArray;
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
  return pData;
}

/**
 * Wrap an array of result blocks into a new STREAM_INPUT__DATA_BLOCK queue item.
 *
 * For STREAM_INPUT__DATA_SUBMIT and STREAM_INPUT__MERGED_SUBMIT inputs, the child id is taken
 * from pTask->selfChildId and the source version from the input item's `ver`. For any other
 * input type those two fields are not written here.
 *
 * @param pItem       the input item the results were produced from
 * @param pTask       task supplying selfChildId
 * @param resultSize  size passed through to taosAllocateQitem()
 * @param pRes        result block array; stored in the returned item on success
 * @return the new item, or NULL if allocation fails. On failure the contents of pRes are
 *         cleared with blockDataFreeRes.
 */
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pOut = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (NULL == pOut) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pOut->blocks = pRes;
  pOut->type = STREAM_INPUT__DATA_BLOCK;

  switch (pItem->type) {
    case STREAM_INPUT__DATA_SUBMIT:
      pOut->childId = pTask->selfChildId;
      pOut->sourceVer = ((SStreamDataSubmit*)pItem)->ver;
      break;

    case STREAM_INPUT__MERGED_SUBMIT:
      pOut->childId = pTask->selfChildId;
      pOut->sourceVer = ((SStreamMergedSubmit*)pItem)->ver;
      break;

    default:
      // other input types carry no version/child information to copy
      break;
  }

  return pOut;
}

/**
 * Release a stream data-block item: destroy its block array (each element is released with
 * blockDataFreeRes) and then free the queue item itself. A NULL argument is ignored.
 */
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
  if (pBlock != NULL) {
    taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
    taosFreeQitem(pBlock);
  }
}

/**
 * Decode the single block carried by a retrieve request into pData.
 *
 * A one-element SSDataBlock array is created, the payload of pReq->pRetrieve is decoded into
 * that element, and the window, version and block type are copied from the response header.
 * The array and the request id are then stored in pData.
 *
 * @param pReq   retrieve request; pReq->pRetrieve is read as the encoded block
 * @param pData  destination item; its `reqId` and `blocks` fields are overwritten
 * @return 0 on success, -1 if the block array cannot be created
 */
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
  SArray* pBlocks = taosArrayInit(1, sizeof(SSDataBlock));
  if (NULL == pBlocks) {
    return -1;
  }

  // Seed the array with one zero-initialized block that blockDecode() fills in below.
  // NOTE(review): the result of this push is not checked — confirm it cannot fail here.
  SSDataBlock emptyBlock = {0};
  taosArrayPush(pBlocks, &emptyBlock);

  SRetrieveTableRsp* pRsp = pReq->pRetrieve;
  SSDataBlock*       pDecoded = taosArrayGet(pBlocks, 0);
  blockDecode(pDecoded, pRsp->data);

  // TODO: refactor
  // header fields are converted from big-endian to host byte order
  pDecoded->info.type = pRsp->streamBlockType;
  pDecoded->info.version = be64toh(pRsp->version);
  pDecoded->info.window.skey = be64toh(pRsp->skey);
  pDecoded->info.window.ekey = be64toh(pRsp->ekey);

  pData->blocks = pBlocks;
  pData->reqId = pReq->reqId;

  return 0;
}

112 113
/**
 * Create a submit queue item that wraps a packed data message.
 *
 * The SPackedData struct is copied by value into the item, so the item shares the message
 * buffer that pData->msgStr points to. A separately allocated reference counter is attached
 * and starts at 1.
 *
 * @param pData  packed message; pData->msgLen is passed to taosAllocateQitem()
 * @param type   stored in the `type` field of the item
 * @return the new item, or NULL if either allocation fails
 */
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
  SStreamDataSubmit* pItem = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
  if (NULL == pItem) {
    return NULL;
  }

  pItem->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
  if (NULL == pItem->dataRef) {
    // undo the item allocation so nothing is leaked
    taosFreeQitem(pItem);
    return NULL;
  }

  *pItem->dataRef = 1;  // the new item holds the first reference
  pItem->type = type;
  pItem->submit = *pData;

  return pItem;
}

131
/**
 * Drop one reference to the data shared by a submit item.
 *
 * The reference counter is decremented atomically. When it reaches zero, the message buffer
 * and the counter itself are freed. The queue item is NOT freed here.
 *
 * @param pDataSubmit  item of type STREAM_INPUT__DATA_SUBMIT (asserted)
 */
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
  int32_t remaining = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
  ASSERT(remaining >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);

  if (remaining != 0) {
    // other holders still reference the shared data
    return;
  }

  taosMemoryFree(pDataSubmit->submit.msgStr);
  taosMemoryFree(pDataSubmit->dataRef);
}

141 142
SStreamMergedSubmit* streamMergedSubmitNew() {
  SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
143 144 145
  if (pMerged == NULL) {
    return NULL;
  }
146

L
Liu Jicong 已提交
147
  pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
148
  pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
149 150 151 152 153 154 155 156

  if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
    taosArrayDestroy(pMerged->submits);
    taosArrayDestroy(pMerged->dataRefs);
    taosFreeQitem(pMerged);
    return NULL;
  }

J
jiacy-jcy 已提交
157
  pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
158 159 160
  return pMerged;
}

161
/**
 * Append one submit item to a merged-submit container.
 *
 * The submit's reference-counter pointer is pushed to pMerged->dataRefs and its packed data
 * (by value) to pMerged->submits, so index i of both arrays describes the same submit.
 * pMerged->ver is then set to pSubmit->ver. The reference counter is not incremented here.
 *
 * @param pMerged  destination container
 * @param pSubmit  submit item to record
 * @return 0 on success, -1 if either array push fails (taosArrayPush is expected to return
 *         NULL on failure)
 */
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
  // Push the reference pointer first: if this fails, the container is unchanged and the
  // second push is skipped so the two arrays cannot end up with different lengths.
  if (taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef) == NULL) {
    return -1;
  }

  if (taosArrayPush(pMerged->submits, &pSubmit->submit) == NULL) {
    // NOTE(review): dataRefs now holds one trailing entry without a matching submit; the
    // container should not be merged into again after this failure.
    return -1;
  }

  pMerged->ver = pSubmit->ver;
  return 0;
}

168
/* Atomically add one reference to the data shared by a submit item. */
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
  int32_t* pRefCount = pDataSubmit->dataRef;
  (void)atomic_add_fetch_32(pRefCount, 1);
}

172
/**
 * Create a second queue item that refers to the same submit data as pSubmit.
 *
 * The new item is a byte-wise copy of pSubmit, so both items point at the same message
 * buffer and the same reference counter. The counter is incremented once, and only after
 * the new item has been allocated.
 *
 * @param pSubmit  item to clone
 * @return the clone, or NULL if allocation fails (the reference counter is then untouched)
 */
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) {
  // only a plain data submit reports its message length to the queue allocator
  int32_t payloadLen = (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) ? pSubmit->submit.msgLen : 0;

  SStreamDataSubmit* pCopy = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, payloadLen);
  if (pCopy != NULL) {
    streamDataSubmitRefInc(pSubmit);
    memcpy(pCopy, pSubmit, sizeof(SStreamDataSubmit));
  }

  return pCopy;
}
L
Liu Jicong 已提交
187

188 189
/**
 * Try to merge queue item pElem into dst.
 *
 * Supported combinations:
 *  - DATA_BLOCK + DATA_BLOCK: the blocks of pElem are appended to dst's block array; pElem's
 *    array container and pElem itself are then freed. dst is returned.
 *  - MERGED_SUBMIT + DATA_SUBMIT: pElem is recorded in dst via streamMergeSubmit() and the
 *    pElem item is freed. dst is returned.
 *  - DATA_SUBMIT + DATA_SUBMIT: a new merged-submit item is created, both inputs are recorded
 *    in it and both input items are freed. The new item is returned.
 *
 * @param dst    item to merge into
 * @param pElem  item to merge
 * @return the item holding the merged content, or NULL if the two types cannot be merged or
 *         the merged container cannot be allocated. When NULL is returned, neither dst nor
 *         pElem has been modified or freed.
 */
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
  if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
    SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
    SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
    taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
    // only the source container is destroyed; its elements were copied into dst above
    taosArrayDestroy(pBlockSrc->blocks);
    taosFreeQitem(pElem);
    return dst;
  } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
    SStreamDataSubmit*   pBlockSrc = (SStreamDataSubmit*)pElem;
    streamMergeSubmit(pMerged, pBlockSrc);
    taosFreeQitem(pElem);
    return dst;
  } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
    if (pMerged == NULL) {
      // Allocation failed: report "not merged" and leave both inputs with the caller,
      // instead of dereferencing a NULL container below.
      return NULL;
    }

    streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
    taosFreeQitem(dst);
    taosFreeQitem(pElem);
    return (SStreamQueueItem*)pMerged;
  } else {
    return NULL;
  }
}

/**
 * Release a stream queue item together with the resources it owns, according to its type.
 *
 *  - GET_RES:                     destroy the trigger's block, free the item
 *  - DATA_BLOCK / DATA_RETRIEVE:  destroy the block array (elements via blockDataFreeRes), free the item
 *  - DATA_SUBMIT:                 drop one data reference via streamDataSubmitDestroy(), free the item
 *  - MERGED_SUBMIT:               drop one reference per merged submit, free both arrays and the item
 *  - REF_DATA_BLOCK:              destroy the referenced block, free the item
 *
 * Items of any other type are left untouched.
 */
void streamFreeQitem(SStreamQueueItem* data) {
  switch (data->type) {
    case STREAM_INPUT__GET_RES: {
      blockDataDestroy(((SStreamTrigger*)data)->pBlock);
      taosFreeQitem(data);
      break;
    }

    case STREAM_INPUT__DATA_BLOCK:
    case STREAM_INPUT__DATA_RETRIEVE: {
      taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
      taosFreeQitem(data);
      break;
    }

    case STREAM_INPUT__DATA_SUBMIT: {
      streamDataSubmitDestroy((SStreamDataSubmit*)data);
      taosFreeQitem(data);
      break;
    }

    case STREAM_INPUT__MERGED_SUBMIT: {
      SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data;

      // submits[idx] and dataRefs[idx] describe the same merged submit
      int32_t numSubmits = taosArrayGetSize(pMerged->submits);
      for (int32_t idx = 0; idx < numSubmits; ++idx) {
        int32_t* pCounter = taosArrayGetP(pMerged->dataRefs, idx);
        int32_t  remaining = atomic_sub_fetch_32(pCounter, 1);
        ASSERT(remaining >= 0);

        if (remaining != 0) {
          continue;  // still referenced elsewhere
        }

        // last reference gone: release the message buffer and the counter
        SPackedData* pPacked = (SPackedData*)taosArrayGet(pMerged->submits, idx);
        taosMemoryFree(pPacked->msgStr);
        taosMemoryFree(pCounter);
      }

      taosArrayDestroy(pMerged->submits);
      taosArrayDestroy(pMerged->dataRefs);
      taosFreeQitem(pMerged);
      break;
    }

    case STREAM_INPUT__REF_DATA_BLOCK: {
      SStreamRefDataBlock* pRef = (SStreamRefDataBlock*)data;
      blockDataDestroy(pRef->pBlock);
      taosFreeQitem(pRef);
      break;
    }

    default:
      break;
  }
}