pipe.c 11.6 KB
Newer Older
/*
 * Copyright (c) 2006-2021, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2012-09-30     Bernard      first version.
 * 2017-11-08     JasonJiaJie  fix memory leak issue when close a pipe.
 */
#include <rthw.h>
#include <rtdevice.h>
13
#include <stdint.h>
B
bernard 已提交
14 15

#if defined(RT_USING_POSIX)
B
bernard 已提交
16 17
#include <dfs_file.h>
#include <dfs_posix.h>
B
bernard 已提交
18
#include <dfs_poll.h>
19

B
bernard 已提交
20
static int pipe_fops_open(struct dfs_fd *fd)
21
{
B
Bernard Xiong 已提交
22
    int rc = 0;
B
bernard 已提交
23 24
    rt_device_t device;
    rt_pipe_t *pipe;
25

B
bernard 已提交
26 27
    pipe = (rt_pipe_t *)fd->data;
    if (!pipe) return -1;
28

B
bernard 已提交
29 30
    device = &(pipe->parent);
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
31

B
bernard 已提交
32 33
    if (device->ref_count == 0)
    {
34
        pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
35 36 37 38 39
        if (pipe->fifo == RT_NULL)
        {
            rc = -RT_ENOMEM;
            goto __exit;
        }
B
bernard 已提交
40
    }
41

B
bernard 已提交
42 43
    switch (fd->flags & O_ACCMODE)
    {
mysterywolf's avatar
mysterywolf 已提交
44 45 46 47 48 49 50 51 52 53
        case O_RDONLY:
            pipe->readers ++;
            break;
        case O_WRONLY:
            pipe->writers ++;
            break;
        case O_RDWR:
            pipe->readers ++;
            pipe->writers ++;
            break;
54
    }
B
bernard 已提交
55 56
    device->ref_count ++;

57
__exit:
B
bernard 已提交
58 59
    rt_mutex_release(&(pipe->lock));

60
    return rc;
61 62
}

B
bernard 已提交
63
static int pipe_fops_close(struct dfs_fd *fd)
64
{
B
bernard 已提交
65 66
    rt_device_t device;
    rt_pipe_t *pipe;
67

B
bernard 已提交
68 69
    pipe = (rt_pipe_t *)fd->data;
    if (!pipe) return -1;
70

B
bernard 已提交
71 72 73 74
    device = &(pipe->parent);
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    switch (fd->flags & O_ACCMODE)
75
    {
mysterywolf's avatar
mysterywolf 已提交
76 77 78 79 80 81 82 83 84 85
        case O_RDONLY:
            pipe->readers --;
            break;
        case O_WRONLY:
            pipe->writers --;
            break;
        case O_RDWR:
            pipe->readers --;
            pipe->writers --;
            break;
B
bernard 已提交
86
    }
87

B
bernard 已提交
88 89 90 91
    if (pipe->writers == 0)
    {
        rt_wqueue_wakeup(&(pipe->reader_queue), (void*)(POLLIN | POLLERR | POLLHUP));
    }
92

B
bernard 已提交
93 94 95 96
    if (pipe->readers == 0)
    {
        rt_wqueue_wakeup(&(pipe->writer_queue), (void*)(POLLOUT | POLLERR | POLLHUP));
    }
97

B
bernard 已提交
98 99
    if (device->ref_count == 1)
    {
100 101
        if (pipe->fifo != RT_NULL)
            rt_ringbuffer_destroy(pipe->fifo);
B
bernard 已提交
102
        pipe->fifo = RT_NULL;
103
    }
B
bernard 已提交
104 105 106 107
    device->ref_count --;

    rt_mutex_release(&(pipe->lock));

108 109 110 111 112 113
    if (device->ref_count == 0 && pipe->is_named == RT_FALSE)
    {
        /* delete the unamed pipe */
        rt_pipe_delete(device->parent.name);
    }

B
bernard 已提交
114 115 116
    return 0;
}

