ioevent_loop.c 5.4 KB
Newer Older
Y
yuqing 已提交
1 2 3 4
#include "sched_thread.h"
#include "logger.h"
#include "ioevent_loop.h"

Y
yuqing 已提交
5
static void deal_ioevents(IOEventPoller *ioevent)
Y
yuqing 已提交
6 7 8 9
{
	int event;
	IOEventEntry *pEntry;

10 11
	for (ioevent->iterator.index=0; ioevent->iterator.index < ioevent->
            iterator.count; ioevent->iterator.index++)
Y
yuqing 已提交
12 13 14 15 16 17 18
	{
		event = IOEVENT_GET_EVENTS(ioevent, ioevent->iterator.index);
		pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent,
                ioevent->iterator.index);
        if (pEntry != NULL) {
            pEntry->callback(pEntry->fd, event, pEntry->timer.data);
        }
Y
yuqing 已提交
19 20
        else {
            logDebug("file: "__FILE__", line: %d, "
Y
YuQing 已提交
21
                    "ignore ioevent : %d, index: %d",
22
                    __LINE__, event, ioevent->iterator.index);
Y
yuqing 已提交
23
        }
Y
yuqing 已提交
24 25 26
	}
}

Y
yuqing 已提交
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
int ioevent_remove(IOEventPoller *ioevent, void *data)
{
	IOEventEntry *pEntry;
    int index;

    if (ioevent->iterator.index >= ioevent->iterator.count)
    {
        return ENOENT;
    }

    pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent,
            ioevent->iterator.index);
    if (pEntry != NULL && pEntry->timer.data == data) {
        return 0;  //do NOT clear current entry
    }

    for (index=ioevent->iterator.index + 1; index < ioevent->iterator.count;
            index++)
    {
        pEntry = (IOEventEntry *)IOEVENT_GET_DATA(ioevent, index);
        if (pEntry != NULL && pEntry->timer.data == data) {
Y
yuqing 已提交
48
            logDebug("file: "__FILE__", line: %d, "
Y
YuQing 已提交
49
                    "clear ioevent data: %p", __LINE__, data);
Y
yuqing 已提交
50 51 52 53 54 55 56 57
            IOEVENT_CLEAR_DATA(ioevent, index);
            return 0;
        }
    }

    return ENOENT;
}

Y
yuqing 已提交
58 59 60
static void deal_timeouts(FastTimerEntry *head)
{
	FastTimerEntry *entry;
Y
yuqing 已提交
61
	FastTimerEntry *current;
Y
yuqing 已提交
62 63 64 65 66
	IOEventEntry *pEventEntry;

	entry = head->next;
	while (entry != NULL)
	{
Y
yuqing 已提交
67
		current = entry;
Y
yuqing 已提交
68 69
		entry = entry->next;

70
        current->prev = current->next = NULL; //must set NULL because NOT in time wheel
Y
yuqing 已提交
71
		pEventEntry = (IOEventEntry *)current->data;
Y
yuqing 已提交
72 73 74
		if (pEventEntry != NULL)
		{
			pEventEntry->callback(pEventEntry->fd, IOEVENT_TIMEOUT,
Y
yuqing 已提交
75
						current->data);
Y
yuqing 已提交
76 77 78 79 80
		}
	}
}

