/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */

//
// Created by Longda on 2010
//

// Include Files
#include "common/seda/event_dispatcher.h"
namespace common {

// Constructor
20 21
EventDispatcher::EventDispatcher(const char *tag) : Stage(tag), event_store_(), next_stage_(NULL)
{
羽飞's avatar
羽飞 已提交
22 23 24 25 26 27 28 29 30 31 32 33
  LOG_TRACE("enter\n");

  pthread_mutexattr_t attr;

  pthread_mutexattr_init(&attr);
  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
  pthread_mutex_init(&event_lock_, &attr);

  LOG_TRACE("exit\n");
}

// Destructor
34 35
EventDispatcher::~EventDispatcher()
{
羽飞's avatar
羽飞 已提交
36 37 38 39 40 41 42 43 44 45
  LOG_TRACE("enter\n");
  pthread_mutex_destroy(&event_lock_);
  LOG_TRACE("exit\n");
}

/**
 * Process an event
 * Check if the event can be dispatched. If not, hash it and store
 * it.  If so, send it on to the next stage.
 */
46 47
void EventDispatcher::handle_event(StageEvent *event)
{
羽飞's avatar
羽飞 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
  LOG_TRACE("enter\n");

  std::string hash;
  DispatchContext *ctx = NULL;
  status_t stat;

  pthread_mutex_lock(&event_lock_);
  stat = dispatch_event(event, ctx, hash);
  if (stat == SEND_EVENT) {
    next_stage_->add_event(event);
  } else if (stat == STORE_EVENT) {
    StoredEvent se(event, ctx);

    event_store_[hash].push_back(se);
  } else {
    LOG_ERROR("Dispatch event failure\n");
    // in this case, dispatch_event is assumed to have disposed of event
  }
  pthread_mutex_unlock(&event_lock_);

  LOG_TRACE("exit\n");
}

// Initialize stage params and validate outputs
72 73
bool EventDispatcher::initialize()
{
羽飞's avatar
羽飞 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87
  bool ret_val = true;

  if (next_stage_list_.size() != 1) {
    ret_val = false;
  } else {
    next_stage_ = *(next_stage_list_.begin());
  }
  return ret_val;
}

/**
 * Cleanup stage after disconnection
 * Call done() on any events left over in the event_store_.
 */
88 89
void EventDispatcher::cleanup()
{
羽飞's avatar
羽飞 已提交
90 91 92 93 94 95
  pthread_mutex_lock(&event_lock_);

  // for each hash chain...
  for (EventHash::iterator i = event_store_.begin(); i != event_store_.end(); i++) {

    // for each event on the chain
96
    for (std::list<StoredEvent>::iterator j = i->second.begin(); j != i->second.end(); j++) {
羽飞's avatar
羽飞 已提交
97 98 99 100 101 102 103 104 105 106
      j->first->done();
    }
    i->second.clear();
  }
  event_store_.clear();

  pthread_mutex_unlock(&event_lock_);
}

// Wake up a stored event
107 108
bool EventDispatcher::wakeup_event(std::string hashkey)
{
羽飞's avatar
羽飞 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
  bool sent = false;
  EventHash::iterator i;

  i = event_store_.find(hashkey);
  if (i != event_store_.end()) {

    // find the event and remove it from the current queue
    StoredEvent target_ev = *(i->second.begin());
    i->second.pop_front();
    if (i->second.size() == 0) {
      event_store_.erase(i);
    }

    // try to dispatch the event again
    status_t stat = dispatch_event(target_ev.first, target_ev.second, hashkey);
    if (stat == SEND_EVENT) {
      next_stage_->add_event(target_ev.first);
      sent = true;
    } else if (stat == STORE_EVENT) {
      event_store_[hashkey].push_back(target_ev);
    } else {
      LOG_ERROR("Dispatch event failure\n");
      // in this case, dispatch_event is assumed to have disposed of event
    }
  }

  return sent;
}

138
}  // namespace common