B
bernard 已提交
117
static int pipe_fops_ioctl(struct dfs_fd *fd, int cmd, void *args)
B
bernard 已提交
118 119 120 121 122 123 124 125
{
    rt_pipe_t *pipe;
    int ret = 0;

    pipe = (rt_pipe_t *)fd->data;

    switch (cmd)
    {
mysterywolf's avatar
mysterywolf 已提交
126 127 128 129 130 131 132 133 134
        case FIONREAD:
            *((int*)args) = rt_ringbuffer_data_len(pipe->fifo);
            break;
        case FIONWRITE:
            *((int*)args) = rt_ringbuffer_space_len(pipe->fifo);
            break;
        default:
            ret = -EINVAL;
            break;
B
bernard 已提交
135 136 137 138 139
    }

    return ret;
}

B
bernard 已提交
140
static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count)
B
bernard 已提交
141 142 143
{
    int len = 0;
    rt_pipe_t *pipe;
144

B
bernard 已提交
145
    pipe = (rt_pipe_t *)fd->data;
146

B
bernard 已提交
147 148 149
    /* no process has the pipe open for writing, return end-of-file */
    if (pipe->writers == 0)
        return 0;
150

B
bernard 已提交
151 152 153 154 155
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    while (1)
    {
        if (pipe->writers == 0)
156
        {
B
bernard 已提交
157 158 159 160
            goto out;
        }

        len = rt_ringbuffer_get(pipe->fifo, buf, count);
161

B
bernard 已提交
162 163 164
        if (len > 0)
        {
            break;
165 166 167
        }
        else
        {
B
bernard 已提交
168 169 170 171 172 173 174 175 176 177
            if (fd->flags & O_NONBLOCK)
            {
                len = -EAGAIN;
                goto out;
            }

            rt_mutex_release(&pipe->lock);
            rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
            rt_wqueue_wait(&(pipe->reader_queue), 0, -1);
            rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
178
        }
B
bernard 已提交
179 180 181 182
    }

    /* wakeup writer */
    rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
183

B
bernard 已提交
184 185 186
out:
    rt_mutex_release(&pipe->lock);
    return len;
187 188
}

B
bernard 已提交
189
static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count)
190
{
B
bernard 已提交
191 192 193 194 195 196 197
    int len;
    rt_pipe_t *pipe;
    int wakeup = 0;
    int ret = 0;
    uint8_t *pbuf;

    pipe = (rt_pipe_t *)fd->data;
198

B
bernard 已提交
199
    if (pipe->readers == 0)
200
    {
B
bernard 已提交
201 202 203
        ret = -EPIPE;
        goto out;
    }
204

B
bernard 已提交
205 206
    if (count == 0)
        return 0;
207

B
bernard 已提交
208 209
    pbuf = (uint8_t*)buf;
    rt_mutex_take(&pipe->lock, -1);
210

B
bernard 已提交
211 212 213 214 215 216 217 218
    while (1)
    {
        if (pipe->readers == 0)
        {
            if (ret == 0)
                ret = -EPIPE;
            break;
        }
219

B
bernard 已提交
220 221 222 223
        len = rt_ringbuffer_put(pipe->fifo, pbuf, count - ret);
        ret +=  len;
        pbuf += len;
        wakeup = 1;
224

B
bernard 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
        if (ret == count)
        {
            break;
        }
        else
        {
            if (fd->flags & O_NONBLOCK)
            {
                if (ret == 0)
                {
                    ret = -EAGAIN;
                }

                break;
            }
        }
241

B
bernard 已提交
242 243 244 245 246 247 248
        rt_mutex_release(&pipe->lock);
        rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
        /* pipe full, waiting on suspended write list */
        rt_wqueue_wait(&(pipe->writer_queue), 0, -1);
        rt_mutex_take(&pipe->lock, -1);
    }
    rt_mutex_release(&pipe->lock);
249

B
bernard 已提交
250
    if (wakeup)
251
    {
B
bernard 已提交
252 253
        rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
    }
254

B
bernard 已提交
255 256 257
out:
    return ret;
}
258

B
bernard 已提交
259
/**
 * DFS poll hook for a pipe device node.
 *
 * Registers the poll request on both wait queues of the pipe, then reports
 * the current readiness according to the descriptor's access mode:
 *  - read side:  POLLIN when the FIFO holds data, POLLHUP when no writer exists
 *  - write side: POLLOUT when the FIFO has free space, POLLERR when no reader exists
 *
 * @param fd  the file descriptor object; fd->data is read as the rt_pipe_t
 * @param req the poll request to attach to the pipe's wait queues
 *
 * @return bitmask of the ready events
 */
