PatchworkOS  3984a1d
A non-POSIX operating system.
Loading...
Searching...
No Matches
pipe.c
Go to the documentation of this file.
1#include <kernel/fs/file.h>
2#include <kernel/fs/path.h>
3#include <kernel/fs/vfs.h>
4#include <kernel/log/log.h>
5#include <kernel/log/panic.h>
6#include <kernel/mem/pmm.h>
9#include <kernel/sync/lock.h>
10#include <kernel/utils/fifo.h>
11
12#include <assert.h>
13#include <stdlib.h>
14#include <sys/io.h>
15#include <sys/math.h>
16
17/**
18 * @brief Pipes.
19 * @defgroup modules_ipc_pipe Pipes
20 * @ingroup modules_ipc
21 *
22 * Pipes are exposed in the `/dev/pipe` directory. Pipes are unidirectional communication channels that can be used for
23 * inter-process communication (IPC).
24 *
25 * ## Creating Pipes
26 *
27 * Pipes are created using the `/dev/pipe/new` file. Opening this file using `open()` will return one file descriptor
28 * that can be used for both reading and writing. To create a pipe with separate file descriptors for reading and
29 * writing, use `open2()` with the `/dev/pipe/new` file.
30 *
31 * ## Using Pipes
32 *
33 * Pipes can be read from and written to using the expected `read()` and `write()` system calls. Pipes are blocking and
34 * pollable, following expected POSIX semantics.
35 *
36 * @{
37 */
38
39typedef struct
40{
41 void* buffer;
47 // Note: These pointers are just for checking which end the current file is, they should not be referenced.
48 void* readEnd;
49 void* writeEnd;
50} pipe_t;
51
54
56{
57 pipe_t* data = malloc(sizeof(pipe_t));
58 if (data == NULL)
59 {
60 return ERR;
61 }
62 data->buffer = pmm_alloc();
63 if (data->buffer == NULL)
64 {
65 free(data);
66 return ERR;
67 }
68 fifo_init(&data->ring, data->buffer, PAGE_SIZE);
69 data->isReadClosed = false;
70 data->isWriteClosed = false;
71 wait_queue_init(&data->waitQueue);
72 lock_init(&data->lock);
73 data->readEnd = file;
74 data->writeEnd = file;
75
76 file->private = data;
77 return 0;
78}
79
81{
82 pipe_t* data = malloc(sizeof(pipe_t));
83 if (data == NULL)
84 {
85 return ERR;
86 }
87 data->buffer = pmm_alloc();
88 if (data->buffer == NULL)
89 {
90 free(data);
91 return ERR;
92 }
93 fifo_init(&data->ring, data->buffer, PAGE_SIZE);
94 data->isReadClosed = false;
95 data->isWriteClosed = false;
96 wait_queue_init(&data->waitQueue);
97 lock_init(&data->lock);
98
99 data->readEnd = files[PIPE_READ];
100 data->writeEnd = files[PIPE_WRITE];
101
102 files[0]->private = data;
103 files[1]->private = data;
104 return 0;
105}
106
107static void pipe_close(file_t* file)
108{
109 pipe_t* data = file->private;
110 lock_acquire(&data->lock);
111 if (data->readEnd == file)
112 {
113 data->isReadClosed = true;
114 }
115 if (data->writeEnd == file)
116 {
117 data->isWriteClosed = true;
118 }
119
120 wait_unblock(&data->waitQueue, WAIT_ALL, EOK);
121 if (data->isWriteClosed && data->isReadClosed)
122 {
123 lock_release(&data->lock);
124 wait_queue_deinit(&data->waitQueue);
125 pmm_free(data->buffer);
126 free(data);
127 return;
128 }
129
130 lock_release(&data->lock);
131}
132
133static uint64_t pipe_read(file_t* file, void* buffer, size_t count, size_t* offset)
134{
135 UNUSED(offset);
136
137 if (count == 0)
138 {
139 return 0;
140 }
141
142 pipe_t* data = file->private;
143 if (data->readEnd != file)
144 {
145 errno = ENOSYS;
146 return ERR;
147 }
148
149 if (count >= PAGE_SIZE)
150 {
151 errno = EINVAL;
152 return ERR;
153 }
154
155 LOCK_SCOPE(&data->lock);
156
157 if (fifo_bytes_readable(&data->ring) == 0)
158 {
159 if (file->mode & MODE_NONBLOCK)
160 {
161 errno = EAGAIN;
162 return ERR;
163 }
164
165 if (WAIT_BLOCK_LOCK(&data->waitQueue, &data->lock,
166 fifo_bytes_readable(&data->ring) != 0 || data->isWriteClosed) == ERR)
167 {
168 return ERR;
169 }
170 }
171
172 uint64_t result = fifo_read(&data->ring, buffer, count);
173 wait_unblock(&data->waitQueue, WAIT_ALL, EOK);
174 return result;
175}
176
177static uint64_t pipe_write(file_t* file, const void* buffer, size_t count, size_t* offset)
178{
179 UNUSED(offset);
180
181 pipe_t* data = file->private;
182 if (data->writeEnd != file)
183 {
184 errno = ENOSYS;
185 return ERR;
186 }
187
188 if (count >= PAGE_SIZE)
189 {
190 errno = EINVAL;
191 return ERR;
192 }
193
194 LOCK_SCOPE(&data->lock);
195
196 if (fifo_bytes_writeable(&data->ring) == 0)
197 {
198 if (file->mode & MODE_NONBLOCK)
199 {
200 errno = EAGAIN;
201 return ERR;
202 }
203
204 if (WAIT_BLOCK_LOCK(&data->waitQueue, &data->lock,
205 fifo_bytes_writeable(&data->ring) != 0 || data->isReadClosed) == ERR)
206 {
207 return ERR;
208 }
209 }
210
211 if (data->isReadClosed)
212 {
213 wait_unblock(&data->waitQueue, WAIT_ALL, EOK);
214 errno = EPIPE;
215 return ERR;
216 }
217
218 uint64_t result = fifo_write(&data->ring, buffer, count);
219 wait_unblock(&data->waitQueue, WAIT_ALL, EOK);
220 return result;
221}
222
224{
225 pipe_t* data = file->private;
226 LOCK_SCOPE(&data->lock);
227
228 if (fifo_bytes_readable(&data->ring) != 0 || data->isWriteClosed)
229 {
230 *revents |= POLLIN;
231 }
232 if (fifo_bytes_writeable(&data->ring) > 0 || data->isReadClosed)
233 {
234 *revents |= POLLOUT;
235 }
236 if ((file == data->readEnd && data->isWriteClosed) || (file == data->writeEnd && data->isReadClosed))
237 {
238 *revents |= POLLHUP;
239 }
240
241 return &data->waitQueue;
242}
243
245 .open = pipe_open,
246 .open2 = pipe_open2,
247 .close = pipe_close,
248 .read = pipe_read,
249 .write = pipe_write,
250 .poll = pipe_poll,
251};
252
254{
255 pipeDir = devfs_dir_new(NULL, "pipe", NULL, NULL);
256 if (pipeDir == NULL)
257 {
258 LOG_ERR("failed to initialize pipe directory");
259 return ERR;
260 }
261
263 if (newFile == NULL)
264 {
265 UNREF(pipeDir);
266 LOG_ERR("failed to initialize pipe new file");
267 return ERR;
268 }
269
270 return 0;
271}
272
273void pipe_deinit(void)
274{
275 UNREF(newFile);
276 newFile = NULL;
277 UNREF(pipeDir);
278 pipeDir = NULL;
279}
280
281/** @} */
282
284{
285 switch (event->type)
286 {
288 if (pipe_init() == ERR)
289 {
290 return ERR;
291 }
292 break;
294 pipe_deinit();
295 break;
296 default:
297 break;
298 }
299
300 return 0;
301}
302
303MODULE_INFO("Pipes", "Kai Norberg", "Implements pipes for inter-process communication", OS_VERSION, "MIT",
304 "BOOT_ALWAYS");
int64_t y
Definition main.c:153
static fd_t data
Definition dwm.c:21
dentry_t * devfs_dir_new(dentry_t *parent, const char *name, const inode_ops_t *inodeOps, void *private)
Create a new directory inside a mounted devfs instance.
Definition devfs.c:86
dentry_t * devfs_file_new(dentry_t *parent, const char *name, const inode_ops_t *inodeOps, const file_ops_t *fileOps, void *private)
Create a new file inside a mounted devfs instance.
Definition devfs.c:125
@ MODE_NONBLOCK
Definition path.h:83
#define LOG_ERR(format,...)
Definition log.h:93
void pmm_free(void *address)
Frees a single physical page.
Definition pmm.c:220
void * pmm_alloc(void)
Allocates a single physical page.
Definition pmm.c:171
#define MODULE_INFO(_name, _author, _description, _version, _licence, _deviceTypes)
Macro to define module information.
Definition module.h:200
@ MODULE_EVENT_LOAD
Definition module.h:239
@ MODULE_EVENT_UNLOAD
Definition module.h:245
uint64_t wait_unblock(wait_queue_t *queue, uint64_t amount, errno_t err)
Unblock threads waiting on a wait queue.
Definition wait.c:296
void wait_queue_deinit(wait_queue_t *queue)
Deinitialize wait queue.
Definition wait.c:44
#define WAIT_ALL
Used to indicate that the wait should unblock all waiting threads.
Definition wait.h:40
void wait_queue_init(wait_queue_t *queue)
Initialize wait queue.
Definition wait.c:38
#define WAIT_BLOCK_LOCK(queue, lock, condition)
Blocks until the condition is true, condition will be tested on every wakeup. Will release the lock b...
Definition wait.h:106
static void lock_init(lock_t *lock)
Initializes a lock.
Definition lock.h:87
#define LOCK_SCOPE(lock)
Acquires a lock for the reminder of the current scope.
Definition lock.h:58
static void lock_release(lock_t *lock)
Releases a lock.
Definition lock.h:184
static void lock_acquire(lock_t *lock)
Acquires a lock, blocking until it is available.
Definition lock.h:104
void fifo_init(fifo_t *fifo, uint8_t *buffer, size_t size)
Initialize a fifo buffer.
Definition fifo.c:7
size_t fifo_bytes_writeable(const fifo_t *fifo)
Return the number of bytes available for writing in a fifo buffer.
Definition fifo.c:31
size_t fifo_bytes_readable(const fifo_t *fifo)
Return the number of bytes available for reading in a fifo buffer.
Definition fifo.c:21
size_t fifo_write(fifo_t *fifo, const void *buffer, size_t count)
Write data to the fifo buffer.
Definition fifo.c:73
size_t fifo_read(fifo_t *fifo, void *buffer, size_t count)
Read data from a fifo buffer at a specific offset.
Definition fifo.c:41
#define UNREF(ptr)
Decrement reference count.
Definition ref.h:95
#define EINVAL
Invalid argument.
Definition errno.h:142
#define ENOSYS
Function not implemented.
Definition errno.h:222
#define EPIPE
Broken pipe.
Definition errno.h:192
#define errno
Error number variable.
Definition errno.h:27
#define EOK
No error.
Definition errno.h:32
#define EAGAIN
Try again.
Definition errno.h:87
#define UNUSED(x)
Mark a variable as unused.
Definition defs.h:100
#define PIPE_READ
Pipe read end.
Definition io.h:45
#define PIPE_WRITE
Pipe write end.
Definition io.h:54
poll_events_t
Poll events type.
Definition io.h:285
@ POLLIN
File descriptor is ready to read.
Definition io.h:287
@ POLLHUP
Stream socket peer closed connection, or shut down writing of connection.
Definition io.h:290
@ POLLOUT
File descriptor is ready to write.
Definition io.h:288
#define PAGE_SIZE
The size of a memory page in bytes.
Definition proc.h:103
#define NULL
Pointer error value.
Definition NULL.h:23
#define ERR
Integer error value.
Definition ERR.h:17
static uint64_t pipe_open(file_t *file)
Definition pipe.c:55
static dentry_t * pipeDir
Definition pipe.c:52
static void pipe_close(file_t *file)
Definition pipe.c:107
uint64_t pipe_init(void)
Definition pipe.c:253
static uint64_t pipe_read(file_t *file, void *buffer, size_t count, size_t *offset)
Definition pipe.c:133
static dentry_t * newFile
Definition pipe.c:53
static uint64_t pipe_write(file_t *file, const void *buffer, size_t count, size_t *offset)
Definition pipe.c:177
void pipe_deinit(void)
Definition pipe.c:273
static uint64_t pipe_open2(file_t *files[2])
Definition pipe.c:80
static file_ops_t fileOps
Definition pipe.c:244
static wait_queue_t * pipe_poll(file_t *file, poll_events_t *revents)
Definition pipe.c:223
static uint64_t offset
Definition screen.c:19
static list_t files
Definition file.c:9
EFI_PHYSICAL_ADDRESS buffer
Definition mem.c:15
uint64_t _module_procedure(const module_event_t *event)
Definition pipe.c:283
static atomic_long count
Definition main.c:11
__UINT64_TYPE__ uint64_t
Definition stdint.h:17
_PUBLIC void * malloc(size_t size)
Definition malloc.c:5
_PUBLIC void free(void *ptr)
Definition free.c:11
Directory entry structure.
Definition dentry.h:153
FIFO Buffer.
Definition fifo.h:20
File operations structure.
Definition file.h:54
uint64_t(* open)(file_t *file)
Definition file.h:55
File structure.
Definition file.h:39
mode_t mode
Definition file.h:42
void * private
Definition file.h:46
A simple ticket lock implementation.
Definition lock.h:44
module_event_type_t type
Definition module.h:268
Definition pipe.c:40
void * readEnd
Definition pipe.c:48
fifo_t ring
Definition pipe.c:42
bool isWriteClosed
Definition pipe.c:44
void * writeEnd
Definition pipe.c:49
wait_queue_t waitQueue
Definition pipe.c:45
bool isReadClosed
Definition pipe.c:43
lock_t lock
Definition pipe.c:46
void * buffer
Definition pipe.c:41
The primitive that threads block on.
Definition wait.h:182