int ioevent_loop(struct nio_thread_data *pThreadData,
81
	IOEventCallback recv_notify_callback, TaskCleanUpCallback
Y
yuqing 已提交
82 83 84 85 86
	clean_up_callback, volatile bool *continue_flag)
{
	int result;
	IOEventEntry ev_notify;
	FastTimerEntry head;
87
	struct fast_task_info *task;
Y
yuqing 已提交
88 89 90 91
	time_t last_check_time;
	int count;

	memset(&ev_notify, 0, sizeof(ev_notify));
Y
YuQing 已提交
92
	ev_notify.fd = FC_NOTIFY_READ_FD(pThreadData);
Y
yuqing 已提交
93
	ev_notify.callback = recv_notify_callback;
94
	ev_notify.timer.data = pThreadData;
Y
yuqing 已提交
95 96 97 98 99 100 101 102 103 104 105 106
	if (ioevent_attach(&pThreadData->ev_puller,
		pThreadData->pipe_fds[0], IOEVENT_READ,
		&ev_notify) != 0)
	{
		result = errno != 0 ? errno : ENOMEM;
		logCrit("file: "__FILE__", line: %d, " \
			"ioevent_attach fail, " \
			"errno: %d, error info: %s", \
			__LINE__, result, STRERROR(result));
		return result;
	}

Y
yuqing 已提交
107
    pThreadData->deleted_list = NULL;
Y
yuqing 已提交
108 109 110
	last_check_time = g_current_time;
	while (*continue_flag)
	{
111 112
		pThreadData->ev_puller.iterator.count = ioevent_poll(
                &pThreadData->ev_puller);
Y
yuqing 已提交
113
		if (pThreadData->ev_puller.iterator.count > 0)
Y
yuqing 已提交
114
		{
Y
yuqing 已提交
115
			deal_ioevents(&pThreadData->ev_puller);
Y
yuqing 已提交
116
		}
Y
yuqing 已提交
117
		else if (pThreadData->ev_puller.iterator.count < 0)
Y
yuqing 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
		{
			result = errno != 0 ? errno : EINVAL;
			if (result != EINTR)
			{
				logError("file: "__FILE__", line: %d, " \
					"ioevent_poll fail, " \
					"errno: %d, error info: %s", \
					__LINE__, result, STRERROR(result));
				return result;
			}
		}

		if (pThreadData->deleted_list != NULL)
		{
			count = 0;
			while (pThreadData->deleted_list != NULL)
			{
135 136
				task = pThreadData->deleted_list;
				pThreadData->deleted_list = task->next;
Y
yuqing 已提交
137

138
				clean_up_callback(task);
Y
yuqing 已提交
139 140
				count++;
			}
141
			//logInfo("cleanup task count: %d", count);
Y
yuqing 已提交
142 143 144 145 146 147 148 149 150 151 152 153
		}

		if (g_current_time - last_check_time > 0)
		{
			last_check_time = g_current_time;
			count = fast_timer_timeouts_get(
				&pThreadData->timer, g_current_time, &head);
			if (count > 0)
			{
				deal_timeouts(&head);
			}
		}
154

155 156
        if (pThreadData->notify.enabled)
        {
Y
YuQing 已提交
157
            int64_t n;
158 159
            if ((n=__sync_fetch_and_add(&pThreadData->notify.counter, 0)) != 0)
            {
Y
YuQing 已提交
160
                __sync_fetch_and_sub(&pThreadData->notify.counter, n);
161 162 163 164 165 166
                /*
                logInfo("file: "__FILE__", line: %d, "
                        "n ==== %"PRId64", now: %"PRId64,
                        __LINE__, n, __sync_fetch_and_add(
                            &pThreadData->notify.counter, 0));
                            */
Y
YuQing 已提交
167 168
            }
        }
169 170 171

        if (pThreadData->thread_loop_callback != NULL)
        {
172 173
            pThreadData->thread_loop_callback(pThreadData);
        }
Y
yuqing 已提交
174 175 176 177 178
	}

	return 0;
}

179
int ioevent_set(struct fast_task_info *task, struct nio_thread_data *pThread,
Y
yuqing 已提交
180 181 182 183
	int sock, short event, IOEventCallback callback, const int timeout)
{
	int result;

184 185 186
	task->thread_data = pThread;
	task->event.fd = sock;
	task->event.callback = callback;
Y
yuqing 已提交
187
	if (ioevent_attach(&pThread->ev_puller,
188
		sock, event, task) < 0)
Y
yuqing 已提交
189 190 191 192 193 194 195 196 197
	{
		result = errno != 0 ? errno : ENOENT;
		logError("file: "__FILE__", line: %d, " \
			"ioevent_attach fail, " \
			"errno: %d, error info: %s", \
			__LINE__, result, STRERROR(result));
		return result;
	}

198 199 200
	task->event.timer.data = task;
	task->event.timer.expires = g_current_time + timeout;
	result = fast_timer_add(&pThread->timer, &task->event.timer);
Y
yuqing 已提交
201 202 203 204 205 206 207 208 209 210 211 212
	if (result != 0)
	{
		logError("file: "__FILE__", line: %d, " \
			"fast_timer_add fail, " \
			"errno: %d, error info: %s", \
			__LINE__, result, STRERROR(result));
		return result;
	}

	return 0;
}