static int pipe_fops_poll(struct dfs_fd *fd, rt_pollreq_t *req)
{
    rt_pipe_t *self = (rt_pipe_t *)fd->data;
    int accmode;
    int can_read;
    int can_write;
    int events = 0;

    rt_poll_add(&(self->reader_queue), req);
    rt_poll_add(&(self->writer_queue), req);

    accmode   = fd->flags & O_ACCMODE;
    can_read  = (accmode == O_RDONLY) || (accmode == O_RDWR);
    can_write = (accmode == O_WRONLY) || (accmode == O_RDWR);

    if (can_read)
    {
        if (rt_ringbuffer_data_len(self->fifo) != 0)
        {
            events |= POLLIN;
        }
        if (self->writers == 0)
        {
            events |= POLLHUP;
        }
    }

    if (can_write)
    {
        if (rt_ringbuffer_space_len(self->fifo) != 0)
        {
            events |= POLLOUT;
        }
        if (self->readers == 0)
        {
            events |= POLLERR;
        }
    }

    return events;
}

B
bernard 已提交
309
static const struct dfs_file_ops pipe_fops =
310
{
B
bernard 已提交
311 312 313 314 315
    pipe_fops_open,
    pipe_fops_close,
    pipe_fops_ioctl,
    pipe_fops_read,
    pipe_fops_write,
B
bernard 已提交
316 317 318
    RT_NULL,
    RT_NULL,
    RT_NULL,
B
bernard 已提交
319
    pipe_fops_poll,
B
bernard 已提交
320
};
321 322
#endif /* end of RT_USING_POSIX */

whj123999's avatar
whj123999 已提交
323
/**
 * Device-layer open handler for a pipe.
 *
 * Creates the ring buffer, under the pipe lock, when the pipe does not have
 * one yet. The oflag argument is not used.
 *
 * @param device the pipe device (an rt_pipe_t viewed through its parent device)
 * @param oflag  open flags (ignored)
 *
 * @return RT_EOK on success, -RT_EINVAL when device is NULL, -RT_ENOMEM when
 *         the ring buffer cannot be created
 */
rt_err_t  rt_pipe_open(rt_device_t device, rt_uint16_t oflag)
{
    rt_pipe_t *self = (rt_pipe_t *)device;
    rt_err_t status = RT_EOK;

    if (device == RT_NULL)
    {
        return -RT_EINVAL;
    }

    rt_mutex_take(&(self->lock), RT_WAITING_FOREVER);

    if (self->fifo == RT_NULL)
    {
        self->fifo = rt_ringbuffer_create(self->bufsz);
        status = (self->fifo != RT_NULL) ? RT_EOK : -RT_ENOMEM;
    }

    rt_mutex_release(&(self->lock));

    return status;
}

whj123999's avatar
whj123999 已提交
351
/**
 * Device-layer close handler for a pipe.
 *
 * When the device is down to its last reference, the ring buffer is
 * destroyed under the pipe lock and the fifo pointer is cleared. The fifo
 * pointer is checked before it is destroyed, matching the other teardown
 * paths in this file (pipe_fops_close and rt_pipe_delete).
 *
 * @param device the pipe device (an rt_pipe_t viewed through its parent device)
 *
 * @return RT_EOK on success, -RT_EINVAL when device is NULL
 */
rt_err_t  rt_pipe_close(rt_device_t device)
{
    rt_pipe_t *pipe = (rt_pipe_t *)device;

    if (device == RT_NULL) return -RT_EINVAL;
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    if (device->ref_count == 1)
    {
        /* never hand a NULL ring buffer to rt_ringbuffer_destroy() */
        if (pipe->fifo != RT_NULL)
        {
            rt_ringbuffer_destroy(pipe->fifo);
        }
        pipe->fifo = RT_NULL;
    }

    rt_mutex_release(&(pipe->lock));

    return RT_EOK;
}
B
bernard 已提交
368

whj123999's avatar
whj123999 已提交
369
/**
 * Device-layer read handler for a pipe.
 *
 * Pulls bytes out of the ring buffer, under the pipe lock, until the request
 * is satisfied or the ring buffer yields nothing more. This function does
 * not wait for data. The pos argument is not used.
 *
 * @param device the pipe device (an rt_pipe_t viewed through its parent device)
 * @param pos    offset (ignored)
 * @param buffer destination buffer
 * @param count  number of bytes requested
 *
 * @return number of bytes copied into buffer; 0 when count is 0 or when
 *         device is NULL (errno is then set to -EINVAL via rt_set_errno)
 */
