PatchworkOS  da8a090
A non-POSIX operating system.
Loading...
Searching...
No Matches
wait.c
Go to the documentation of this file.
1#include <kernel/cpu/irq.h>
2#include <kernel/sched/wait.h>
3
4#include <kernel/cpu/cpu.h>
5#include <kernel/log/log.h>
6#include <kernel/log/panic.h>
10#include <kernel/sched/timer.h>
11#include <kernel/sync/lock.h>
12
13#include <assert.h>
14#include <errno.h>
15#include <stdatomic.h>
16#include <stdlib.h>
17#include <sys/list.h>
18
19static void wait_remove_wait_entries(thread_t* thread, errno_t err)
20{
21 assert(atomic_load(&thread->state) == THREAD_UNBLOCKING);
22
23 thread->wait.err = err;
24
25 wait_entry_t* temp;
26 wait_entry_t* entry;
27 LIST_FOR_EACH_SAFE(entry, temp, &thread->wait.entries, threadEntry)
28 {
29 lock_acquire(&entry->queue->lock);
30 list_remove(&entry->queue->entries, &entry->queueEntry);
31 lock_release(&entry->queue->lock);
32
33 list_remove(&entry->thread->wait.entries, &entry->threadEntry); // Belongs to thread, no lock needed.
34 free(entry);
35 }
36}
37
39{
40 lock_init(&queue->lock);
41 list_init(&queue->entries);
42}
43
45{
46 LOCK_SCOPE(&queue->lock);
47
48 if (!list_is_empty(&queue->entries))
49 {
50 panic(NULL, "Wait queue with pending threads freed");
51 }
52}
53
55{
56 list_entry_init(&client->entry);
57 list_init(&client->entries);
58 client->err = EOK;
59 client->deadline = 0;
60 client->owner = NULL;
61}
62
63void wait_init(wait_t* wait)
64{
66 lock_init(&wait->lock);
67}
68
70{
71 (void)frame; // Unused
72
73 LOCK_SCOPE(&self->wait.lock);
74
75 while (1)
76 {
77 thread_t* thread = CONTAINER_OF_SAFE(list_first(&self->wait.blockedThreads), thread_t, wait.entry);
78 if (thread == NULL)
79 {
80 return;
81 }
82
84 if (thread->wait.deadline > uptime)
85 {
86 timer_set(uptime, thread->wait.deadline);
87 return;
88 }
89
90 list_remove(&self->wait.blockedThreads, &thread->wait.entry);
91
93 if (!atomic_compare_exchange_strong(&thread->state, &expected, THREAD_UNBLOCKING)) // Already unblocking.
94 {
95 continue;
96 }
97
99 sched_submit(thread);
100 }
101}
102
104{
105 if (waitQueues == NULL || amount == 0)
106 {
107 errno = EINVAL;
108 return ERR;
109 }
110
111 // Disable interrupts and retrieve thread.
112 thread_t* thread = cpu_get()->sched.runThread;
113
114 assert(thread != NULL);
115
116 for (uint64_t i = 0; i < amount; i++)
117 {
118 lock_acquire(&waitQueues[i]->lock);
119 }
120
121 for (uint64_t i = 0; i < amount; i++)
122 {
123 wait_entry_t* entry = malloc(sizeof(wait_entry_t));
124 if (entry == NULL)
125 {
126 while (1)
127 {
128 wait_entry_t* other =
130 if (other == NULL)
131 {
132 break;
133 }
134 free(other);
135 }
136
137 for (uint64_t j = 0; j < amount; j++)
138 {
139 lock_release(&waitQueues[j]->lock);
140 }
141
142 cpu_put(); // Interrupts enable.
143 errno = ENOMEM;
144 return ERR;
145 }
148 entry->queue = waitQueues[i];
149 entry->thread = thread;
150
151 list_push_back(&thread->wait.entries, &entry->threadEntry);
152 list_push_back(&entry->queue->entries, &entry->queueEntry);
153 }
154
155 thread->wait.err = EOK;
156 thread->wait.deadline = CLOCKS_DEADLINE(timeout, sys_time_uptime());
157 thread->wait.owner = NULL;
158
159 for (uint64_t i = 0; i < amount; i++)
160 {
161 lock_release(&waitQueues[i]->lock);
162 }
163
164 thread_state_t expected = THREAD_ACTIVE;
165 if (!atomic_compare_exchange_strong(&thread->state, &expected, THREAD_PRE_BLOCK))
166 {
167 if (expected != THREAD_UNBLOCKING)
168 {
169 panic(NULL, "Thread in invalid state in wait_block_prepare() state=%d", expected);
170 }
171 // We wait until the wait_block_commit() or wait_block_cancel() to actually do the early unblock.
172 }
173
174 // Return without enabling interrupts, they will be enabled in wait_block_commit() or wait_block_cancel().
175 return 0;
176}
177
179{
181
183 assert(thread != NULL);
184
185 thread_state_t state = atomic_exchange(&thread->state, THREAD_UNBLOCKING);
186
187 // State might already be unblocking if the thread unblocked prematurely.
188 assert(state == THREAD_PRE_BLOCK || state == THREAD_UNBLOCKING);
189
191
192 thread_state_t newState = atomic_exchange(&thread->state, THREAD_ACTIVE);
193 assert(newState == THREAD_UNBLOCKING); // Make sure state did not change.
194
195 cpu_put(); // Release cpu from wait_block_prepare().
197}
198
200{
202
204
205 thread_state_t state = atomic_load(&thread->state);
206 switch (state)
207 {
210 atomic_store(&thread->state, THREAD_ACTIVE);
211 cpu_put(); // Release cpu from wait_block_prepare().
212 break;
213 case THREAD_PRE_BLOCK:
214 cpu_put(); // Release cpu from wait_block_prepare().
215 ipi_invoke();
216 break;
217 default:
218 panic(NULL, "Invalid thread state %d in wait_block_commit()", state);
219 }
220
222
223 if (thread->wait.err != EOK)
224 {
225 errno = thread->wait.err;
226 return ERR;
227 }
228 return 0;
229}
230
232{
233 (void)frame; // Unused
234
235 thread->wait.owner = &self->wait;
236 LOCK_SCOPE(&self->wait.lock);
237
238 thread_t* lastThread = (CONTAINER_OF(list_last(&self->wait.blockedThreads), thread_t, wait.entry));
239
240 // Sort blocked threads by deadline
241 if (thread->wait.deadline == CLOCKS_NEVER || list_is_empty(&self->wait.blockedThreads) ||
242 lastThread->wait.deadline <= thread->wait.deadline)
243 {
244 list_push_back(&self->wait.blockedThreads, &thread->wait.entry);
245 }
246 else
247 {
248 thread_t* other;
249 LIST_FOR_EACH(other, &self->wait.blockedThreads, wait.entry)
250 {
251 if (other->wait.deadline > thread->wait.deadline)
252 {
253 list_prepend(&self->wait.blockedThreads, &other->wait.entry, &thread->wait.entry);
254 break;
255 }
256 }
257 }
258
260 if (!atomic_compare_exchange_strong(&thread->state, &expected, THREAD_BLOCKED)) // Prematurely unblocked
261 {
262 list_remove(&self->wait.blockedThreads, &thread->wait.entry);
264 atomic_store(&thread->state, THREAD_ACTIVE);
265 return false;
266 }
267
268 if (thread_is_note_pending(thread))
269 {
271 if (atomic_compare_exchange_strong(&thread->state, &expected, THREAD_UNBLOCKING))
272 {
273 list_remove(&self->wait.blockedThreads, &thread->wait.entry);
275 atomic_store(&thread->state, THREAD_ACTIVE);
276 return false;
277 }
278 }
279
280 timer_set(uptime, thread->wait.deadline);
281 return true;
282}
283
285{
286 assert(atomic_load(&thread->state) == THREAD_UNBLOCKING);
287
288 lock_acquire(&thread->wait.owner->lock);
289 list_remove(&thread->wait.owner->blockedThreads, &thread->wait.entry);
290 wait_remove_wait_entries(thread, err);
291 lock_release(&thread->wait.owner->lock);
292
293 sched_submit(thread);
294}
295
297{
298 uint64_t amountUnblocked = 0;
299
300 const uint64_t threadsPerBatch = 64;
301 while (1)
302 {
303 // For consistent lock ordering we first remove from the wait queue, release the wait queues lock and then
304 // acquire the threads cpu lock. Such that every time we acquire the locks its, cpu lock -> wait queue lock.
305
306 thread_t* threads[threadsPerBatch];
307 uint64_t toUnblock = amount < threadsPerBatch ? amount : threadsPerBatch;
308
309 lock_acquire(&queue->lock);
310
311 wait_entry_t* temp;
312 wait_entry_t* waitEntry;
313 uint64_t collected = 0;
314 LIST_FOR_EACH_SAFE(waitEntry, temp, &queue->entries, queueEntry)
315 {
316 if (collected == toUnblock)
317 {
318 break;
319 }
320
321 thread_t* thread = waitEntry->thread;
322
323 if (atomic_exchange(&thread->state, THREAD_UNBLOCKING) == THREAD_BLOCKED)
324 {
325 list_remove(&queue->entries, &waitEntry->queueEntry);
326 list_remove(&thread->wait.entries, &waitEntry->threadEntry);
327 free(waitEntry);
328 threads[collected] = thread;
329 collected++;
330 }
331 }
332
333 lock_release(&queue->lock);
334
335 if (collected == 0)
336 {
337 break;
338 }
339
340 for (uint64_t i = 0; i < collected; i++)
341 {
342 lock_acquire(&threads[i]->wait.owner->lock);
343 list_remove(&threads[i]->wait.owner->blockedThreads, &threads[i]->wait.entry);
344 wait_remove_wait_entries(threads[i], err);
345 lock_release(&threads[i]->wait.owner->lock);
346
347 sched_submit(threads[i]);
348 amountUnblocked++;
349 }
350 }
351
352 return amountUnblocked;
353}
#define assert(expression)
Definition assert.h:29
#define CLOCKS_NEVER
Definition clock_t.h:16
int errno_t
Definition errno_t.h:4
void ipi_invoke(void)
Invoke an IPI interrupt on the current CPU.
Definition ipi.c:267
static cpu_t * cpu_get_unsafe(void)
Gets the current CPU structure without disabling interrupts.
Definition cpu.h:289
static cpu_t * cpu_get(void)
Gets the current CPU structure.
Definition cpu.h:263
static void cpu_put(void)
Releases the current CPU structure.
Definition cpu.h:277
NORETURN void panic(const interrupt_frame_t *frame, const char *format,...)
Panic the kernel, printing a message and halting.
Definition panic.c:266
bool thread_is_note_pending(thread_t *thread)
Check if a thread has a note pending.
Definition thread.c:156
thread_state_t
Thread state enum.
Definition thread.h:30
@ THREAD_UNBLOCKING
Has started unblocking, used to prevent the same thread being unblocked multiple times.
Definition thread.h:35
@ THREAD_PRE_BLOCK
Has started the process of blocking but has not yet been given to an owner CPU.
Definition thread.h:33
@ THREAD_BLOCKED
Is blocking and waiting in one or multiple wait queues.
Definition thread.h:34
@ THREAD_ACTIVE
Is either running or ready to run.
Definition thread.h:32
uint64_t wait_block_prepare(wait_queue_t **waitQueues, uint64_t amount, clock_t timeout)
Prepare to block the currently running thread.
Definition wait.c:103
uint64_t wait_unblock(wait_queue_t *queue, uint64_t amount, errno_t err)
Unblock threads waiting on a wait queue.
Definition wait.c:296
uint64_t wait_block_commit(void)
Block the currently running thread.
Definition wait.c:199
void wait_init(wait_t *wait)
Initialize an instance of the waiting subsystem.
Definition wait.c:63
bool wait_block_finalize(interrupt_frame_t *frame, cpu_t *self, thread_t *thread, clock_t uptime)
Finalize blocking of a thread.
Definition wait.c:231
void wait_block_cancel(void)
Cancels blocking of the currently running thread.
Definition wait.c:178
void wait_queue_deinit(wait_queue_t *queue)
Deinitialize wait queue.
Definition wait.c:44
void wait_check_timeouts(interrupt_frame_t *frame, cpu_t *self)
Check for timeouts and unblock threads as needed.
Definition wait.c:69
void wait_queue_init(wait_queue_t *queue)
Initialize wait queue.
Definition wait.c:38
void wait_client_init(wait_client_t *client)
Initialize a threads wait client.
Definition wait.c:54
void wait_unblock_thread(thread_t *thread, errno_t err)
Unblock a specific thread.
Definition wait.c:284
void sched_submit(thread_t *thread)
Submits a thread to the scheduler.
Definition sched.c:375
static void lock_init(lock_t *lock)
Initializes a lock.
Definition lock.h:86
#define LOCK_SCOPE(lock)
Acquires a lock for the remainder of the current scope.
Definition lock.h:57
static void lock_release(lock_t *lock)
Releases a lock.
Definition lock.h:146
static void lock_acquire(lock_t *lock)
Acquires a lock, blocking until it is available.
Definition lock.h:103
clock_t sys_time_uptime(void)
Time since boot.
Definition sys_time.c:99
void timer_set(clock_t uptime, clock_t deadline)
Schedule a one-shot timer interrupt on the current CPU.
Definition timer.c:134
#define EINVAL
Invalid argument.
Definition errno.h:142
#define EINTR
Interrupted system call.
Definition errno.h:52
#define ETIMEDOUT
Connection timed out.
Definition errno.h:577
#define ENOMEM
Out of memory.
Definition errno.h:92
#define errno
Error number variable.
Definition errno.h:27
#define EOK
No error.
Definition errno.h:32
#define LIST_FOR_EACH(elem, list, member)
Iterates over a list.
Definition list.h:63
static list_entry_t * list_first(list_t *list)
Gets the first entry in the list without removing it.
Definition list.h:417
static list_entry_t * list_pop_first(list_t *list)
Pops the first entry from the list.
Definition list.h:375
static list_entry_t * list_last(list_t *list)
Gets the last entry in the list without removing it.
Definition list.h:435
static void list_push_back(list_t *list, list_entry_t *entry)
Pushes an entry to the end of the list.
Definition list.h:343
static void list_prepend(list_t *list, list_entry_t *head, list_entry_t *entry)
Prepends an entry to the list.
Definition list.h:303
#define LIST_FOR_EACH_SAFE(elem, temp, list, member)
Safely iterates over a list, allowing for element removal during iteration.
Definition list.h:79
static void list_remove(list_t *list, list_entry_t *entry)
Removes a list entry from its current list.
Definition list.h:315
static bool list_is_empty(list_t *list)
Checks if a list is empty.
Definition list.h:227
static void list_entry_init(list_entry_t *entry)
Initializes a list entry.
Definition list.h:182
static void list_init(list_t *list)
Initializes a list.
Definition list.h:196
clock_t uptime(void)
System call for retrieving the time since boot.
Definition uptime.c:6
#define NULL
Pointer error value.
Definition NULL.h:23
#define ERR
Integer error value.
Definition ERR.h:17
#define CONTAINER_OF(ptr, type, member)
Container of macro.
#define CONTAINER_OF_SAFE(ptr, type, member)
Safe container of macro.
#define CLOCKS_DEADLINE(timeout, uptime)
Safely calculate deadline from timeout.
Definition clock_t.h:45
__UINT64_TYPE__ clock_t
A nanosecond time.
Definition clock_t.h:13
static lock_t lock
Definition io.c:13
#define RFLAGS_INTERRUPT_ENABLE
Definition regs.h:32
static uint64_t rflags_read()
Definition regs.h:78
#define atomic_store(object, desired)
Definition stdatomic.h:289
#define atomic_compare_exchange_strong(object, expected, desired)
Definition stdatomic.h:278
#define atomic_exchange(object, desired)
Definition stdatomic.h:282
#define atomic_load(object)
Definition stdatomic.h:288
__UINT64_TYPE__ uint64_t
Definition stdint.h:17
_PUBLIC void * malloc(size_t size)
Definition malloc.c:5
_PUBLIC void free(void *ptr)
Definition free.c:11
CPU structure.
Definition cpu.h:122
wait_t wait
Definition cpu.h:133
sched_t sched
Definition cpu.h:134
Trap Frame Structure.
Definition interrupt.h:143
thread_t *volatile runThread
The currently running thread on this CPU.
Definition sched.h:400
Thread of execution structure.
Definition thread.h:63
wait_client_t wait
Definition thread.h:78
Represents a thread in the waiting subsystem.
Definition wait.h:195
list_entry_t entry
Definition wait.h:196
list_t entries
List of wait entries, one for each wait queue the thread is waiting on.
Definition wait.h:197
wait_t * owner
The wait cpu context of the cpu the thread is blocked on.
Definition wait.h:200
clock_t deadline
Deadline for timeout, CLOCKS_NEVER for no timeout.
Definition wait.h:199
errno_t err
Error number set when unblocking the thread, EOK for no error.
Definition wait.h:198
Represents a thread waiting on a wait queue.
Definition wait.h:170
thread_t * thread
The thread that is waiting.
Definition wait.h:173
wait_queue_t * queue
The wait queue the thread is waiting on.
Definition wait.h:174
list_entry_t queueEntry
Used in wait_queue_t->entries.
Definition wait.h:171
list_entry_t threadEntry
Used in wait_client_t->entries.
Definition wait.h:172
The primitive that threads block on.
Definition wait.h:182
lock_t lock
Definition wait.h:183
list_t entries
List of wait entries for threads waiting on this queue.
Definition wait.h:184
Represents one instance of the waiting subsystem for a CPU.
Definition wait.h:208
lock_t lock
Definition wait.h:210
list_t blockedThreads
List of blocked threads, sorted by deadline.
Definition wait.h:209
static void wait_remove_wait_entries(thread_t *thread, errno_t err)
Definition wait.c:19