event_dispatcher.h 4.9 KB
Newer Older
羽飞's avatar
羽飞 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */

//
// Created by Longda on 2010
//

#ifndef __COMMON_SEDA_EVENT_DISPATCHER_H__
#define __COMMON_SEDA_EVENT_DISPATCHER_H__

// Include Files
#include <list>
#include <map>

// SEDA headers
#include "common/seda/stage.h"
#include "common/seda/stage_event.h"
namespace common {

/**
 *  @file   Event Dispatcher
 *  @author Longda
 *  @date   8/20/07
 */

class DispatchContext;

/**
 * A stage which stores and re-orders events
 * The EventDispatcher stage is designed to assert control over the order
 * of events that move through the Seda pipeline.  The EventDispatcher stage
 * has a hash table that stores events for later processing.  As each event
 * arrives at the Dispatcher, a test is applied to determine whether it
 * should be allowed to proceed.  This test is implemented in subclasses
 * and uses state from the event and state held in the dispatcher itself.
 * If the event is meant to be delayed, it is hashed and stored.  The
 * dispatcher also provides an interface that "wakes up" a stored, event
 * and re-applies the dispatch test.  This wake-up interface can be called
 * from a background thread, or from a callback associated with an event
 * that has already been dispatched.
 *
 * The EventDispatcher class is designed to be specialized by adding a
 * specific implementation of the dispatch test for events, and a method
 * or process for waking up stored events at the appropriate time.
 */

class EventDispatcher : public Stage {

  // public interface operations

58
public:
羽飞's avatar
羽飞 已提交
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
  typedef enum { SEND_EVENT = 0, STORE_EVENT, FAIL_EVENT } status_t;

  /**
   * Destructor
   * @pre  stage is not connected
   * @post pending events are deleted and stage is destroyed
   */
  virtual ~EventDispatcher();

  /**
   * Process an event
   * Check if the event can be dispatched. If not, hash it and store
   * it.  If so, send it on to the next stage
   *
   * @param[in] event Pointer to event that must be handled.
   * @post  event must not be de-referenced by caller after return
   */
  void handle_event(StageEvent *event);

  // Note, EventDispatcher is an abstract class and needs no make_stage()

80
protected:
羽飞's avatar
羽飞 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
  /**
   * Constructor
   * @param[in] tag     The label that identifies this stage.
   *
   * @pre  tag is non-null and points to null-terminated string
   * @post event queue is empty
   * @post stage is not connected
   */
  EventDispatcher(const char *tag);

  /**
   * Initialize stage params and validate outputs
   * @pre  Stage not connected
   * @return TRUE if and only if outputs are valid and init succeeded.
   */
  bool initialize();

  // set properties for this object
99 100 101 102
  bool set_properties()
  {
    return true;
  }
羽飞's avatar
羽飞 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124

  /**
   * Cleanup stage after disconnection
   * After disconnection is completed, cleanup any resources held by the
   * stage and prepare for destruction or re-initialization.
   */
  virtual void cleanup();

  /**
   * Dispatch test
   * @param[in] ev  Pointer to event that should be tested
   * @param[in/out]  Pointer to context object
   * @param[out] hash  Hash value for event
   *
   * @pre event_lock_ is locked
   * @post hash is calculated if return val is false
   * @return SEND_EVENT if ok to send the event down the pipeline;
   *                    ctx is NULL
   *         STORE_EVENT if event should be stored; ctx will be stored
   *         FAIL_EVENT if failure, and event has been completed;
   *                    ctx is NULL
   */
125
  virtual status_t dispatch_event(StageEvent *ev, DispatchContext *&ctx, std::string &hash) = 0;
羽飞's avatar
羽飞 已提交
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140

  /**
   * Wake up a stored event
   * @pre event_lock_ is locked
   * @return true if an event was found on hash-chain associated with
   *              hashkey and sent to next stage
   *         false no event was found on hash-chain
   */
  bool wakeup_event(std::string hashkey);

  // implementation state

  typedef std::pair<StageEvent *, DispatchContext *> StoredEvent;
  typedef std::map<std::string, std::list<StoredEvent>> EventHash;

141 142 143
  EventHash event_store_;       // events stored here while waiting
  pthread_mutex_t event_lock_;  // protects access to event_store_
  Stage *next_stage_;           // target for dispatched events
羽飞's avatar
羽飞 已提交
144

145
protected:
羽飞's avatar
羽飞 已提交
146 147 148 149 150 151 152
};

/**
 * Class to store context info with the stored event.  Subclasses should
 * derive from this base class.
 */
class DispatchContext {
153 154 155
public:
  virtual ~DispatchContext()
  {}
羽飞's avatar
羽飞 已提交
156 157
};

158 159
}  // namespace common
#endif  // __COMMON_SEDA_EVENT_DISPATCHER_H__