rt_size_t rt_pipe_read(rt_device_t device, rt_off_t pos, void *buffer, rt_size_t count)
{
    rt_pipe_t *self = (rt_pipe_t *)device;
    uint8_t *dst;
    rt_size_t done = 0;

    if (device == RT_NULL)
    {
        rt_set_errno(-EINVAL);
        return 0;
    }

    if (count == 0)
    {
        return 0;
    }

    dst = (uint8_t*)buffer;
    rt_mutex_take(&(self->lock), RT_WAITING_FOREVER);

    while (done < count)
    {
        int got = rt_ringbuffer_get(self->fifo, dst + done, count - done);

        /* ring buffer is empty: return what has been collected so far */
        if (got <= 0)
        {
            break;
        }

        done += got;
    }

    rt_mutex_release(&self->lock);

    return done;
}

whj123999's avatar
whj123999 已提交
397
/**
 * Device-layer write handler for a pipe.
 *
 * Pushes bytes into the ring buffer, under the pipe lock, until everything
 * is stored or the ring buffer accepts nothing more. This function does not
 * wait for free space. The pos argument is not used.
 *
 * @param device the pipe device (an rt_pipe_t viewed through its parent device)
 * @param pos    offset (ignored)
 * @param buffer source buffer
 * @param count  number of bytes to write
 *
 * @return number of bytes stored; 0 when count is 0 or when device is NULL
 *         (errno is then set to -EINVAL via rt_set_errno)
 */
rt_size_t rt_pipe_write(rt_device_t device, rt_off_t pos, const void *buffer, rt_size_t count)
{
    rt_pipe_t *self = (rt_pipe_t *)device;
    uint8_t *src;
    rt_size_t done = 0;

    if (device == RT_NULL)
    {
        rt_set_errno(-EINVAL);
        return 0;
    }

    if (count == 0)
    {
        return 0;
    }

    src = (uint8_t*)buffer;
    rt_mutex_take(&self->lock, -1);

    while (done < count)
    {
        int put = rt_ringbuffer_put(self->fifo, src + done, count - done);

        /* ring buffer is full: report what has been stored so far */
        if (put <= 0)
        {
            break;
        }

        done += put;
    }

    rt_mutex_release(&self->lock);

    return done;
}

/**
 * Device-layer control handler for a pipe.
 *
 * No control command is implemented; every request is accepted and ignored.
 *
 * @param dev  the pipe device (unused)
 * @param cmd  the command code (unused)
 * @param args the command argument (unused)
 *
 * @return always RT_EOK
 */
rt_err_t  rt_pipe_control(rt_device_t dev, int cmd, void *args)
{
    (void)dev;
    (void)cmd;
    (void)args;

    return RT_EOK;
}

B
Bernard Xiong 已提交
430
#ifdef RT_USING_DEVICE_OPS
/*
 * Device operation table used when RT_USING_DEVICE_OPS is enabled.
 * The initializer is positional, so the order must match struct rt_device_ops.
 * NOTE(review): the leading RT_NULL presumably fills the init slot, as the
 * non-ops configuration in rt_pipe_create sets dev->init to RT_NULL - confirm.
 */
static const struct rt_device_ops pipe_ops =
{
    RT_NULL,            /* not provided */
    rt_pipe_open,
    rt_pipe_close,
    rt_pipe_read,
    rt_pipe_write,
    rt_pipe_control,
};
#endif

