streamData.c 7.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "streamInc.h"
L
Liu Jicong 已提交
17

18
/*
 * Decode every block carried by a dispatch request into a newly allocated
 * array of SSDataBlock and store that array in pData->blocks.
 *
 * Returns 0 on success, or -1 when the array cannot be allocated; in that
 * case pData is left untouched.
 */
int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
  const int32_t numOfBlocks = pReq->blockNum;

  // one pre-sized slot per incoming block
  SArray* pBlockList = taosArrayInit_s(sizeof(SSDataBlock), numOfBlocks);
  if (NULL == pBlockList) {
    return -1;
  }

  // the request must carry exactly one payload and one length entry per block
  ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
  ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));

  for (int32_t idx = 0; idx < numOfBlocks; ++idx) {
    SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, idx);
    SSDataBlock*       pBlock = taosArrayGet(pBlockList, idx);

    // decode the payload first, then overwrite the block info from the response header
    blockDecode(pBlock, pRsp->data);

    // TODO: refactor
    // 64-bit header fields are converted from big-endian to host order
    pBlock->info.window.skey = be64toh(pRsp->skey);
    pBlock->info.window.ekey = be64toh(pRsp->ekey);
    pBlock->info.version = be64toh(pRsp->version);
    pBlock->info.watermark = be64toh(pRsp->watermark);
    memcpy(pBlock->info.parTbName, pRsp->parTbName, TSDB_TABLE_NAME_LEN);

    pBlock->info.type = pRsp->streamBlockType;
    pBlock->info.childId = pReq->upstreamChildId;
  }

  pData->blocks = pBlockList;
  return 0;
}

/*
 * Decode the single block carried by a retrieve request into a newly
 * allocated one-element array and store it, together with the request id,
 * in pData.
 *
 * Returns 0 on success, or -1 when the array cannot be allocated or the
 * block slot cannot be appended; on failure pData is left untouched and
 * nothing is leaked.
 */
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
  SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
  if (pArray == NULL) {
    return -1;
  }

  // Append a zero-initialized block; without this check a failed append
  // would let the code below operate on a non-existent element.
  if (taosArrayPush(pArray, &(SSDataBlock){0}) == NULL) {
    taosArrayDestroy(pArray);
    return -1;
  }

  SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
  SSDataBlock*       pDataBlock = taosArrayGet(pArray, 0);
  blockDecode(pDataBlock, pRetrieve->data);

  // TODO: refactor
  // 64-bit header fields are converted from big-endian to host order
  pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
  pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
  pDataBlock->info.version = be64toh(pRetrieve->version);

  pDataBlock->info.type = pRetrieve->streamBlockType;

  pData->reqId = pReq->reqId;
  pData->blocks = pArray;

  return 0;
}

71 72
/*
 * Allocate a submit queue item that holds a shallow copy of *pData and a
 * separately allocated reference counter initialized to 1.
 *
 * Returns the new item, or NULL if either allocation fails (the queue item
 * is released again when the counter cannot be allocated).
 */
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
  SStreamDataSubmit* pItem =
      (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
  if (NULL == pItem) {
    return NULL;
  }

  int32_t* pRefCnt = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
  if (NULL == pRefCnt) {
    taosFreeQitem(pItem);
    return NULL;
  }

  *pRefCnt = 1;  // the item being created is the first holder
  pItem->dataRef = pRefCnt;
  pItem->submit = *pData;
  pItem->type = type;

  return pItem;
}

90
/*
 * Drop one reference on the data shared through pDataSubmit->dataRef.
 * When the count reaches zero the message buffer and the counter itself are
 * freed. The queue item is not released here.
 */
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
  const int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
  ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);

  if (ref != 0) {
    return;  // other holders still reference the data
  }

  taosMemoryFree(pDataSubmit->submit.msgStr);
  taosMemoryFree(pDataSubmit->dataRef);
}

100 101
SStreamMergedSubmit* streamMergedSubmitNew() {
  SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
102 103 104
  if (pMerged == NULL) {
    return NULL;
  }
105

L
Liu Jicong 已提交
106
  pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
107
  pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
108 109 110 111 112 113 114 115

  if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
    taosArrayDestroy(pMerged->submits);
    taosArrayDestroy(pMerged->dataRefs);
    taosFreeQitem(pMerged);
    return NULL;
  }

J
jiacy-jcy 已提交
116
  pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
117 118 119
  return pMerged;
}

120
/*
 * Append one submit (its packed data and the pointer to its reference
 * counter) to a merged-submit item and record the submit's version.
 *
 * The reference count is not changed here.
 *
 * Returns 0 on success, or -1 if either append fails. On failure pMerged is
 * left as it was: dataRefs and submits keep the same length, which
 * streamFreeQitem depends on when it walks both arrays with one index.
 */
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
  if (taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef) == NULL) {
    return -1;
  }

  if (taosArrayPush(pMerged->submits, &pSubmit->submit) == NULL) {
    // undo the first append so the two arrays stay index-aligned
    (void)taosArrayPop(pMerged->dataRefs);
    return -1;
  }

  pMerged->ver = pSubmit->ver;
  return 0;
}

