/* * Copyright (c) 2021 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "le_task.h" #include #include #include "le_loop.h" static void DoAsyncEvent_(const LoopHandle loopHandle, AsyncEventTask *asyncTask) { LE_CHECK(loopHandle != NULL && asyncTask != NULL, return, "Invalid parameters"); LE_Buffer *buffer = GetFirstBuffer(&asyncTask->stream); while (buffer != NULL) { uint64_t eventId = *(uint64_t*)(buffer->data); if (asyncTask->processAsyncEvent) { asyncTask->processAsyncEvent((TaskHandle)asyncTask, eventId, (uint8_t *)(buffer->data + sizeof(uint64_t)), buffer->dataSize); } FreeBuffer(loopHandle, &asyncTask->stream, buffer); buffer = GetFirstBuffer(&asyncTask->stream); } } #ifdef STARTUP_INIT_TEST void LE_DoAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle) { DoAsyncEvent_(loopHandle, (AsyncEventTask *)taskHandle); } #endif static LE_STATUS HandleAsyncEvent_(const LoopHandle loopHandle, const TaskHandle taskHandle, uint32_t oper) { LE_LOGV("HandleAsyncEvent_ fd: %d oper 0x%x", GetSocketFd(taskHandle), oper); EventLoop *loop = (EventLoop *)loopHandle; AsyncEventTask *asyncTask = (AsyncEventTask *)taskHandle; if (LE_TEST_FLAGS(oper, Event_Read)) { uint64_t eventId = 0; int ret = read(GetSocketFd(taskHandle), &eventId, sizeof(eventId)); LE_LOGV("HandleAsyncEvent_ read fd:%d ret: %d eventId %llu", GetSocketFd(taskHandle), ret, eventId); DoAsyncEvent_(loopHandle, asyncTask); if (IsBufferEmpty(&asyncTask->stream)) { loop->modEvent(loop, (const BaseTask *)taskHandle, Event_Read); return LE_SUCCESS; } } else { static uint64_t eventId = 0; (void)write(GetSocketFd(taskHandle), &eventId, sizeof(eventId)); loop->modEvent(loop, (const BaseTask *)taskHandle, Event_Read); eventId++; } return LE_SUCCESS; } static void HandleAsyncTaskClose_(const LoopHandle loopHandle, const TaskHandle taskHandle) { BaseTask *task = (BaseTask *)taskHandle; CloseTask(loopHandle, task); close(task->taskId.fd); } LE_STATUS LE_CreateAsyncTask(const LoopHandle loopHandle, TaskHandle *taskHandle, LE_ProcessAsyncEvent processAsyncEvent) { LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); LE_CHECK(processAsyncEvent != NULL, return LE_INVALID_PARAM, "Invalid parameters processAsyncEvent "); int fd = eventfd(1, EFD_NONBLOCK | EFD_CLOEXEC); LE_CHECK(fd > 0, return LE_FAILURE, "Failed to event fd "); LE_BaseInfo baseInfo = {TASK_EVENT | TASK_ASYNC_EVENT, NULL}; AsyncEventTask *task = (AsyncEventTask *)CreateTask(loopHandle, fd, &baseInfo, sizeof(AsyncEventTask)); LE_CHECK(task != NULL, close(fd); return LE_NO_MEMORY, "Failed to create task"); task->stream.base.handleEvent = HandleAsyncEvent_; task->stream.base.innerClose = HandleAsyncTaskClose_; ListInit(&task->stream.buffHead); LoopMutexInit(&task->stream.mutex); task->processAsyncEvent = processAsyncEvent; EventLoop *loop = (EventLoop *)loopHandle; loop->addEvent(loop, (const BaseTask *)task, Event_Read); *taskHandle = (TaskHandle)task; return LE_SUCCESS; } LE_STATUS LE_StartAsyncEvent(const LoopHandle loopHandle, const TaskHandle taskHandle, uint64_t eventId, const uint8_t *data, uint32_t buffLen) { LE_CHECK(loopHandle != NULL && taskHandle != NULL, return LE_INVALID_PARAM, "Invalid parameters"); BufferHandle handle = LE_CreateBuffer(loopHandle, buffLen + 1 + sizeof(eventId)); char *buff = (char *)LE_GetBufferInfo(handle, NULL, NULL); int ret = memcpy_s(buff, sizeof(eventId), &eventId, sizeof(eventId)); LE_CHECK(ret == 0, return -1, "Failed to copy data"); if (data != NULL || buffLen == 0) { ret = memcpy_s(buff + sizeof(eventId), buffLen, data, buffLen); LE_CHECK(ret == 0, return -1, "Failed to copy data"); buff[sizeof(eventId) + buffLen] = '\0'; } return LE_Send(loopHandle, taskHandle, handle, buffLen); } void LE_StopAsyncTask(LoopHandle loopHandle, TaskHandle taskHandle) { LE_CHECK(loopHandle != NULL && taskHandle != NULL, return, "Invalid parameters"); LE_CloseTask(loopHandle, taskHandle); }