streamData.c 8.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInc.h"
L
Liu Jicong 已提交
17

18 19 20 21 22 23 24 25 26
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
  if (pData == NULL) {
    return NULL;
  }

  pData->type = blockType;
  pData->srcVgId = srcVg;

L
Liu Jicong 已提交
27
  int32_t blockNum = pReq->blockNum;
28
  SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
L
Liu Jicong 已提交
29
  if (pArray == NULL) {
30
    return NULL;
L
Liu Jicong 已提交
31 32
  }

33
  ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
L
Liu Jicong 已提交
34 35

  for (int32_t i = 0; i < blockNum; i++) {
5
54liuyao 已提交
36
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
L
Liu Jicong 已提交
37
    SSDataBlock*       pDataBlock = taosArrayGet(pArray, i);
38
    blockDecode(pDataBlock, pRetrieve->data);
39

L
Liu Jicong 已提交
40
    // TODO: refactor
41 42
    pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
    pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
43
    pDataBlock->info.version = be64toh(pRetrieve->version);
44
    pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
45
    memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
46

L
Liu Jicong 已提交
47
    pDataBlock->info.type = pRetrieve->streamBlockType;
L
Liu Jicong 已提交
48 49
    pDataBlock->info.childId = pReq->upstreamChildId;
  }
50

L
Liu Jicong 已提交
51
  pData->blocks = pArray;
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
  return pData;
}

SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pStreamBlocks == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
  pStreamBlocks->blocks = pRes;

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pSubmit->ver;
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
    pStreamBlocks->childId = pTask->selfChildId;
    pStreamBlocks->sourceVer = pMerged->ver;
  }

  return pStreamBlocks;
}

void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
L
Liu Jicong 已提交
85 86 87 88 89 90
}

int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
  SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
  if (pArray == NULL) {
    return -1;
L
Liu Jicong 已提交
91
  }
92

H
Haojun Liao 已提交
93
  taosArrayPush(pArray, &(SSDataBlock){0});
L
Liu Jicong 已提交
94
  SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
95
  SSDataBlock*       pDataBlock = taosArrayGet(pArray, 0);
96
  blockDecode(pDataBlock, pRetrieve->data);
97

98 99 100
  // TODO: refactor
  pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
  pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
101
  pDataBlock->info.version = be64toh(pRetrieve->version);
102 103 104

  pDataBlock->info.type = pRetrieve->streamBlockType;

L
Liu Jicong 已提交
105
  pData->reqId = pReq->reqId;
L
Liu Jicong 已提交
106
  pData->blocks = pArray;
L
Liu Jicong 已提交
107

L
Liu Jicong 已提交
108 109 110
  return 0;
}

111 112
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
  SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
113 114 115 116
  if (pDataSubmit == NULL) {
    return NULL;
  }

L
Liu Jicong 已提交
117
  pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
118 119 120 121 122
  if (pDataSubmit->dataRef == NULL) {
    taosFreeQitem(pDataSubmit);
    return NULL;
  }

123
  pDataSubmit->submit = *pData;
124
  *pDataSubmit->dataRef = 1;   // initialize the reference count to be 1
125
  pDataSubmit->type = type;
126

L
Liu Jicong 已提交
127 128 129
  return pDataSubmit;
}

130
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
131 132 133 134 135 136 137 138 139
  int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
  ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);

  if (ref == 0) {
    taosMemoryFree(pDataSubmit->submit.msgStr);
    taosMemoryFree(pDataSubmit->dataRef);
  }
}

140 141
SStreamMergedSubmit* streamMergedSubmitNew() {
  SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
142 143 144
  if (pMerged == NULL) {
    return NULL;
  }
145

L
Liu Jicong 已提交
146
  pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
147
  pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
148 149 150 151 152 153 154 155

  if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
    taosArrayDestroy(pMerged->submits);
    taosArrayDestroy(pMerged->dataRefs);
    taosFreeQitem(pMerged);
    return NULL;
  }

J
jiacy-jcy 已提交
156
  pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
157 158 159
  return pMerged;
}

160
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
161
  taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
L
Liu Jicong 已提交
162
  taosArrayPush(pMerged->submits, &pSubmit->submit);
163 164 165 166
  pMerged->ver = pSubmit->ver;
  return 0;
}

167
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
L
Liu Jicong 已提交
168 169 170
  atomic_add_fetch_32(pDataSubmit->dataRef, 1);
}

171
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) {
172 173 174 175 176
  int32_t len = 0;
  if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) {
    len = pSubmit->submit.msgLen;
  }

177
  SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len);
L
Liu Jicong 已提交
178 179 180
  if (pSubmitClone == NULL) {
    return NULL;
  }
181

L
Liu Jicong 已提交
182
  streamDataSubmitRefInc(pSubmit);
183
  memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit));
L
Liu Jicong 已提交
184 185
  return pSubmitClone;
}
L
Liu Jicong 已提交
186

187 188
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
  if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
L
Liu Jicong 已提交
189
    SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
190
    SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
L
Liu Jicong 已提交
191
    taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
192
    taosArrayDestroy(pBlockSrc->blocks);
193
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
194
    return dst;
195
  } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
196 197
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
    SStreamDataSubmit*   pBlockSrc = (SStreamDataSubmit*)pElem;
198
    streamMergeSubmit(pMerged, pBlockSrc);
199
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
200
    return dst;
201
  } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
202
    SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
203 204
    // todo handle error

205 206
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
207
    taosFreeQitem(dst);
208
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
209
    return (SStreamQueueItem*)pMerged;
L
Liu Jicong 已提交
210
  } else {
J
jiacy-jcy 已提交
211
    return NULL;
L
Liu Jicong 已提交
212 213 214 215 216
  }
}

void streamFreeQitem(SStreamQueueItem* data) {
  int8_t type = data->type;
L
Liu Jicong 已提交
217
  if (type == STREAM_INPUT__GET_RES) {
L
Liu Jicong 已提交
218 219 220
    blockDataDestroy(((SStreamTrigger*)data)->pBlock);
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
221
    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
222 223
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
224
    streamDataSubmitDestroy((SStreamDataSubmit*)data);
L
Liu Jicong 已提交
225
    taosFreeQitem(data);
226
  } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
227
    SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
228 229

    int32_t sz = taosArrayGetSize(pMerge->submits);
230
    for (int32_t i = 0; i < sz; i++) {
231 232 233
      int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
      int32_t  ref = atomic_sub_fetch_32(pRef, 1);
      ASSERT(ref >= 0);
234

235
      if (ref == 0) {
L
Liu Jicong 已提交
236
        SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
L
Liu Jicong 已提交
237
        taosMemoryFree(pSubmit->msgStr);
238
        taosMemoryFree(pRef);
239 240
      }
    }
L
Liu Jicong 已提交
241
    taosArrayDestroy(pMerge->submits);
242 243
    taosArrayDestroy(pMerge->dataRefs);
    taosFreeQitem(pMerge);
L
Liu Jicong 已提交
244 245
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
H
Haojun Liao 已提交
246
    blockDataDestroy(pRefBlock->pBlock);
L
Liu Jicong 已提交
247
    taosFreeQitem(pRefBlock);
L
Liu Jicong 已提交
248 249
  }
}