streamData.c 7.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInt.h"
L
Liu Jicong 已提交
17

18 19 20 21 22 23 24 25 26
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
  SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
  if (pData == NULL) {
    return NULL;
  }

  pData->type = blockType;
  pData->srcVgId = srcVg;

L
Liu Jicong 已提交
27
  int32_t blockNum = pReq->blockNum;
28
  SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
L
Liu Jicong 已提交
29
  if (pArray == NULL) {
H
Haojun Liao 已提交
30
    taosFreeQitem(pData);
31
    return NULL;
L
Liu Jicong 已提交
32 33
  }

34
  ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
L
Liu Jicong 已提交
35 36

  for (int32_t i = 0; i < blockNum; i++) {
5
54liuyao 已提交
37
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
L
Liu Jicong 已提交
38
    SSDataBlock*       pDataBlock = taosArrayGet(pArray, i);
39
    blockDecode(pDataBlock, pRetrieve->data);
40

L
Liu Jicong 已提交
41
    // TODO: refactor
42 43
    pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
    pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
44
    pDataBlock->info.version = be64toh(pRetrieve->version);
45
    pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
46
    memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
47

L
Liu Jicong 已提交
48
    pDataBlock->info.type = pRetrieve->streamBlockType;
L
Liu Jicong 已提交
49 50
    pDataBlock->info.childId = pReq->upstreamChildId;
  }
51

L
Liu Jicong 已提交
52
  pData->blocks = pArray;
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
  return pData;
}

SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
  SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
  if (pStreamBlocks == NULL) {
    taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
    return NULL;
  }

  pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
  pStreamBlocks->blocks = pRes;

  if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
68
    pStreamBlocks->childId = pTask->info.selfChildId;
69 70 71
    pStreamBlocks->sourceVer = pSubmit->ver;
  } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
72
    pStreamBlocks->childId = pTask->info.selfChildId;
73 74 75 76 77 78 79 80 81 82 83 84 85
    pStreamBlocks->sourceVer = pMerged->ver;
  }

  return pStreamBlocks;
}

void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
  if (pBlock == NULL) {
    return;
  }

  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
L
Liu Jicong 已提交
86 87 88 89 90 91
}

int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
  SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
  if (pArray == NULL) {
    return -1;
L
Liu Jicong 已提交
92
  }
93

H
Haojun Liao 已提交
94
  taosArrayPush(pArray, &(SSDataBlock){0});
L
Liu Jicong 已提交
95
  SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
96
  SSDataBlock*       pDataBlock = taosArrayGet(pArray, 0);
97
  blockDecode(pDataBlock, pRetrieve->data);
98

99 100 101
  // TODO: refactor
  pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
  pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
102
  pDataBlock->info.version = be64toh(pRetrieve->version);
103 104 105

  pDataBlock->info.type = pRetrieve->streamBlockType;

L
Liu Jicong 已提交
106
  pData->reqId = pReq->reqId;
L
Liu Jicong 已提交
107
  pData->blocks = pArray;
L
Liu Jicong 已提交
108

L
Liu Jicong 已提交
109 110 111
  return 0;
}

112 113
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
  SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
114 115 116 117
  if (pDataSubmit == NULL) {
    return NULL;
  }

L
Liu Jicong 已提交
118
  pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
119 120 121 122 123
  if (pDataSubmit->dataRef == NULL) {
    taosFreeQitem(pDataSubmit);
    return NULL;
  }

124
  pDataSubmit->submit = *pData;
125
  *pDataSubmit->dataRef = 1;   // initialize the reference count to be 1
126
  pDataSubmit->type = type;
127

L
Liu Jicong 已提交
128 129 130
  return pDataSubmit;
}

131
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
132 133 134 135 136 137 138 139 140
  int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
  ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);

  if (ref == 0) {
    taosMemoryFree(pDataSubmit->submit.msgStr);
    taosMemoryFree(pDataSubmit->dataRef);
  }
}

141 142
SStreamMergedSubmit* streamMergedSubmitNew() {
  SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
143 144 145
  if (pMerged == NULL) {
    return NULL;
  }
146

L
Liu Jicong 已提交
147
  pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
148
  pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
149 150 151 152 153 154 155 156

  if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
    taosArrayDestroy(pMerged->submits);
    taosArrayDestroy(pMerged->dataRefs);
    taosFreeQitem(pMerged);
    return NULL;
  }

J
jiacy-jcy 已提交
157
  pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
158 159 160
  return pMerged;
}

161
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
162
  taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
L
Liu Jicong 已提交
163
  taosArrayPush(pMerged->submits, &pSubmit->submit);
164 165 166 167
  pMerged->ver = pSubmit->ver;
  return 0;
}

168 169
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
  if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
L
Liu Jicong 已提交
170
    SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
171
    SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
L
Liu Jicong 已提交
172
    taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
173
    taosArrayDestroy(pBlockSrc->blocks);
174
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
175
    return dst;
176
  } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
177 178
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
    SStreamDataSubmit*   pBlockSrc = (SStreamDataSubmit*)pElem;
179
    streamMergeSubmit(pMerged, pBlockSrc);
180
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
181
    return dst;
182
  } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
183
    SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
184 185
    // todo handle error

186 187
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
188
    taosFreeQitem(dst);
189
    taosFreeQitem(pElem);
J
jiacy-jcy 已提交
190
    return (SStreamQueueItem*)pMerged;
L
Liu Jicong 已提交
191
  } else {
J
jiacy-jcy 已提交
192
    return NULL;
L
Liu Jicong 已提交
193 194 195 196 197
  }
}

void streamFreeQitem(SStreamQueueItem* data) {
  int8_t type = data->type;
L
Liu Jicong 已提交
198
  if (type == STREAM_INPUT__GET_RES) {
L
Liu Jicong 已提交
199 200 201
    blockDataDestroy(((SStreamTrigger*)data)->pBlock);
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
202
    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
203 204
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
205
    streamDataSubmitDestroy((SStreamDataSubmit*)data);
L
Liu Jicong 已提交
206
    taosFreeQitem(data);
207
  } else if (type == STREAM_INPUT__MERGED_SUBMIT) {
208
    SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
209 210

    int32_t sz = taosArrayGetSize(pMerge->submits);
211
    for (int32_t i = 0; i < sz; i++) {
212 213 214
      int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
      int32_t  ref = atomic_sub_fetch_32(pRef, 1);
      ASSERT(ref >= 0);
215

216
      if (ref == 0) {
L
Liu Jicong 已提交
217
        SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
L
Liu Jicong 已提交
218
        taosMemoryFree(pSubmit->msgStr);
219
        taosMemoryFree(pRef);
220 221
      }
    }
L
Liu Jicong 已提交
222
    taosArrayDestroy(pMerge->submits);
223 224
    taosArrayDestroy(pMerge->dataRefs);
    taosFreeQitem(pMerge);
L
Liu Jicong 已提交
225 226
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
H
Haojun Liao 已提交
227
    blockDataDestroy(pRefBlock->pBlock);
L
Liu Jicong 已提交
228
    taosFreeQitem(pRefBlock);
L
Liu Jicong 已提交
229 230
  }
}