127
/* Add one to the reference counter pointed to by pDataSubmit->dataRef. */
static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) {
  int32_t* pRefCnt = pDataSubmit->dataRef;
  atomic_add_fetch_32(pRefCnt, 1);
}

131
/*
 * Create a second queue item that is a byte-wise copy of pSubmit, so it
 * points at the same message buffer and reference counter. The counter is
 * incremented once for the new item.
 *
 * Returns the clone, or NULL if the queue item cannot be allocated (the
 * reference count is not touched in that case).
 */
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) {
  // the message length is passed to the allocator only for plain submit items
  const int32_t payloadLen = (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) ? pSubmit->submit.msgLen : 0;

  SStreamDataSubmit* pCopy = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, payloadLen);
  if (NULL == pCopy) {
    return NULL;
  }

  streamDataSubmitRefInc(pSubmit);
  memcpy(pCopy, pSubmit, sizeof(SStreamDataSubmit));
  return pCopy;
}
L
Liu Jicong 已提交
146

147 148
/*
 * Try to merge queue item pElem into dst.
 *
 * Supported combinations:
 *   - DATA_BLOCK   + DATA_BLOCK  : pElem's blocks are appended to dst; dst is returned.
 *   - MERGED_SUBMIT + DATA_SUBMIT: pElem is added to dst; dst is returned.
 *   - DATA_SUBMIT  + DATA_SUBMIT : a new merged-submit item holding both is returned.
 *
 * On success the queue items that were absorbed are released with
 * taosFreeQitem (pElem always, and dst too in the third case).
 *
 * Returns NULL when the two types cannot be merged, or when the merged item
 * for the third case cannot be allocated. In both NULL cases neither dst nor
 * pElem is freed or modified.
 */
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
  if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
    SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
    SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
    taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
    // plain destroy: only the source array is released here, its elements now live in dst
    taosArrayDestroy(pBlockSrc->blocks);
    taosFreeQitem(pElem);
    return dst;
  } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
    SStreamDataSubmit*   pBlockSrc = (SStreamDataSubmit*)pElem;
    streamMergeSubmit(pMerged, pBlockSrc);
    // only the queue item is released; the submit data is now referenced by pMerged
    taosFreeQitem(pElem);
    return dst;
  } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
    SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
    if (pMerged == NULL) {
      // allocation failed: report "not merged" and leave both inputs intact
      // instead of dereferencing NULL below
      return NULL;
    }

    streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
    streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
    taosFreeQitem(dst);
    taosFreeQitem(pElem);
    return (SStreamQueueItem*)pMerged;
  } else {
    return NULL;
  }
}

/*
 * Release a stream queue item together with whatever payload its type owns.
 * Items of any type not listed below are left untouched.
 */
void streamFreeQitem(SStreamQueueItem* data) {
  const int8_t type = data->type;

  switch (type) {
    case STREAM_INPUT__GET_RES:
      blockDataDestroy(((SStreamTrigger*)data)->pBlock);
      taosFreeQitem(data);
      break;

    case STREAM_INPUT__DATA_BLOCK:
    case STREAM_INPUT__DATA_RETRIEVE:
      // each element is cleaned with blockDataFreeRes before the array is destroyed
      taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
      taosFreeQitem(data);
      break;

    case STREAM_INPUT__DATA_SUBMIT:
      streamDataSubmitDestroy((SStreamDataSubmit*)data);
      taosFreeQitem(data);
      break;

    case STREAM_INPUT__MERGED_SUBMIT: {
      SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
      const int32_t        sz = taosArrayGetSize(pMerge->submits);

      // drop one reference per merged submit; free the buffer and the counter
      // for every submit whose count reaches zero
      for (int32_t i = 0; i < sz; ++i) {
        int32_t*      pRef = taosArrayGetP(pMerge->dataRefs, i);
        const int32_t ref = atomic_sub_fetch_32(pRef, 1);
        ASSERT(ref >= 0);
        if (ref != 0) {
          continue;
        }

        SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
        taosMemoryFree(pSubmit->msgStr);
        taosMemoryFree(pRef);
      }

      taosArrayDestroy(pMerge->submits);
      taosArrayDestroy(pMerge->dataRefs);
      taosFreeQitem(pMerge);
      break;
    }

    case STREAM_INPUT__REF_DATA_BLOCK: {
      SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
      blockDataDestroy(pRefBlock->pBlock);
      taosFreeQitem(pRefBlock);
      break;
    }

    default:
      break;
  }
}