442
rt_pipe_t *rt_pipe_create(const char *name, int bufsz)
443
{
B
bernard 已提交
444 445
    rt_pipe_t *pipe;
    rt_device_t dev;
446

447
    pipe = (rt_pipe_t *)rt_malloc(sizeof(rt_pipe_t));
B
bernard 已提交
448
    if (pipe == RT_NULL) return RT_NULL;
449

B
bernard 已提交
450
    rt_memset(pipe, 0, sizeof(rt_pipe_t));
451
    pipe->is_named = RT_TRUE; /* initialize as a named pipe */
B
bernard 已提交
452
    rt_mutex_init(&(pipe->lock), name, RT_IPC_FLAG_FIFO);
453 454
    rt_wqueue_init(&(pipe->reader_queue));
    rt_wqueue_init(&(pipe->writer_queue));
455

456 457 458
    RT_ASSERT(bufsz < 0xFFFF);
    pipe->bufsz = bufsz;

B
bernard 已提交
459 460
    dev = &(pipe->parent);
    dev->type = RT_Device_Class_Pipe;
B
Bernard Xiong 已提交
461 462 463
#ifdef RT_USING_DEVICE_OPS
    dev->ops         = &pipe_ops;
#else
464 465 466 467 468 469
    dev->init        = RT_NULL;
    dev->open        = rt_pipe_open;
    dev->read        = rt_pipe_read;
    dev->write       = rt_pipe_write;
    dev->close       = rt_pipe_close;
    dev->control     = rt_pipe_control;
B
Bernard Xiong 已提交
470
#endif
471 472 473

    dev->rx_indicate = RT_NULL;
    dev->tx_complete = RT_NULL;
474

B
bernard 已提交
475 476 477 478 479
    if (rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0)
    {
        rt_free(pipe);
        return RT_NULL;
    }
480
#ifdef RT_USING_POSIX
B
bernard 已提交
481
    dev->fops = (void*)&pipe_fops;
482
#endif
483

B
bernard 已提交
484
    return pipe;
485 486
}

B
bernard 已提交
487
int rt_pipe_delete(const char *name)
488
{
B
bernard 已提交
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
    int result = 0;
    rt_device_t device;

    device = rt_device_find(name);
    if (device)
    {
        if (device->type == RT_Device_Class_Pipe)
        {
            rt_pipe_t *pipe;

            if (device->ref_count != 0)
            {
                return -RT_EBUSY;
            }

            pipe = (rt_pipe_t *)device;

            rt_mutex_detach(&(pipe->lock));
            rt_device_unregister(device);

509
            /* close fifo ringbuffer */
510
            if (pipe->fifo)
511 512 513 514
            {
                rt_ringbuffer_destroy(pipe->fifo);
                pipe->fifo = RT_NULL;
            }
B
bernard 已提交
515 516 517 518
            rt_free(pipe);
        }
        else
        {
519
            result = -ENODEV;
B
bernard 已提交
520 521 522 523
        }
    }
    else
    {
524
        result = -ENODEV;
B
bernard 已提交
525 526 527
    }

    return result;
528 529
}

530
#ifdef RT_USING_POSIX
B
bernard 已提交
531
int pipe(int fildes[2])
532
{
B
bernard 已提交
533
    rt_pipe_t *pipe;
whj123999's avatar
whj123999 已提交
534 535
    char dname[RT_NAME_MAX];
    char dev_name[RT_NAME_MAX * 4];
B
bernard 已提交
536
    static int pipeno = 0;
537

B
bernard 已提交
538 539
    rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++);

540
    pipe = rt_pipe_create(dname, PIPE_BUFSZ);
541
    if (pipe == RT_NULL)
B
bernard 已提交
542 543 544
    {
        return -1;
    }
545

546
    pipe->is_named = RT_FALSE; /* unamed pipe */
B
bernard 已提交
547 548 549
    rt_snprintf(dev_name, sizeof(dev_name), "/dev/%s", dname);
    fildes[0] = open(dev_name, O_RDONLY, 0);
    if (fildes[0] < 0)
550
    {
B
bernard 已提交
551
        return -1;
552
    }
553

B
bernard 已提交
554 555 556
    fildes[1] = open(dev_name, O_WRONLY, 0);
    if (fildes[1] < 0)
    {
557
        close(fildes[0]);
B
bernard 已提交
558 559 560 561
        return -1;
    }

    return 0;
562 563
}

B
bernard 已提交
564
int mkfifo(const char *path, mode_t mode)
565
{
B
bernard 已提交
566
    rt_pipe_t *pipe;
567

568
    pipe = rt_pipe_create(path, PIPE_BUFSZ);
569
    if (pipe == RT_NULL)
B
bernard 已提交
570 571 572
    {
        return -1;
    }
573

B
bernard 已提交
574
    return 0;
575
}
B
bernard 已提交
576
#endif