PatchworkOS
wait.c
#include <kernel/sched/wait.h>

#include <kernel/cpu/cpu.h>
#include <kernel/cpu/smp.h>
#include <kernel/log/panic.h>
#include <kernel/sched/sched.h>
#include <kernel/sched/thread.h>
#include <kernel/sched/timer.h>
#include <kernel/sync/lock.h>

#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <sys/list.h>

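// Overview: a thread blocks in two phases. wait_block_setup() runs with
// interrupts disabled, links a wait_entry_t into every wait queue and moves the
// thread from THREAD_RUNNING to THREAD_PRE_BLOCK; wait_block_commit() then hands
// the thread to its owner CPU, whose scheduler completes the block through
// wait_block_finalize(), moving it to THREAD_BLOCKED. Wakers (wait_unblock(),
// wait_unblock_thread() and the timer handler) all funnel through
// THREAD_UNBLOCKING, which guarantees every thread is unblocked exactly once
// before sched_push() hands it back to the scheduler.
//
// Every wait_entry_t is linked into two lists at once: into its wait queue via
// queueEntry and into the owning thread's wait.entries via threadEntry, so
// teardown always unlinks both sides before the entry is freed.
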
// Detaches a thread from every wait queue it is waiting on and records the
// wakeup error. The caller must already have moved the thread to
// THREAD_UNBLOCKING, so nothing else can race on the thread's entry list.
static void wait_remove_wait_entries(thread_t* thread, errno_t err)
{
    assert(atomic_load(&thread->state) == THREAD_UNBLOCKING);

    thread->wait.err = err;

    wait_entry_t* temp;
    wait_entry_t* entry;
    LIST_FOR_EACH_SAFE(entry, temp, &thread->wait.entries, threadEntry)
    {
        lock_acquire(&entry->waitQueue->lock);
        list_remove(&entry->waitQueue->entries, &entry->queueEntry);
        lock_release(&entry->waitQueue->lock);

        list_remove(&entry->thread->wait.entries, &entry->threadEntry); // Belongs to the thread, no lock needed.
        free(entry);
    }
}

static void wait_timer_handler(interrupt_frame_t* frame, cpu_t* self)
{
    (void)frame; // Unused

    LOCK_SCOPE(&self->wait.lock);

    while (1)
    {
        thread_t* thread = CONTAINER_OF_SAFE(list_first(&self->wait.blockedThreads), thread_t, entry);
        if (thread == NULL)
        {
            return;
        }

        clock_t uptime = timer_uptime();
        if (thread->wait.deadline > uptime)
        {
            timer_one_shot(self, uptime, thread->wait.deadline - uptime);
            return;
        }

        list_remove(&self->wait.blockedThreads, &thread->entry);

        // Already unblocking.
        thread_state_t expected = THREAD_BLOCKED;
        if (!atomic_compare_exchange_strong(&thread->state, &expected, THREAD_UNBLOCKING))
        {
            continue;
        }

        wait_remove_wait_entries(thread, ETIMEDOUT); // The deadline has passed, wake with a timeout error.
        sched_push(thread, thread->wait.cpu->cpu);
    }
}

void wait_queue_init(wait_queue_t* waitQueue)
{
    lock_init(&waitQueue->lock);
    list_init(&waitQueue->entries);
}

void wait_queue_deinit(wait_queue_t* waitQueue)
{
    LOCK_SCOPE(&waitQueue->lock);

    if (!list_is_empty(&waitQueue->entries))
    {
        panic(NULL, "Wait queue with pending threads freed");
    }
}

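// Example (hypothetical, not part of this file): a wait queue is typically
// embedded in the object whose state threads wait on, with init/deinit tied to
// that object's lifetime. event_t is an illustrative type, not a PatchworkOS API.
//
// typedef struct
// {
//     _Atomic bool pending;
//     wait_queue_t waitQueue;
// } event_t;
//
// void event_init(event_t* event)
// {
//     atomic_store(&event->pending, false);
//     wait_queue_init(&event->waitQueue);
// }
//
// void event_deinit(event_t* event)
// {
//     wait_queue_deinit(&event->waitQueue); // Panics if threads are still waiting.
// }
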
void wait_thread_ctx_init(wait_thread_ctx_t* wait)
{
    list_init(&wait->entries);
    wait->err = EOK;
    wait->deadline = 0;
    wait->cpu = NULL;
}

void wait_cpu_ctx_init(wait_cpu_ctx_t* wait, cpu_t* self)
{
    list_init(&wait->blockedThreads);
    wait->cpu = self;
    lock_init(&wait->lock);
    timer_subscribe(&self->timer, wait_timer_handler); // Wake timed-out threads from the timer interrupt.
}

bool wait_block_finalize(interrupt_frame_t* frame, cpu_t* self, thread_t* thread, clock_t uptime)
{
    (void)frame; // Unused

    thread->wait.cpu = &self->wait;
    LOCK_SCOPE(&self->wait.lock);

    thread_t* lastThread = CONTAINER_OF(list_last(&self->wait.blockedThreads), thread_t, entry);

    // Keep blocked threads sorted by deadline.
    if (thread->wait.deadline == CLOCKS_NEVER || list_is_empty(&self->wait.blockedThreads) ||
        lastThread->wait.deadline <= thread->wait.deadline)
    {
        list_push(&self->wait.blockedThreads, &thread->entry);
    }
    else
    {
        thread_t* other;
        LIST_FOR_EACH(other, &self->wait.blockedThreads, entry)
        {
            if (other->wait.deadline > thread->wait.deadline)
            {
                list_prepend(&self->wait.blockedThreads, &other->entry, &thread->entry);
                break;
            }
        }
    }

    thread_state_t expected = THREAD_PRE_BLOCK;
    if (!atomic_compare_exchange_strong(&thread->state, &expected, THREAD_BLOCKED)) // Prematurely unblocked.
    {
        list_remove(&self->wait.blockedThreads, &thread->entry);
        wait_remove_wait_entries(thread, EOK);
        atomic_store(&thread->state, THREAD_RUNNING);
        return false;
    }

    if (thread_is_note_pending(thread))
    {
        thread_state_t expected = THREAD_BLOCKED;
        if (atomic_compare_exchange_strong(&thread->state, &expected, THREAD_UNBLOCKING))
        {
            list_remove(&self->wait.blockedThreads, &thread->entry);
            wait_remove_wait_entries(thread, EINTR); // A pending note interrupts the wait.
            atomic_store(&thread->state, THREAD_RUNNING);
            return false;
        }
    }

    timer_one_shot(self, uptime, thread->wait.deadline > uptime ? thread->wait.deadline - uptime : 0);
    return true;
}

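// Because blockedThreads is kept sorted by deadline (CLOCKS_NEVER sorts last),
// wait_timer_handler() only ever has to inspect the head of the list and can
// re-arm the one-shot timer for exactly the next deadline to expire.
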
void wait_unblock_thread(thread_t* thread, errno_t err)
{
    assert(atomic_load(&thread->state) == THREAD_UNBLOCKING);

    lock_acquire(&thread->wait.cpu->lock);
    list_remove(&thread->wait.cpu->blockedThreads, &thread->entry);
    wait_remove_wait_entries(thread, err);
    lock_release(&thread->wait.cpu->lock);

    sched_push(thread, thread->wait.cpu->cpu);
}

uint64_t wait_unblock(wait_queue_t* waitQueue, uint64_t amount, errno_t err)
{
    uint64_t amountUnblocked = 0;

    const uint64_t threadsPerBatch = 64;
    while (1)
    {
        // For consistent lock ordering we first remove from the wait queue, release the wait queue's lock and
        // then acquire the thread's CPU lock, such that whenever both locks are held the order is always
        // CPU lock -> wait queue lock.

        thread_t* threads[threadsPerBatch];
        uint64_t remaining = amount - amountUnblocked;
        uint64_t toUnblock = remaining < threadsPerBatch ? remaining : threadsPerBatch; // Cap the total at amount.

        lock_acquire(&waitQueue->lock);

        wait_entry_t* temp;
        wait_entry_t* waitEntry;
        uint64_t collected = 0;
        LIST_FOR_EACH_SAFE(waitEntry, temp, &waitQueue->entries, queueEntry)
        {
            if (collected == toUnblock)
            {
                break;
            }

            thread_t* thread = waitEntry->thread;

            if (atomic_exchange(&thread->state, THREAD_UNBLOCKING) == THREAD_BLOCKED)
            {
                list_remove(&waitQueue->entries, &waitEntry->queueEntry);
                list_remove(&thread->wait.entries, &waitEntry->threadEntry);
                free(waitEntry);
                threads[collected] = thread;
                collected++;
            }
        }

        lock_release(&waitQueue->lock);

        if (collected == 0)
        {
            break;
        }

        for (uint64_t i = 0; i < collected; i++)
        {
            lock_acquire(&threads[i]->wait.cpu->lock);
            list_remove(&threads[i]->wait.cpu->blockedThreads, &threads[i]->entry);
            wait_remove_wait_entries(threads[i], err);
            lock_release(&threads[i]->wait.cpu->lock);

            sched_push(threads[i], threads[i]->wait.cpu->cpu);
            amountUnblocked++;
        }
    }

    return amountUnblocked;
}

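// Example (hypothetical, continuing the event_t sketch above): a waker first
// publishes the condition and then wakes the waiters; an amount of UINT64_MAX
// gives wake-all semantics, an amount of 1 gives wake-one.
//
// void event_signal(event_t* event)
// {
//     atomic_store(&event->pending, true);              // Publish the condition first...
//     wait_unblock(&event->waitQueue, UINT64_MAX, EOK); // ...then wake every waiter, without an error.
// }
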
uint64_t wait_block_setup(wait_queue_t** waitQueues, uint64_t amount, clock_t timeout)
{
    if (waitQueues == NULL || amount == 0)
    {
        errno = EINVAL;
        return ERR;
    }

    // Disables interrupts and retrieves the current thread.
    thread_t* thread = smp_self()->sched.runThread;
    assert(thread != NULL);

    for (uint64_t i = 0; i < amount; i++)
    {
        lock_acquire(&waitQueues[i]->lock);
    }

    for (uint64_t i = 0; i < amount; i++)
    {
        wait_entry_t* entry = malloc(sizeof(wait_entry_t));
        if (entry == NULL)
        {
            while (1)
            {
                wait_entry_t* other = CONTAINER_OF_SAFE(list_pop(&thread->wait.entries), wait_entry_t, threadEntry);
                if (other == NULL)
                {
                    break;
                }
                list_remove(&other->waitQueue->entries, &other->queueEntry); // The queue locks are still held here.
                free(other);
            }

            for (uint64_t j = 0; j < amount; j++)
            {
                lock_release(&waitQueues[j]->lock);
            }

            smp_put(); // Re-enables interrupts.
            return ERR;
        }
        list_entry_init(&entry->queueEntry);
        list_entry_init(&entry->threadEntry);
        entry->waitQueue = waitQueues[i];
        entry->thread = thread;

        list_push(&thread->wait.entries, &entry->threadEntry);
        list_push(&entry->waitQueue->entries, &entry->queueEntry);
    }

    thread->wait.err = EOK;
    thread->wait.deadline = timeout == CLOCKS_NEVER ? CLOCKS_NEVER : timer_uptime() + timeout;
    thread->wait.cpu = NULL;

    for (uint64_t i = 0; i < amount; i++)
    {
        lock_release(&waitQueues[i]->lock);
    }

    thread_state_t expected = THREAD_RUNNING;
    if (!atomic_compare_exchange_strong(&thread->state, &expected, THREAD_PRE_BLOCK))
    {
        if (expected != THREAD_UNBLOCKING)
        {
            panic(NULL, "Thread in invalid state in wait_block_setup() state=%d", expected);
        }
        // We wait until wait_block_commit() or wait_block_cancel() to actually do the early unblock.
    }

    // Return without enabling interrupts; they will be enabled in wait_block_commit() or wait_block_cancel().
    return 0;
}

void wait_block_cancel(void)
{
    assert(!(rflags_read() & RFLAGS_INTERRUPT_ENABLE)); // Interrupts are still disabled from wait_block_setup().

    thread_t* thread = smp_self_unsafe()->sched.runThread;
    assert(thread != NULL);

    thread_state_t state = atomic_exchange(&thread->state, THREAD_UNBLOCKING);

    // State might already be unblocking if the thread unblocked prematurely.
    assert(state == THREAD_PRE_BLOCK || state == THREAD_UNBLOCKING);

    wait_remove_wait_entries(thread, EOK);

    thread_state_t newState = atomic_exchange(&thread->state, THREAD_RUNNING);
    assert(newState == THREAD_UNBLOCKING); // Make sure the state did not change.

    smp_put(); // Release the cpu from wait_block_setup().
}

uint64_t wait_block_commit(void)
{
    assert(!(rflags_read() & RFLAGS_INTERRUPT_ENABLE)); // Interrupts are still disabled from wait_block_setup().

    thread_t* thread = smp_self_unsafe()->sched.runThread;

    thread_state_t state = atomic_load(&thread->state);
    switch (state)
    {
    case THREAD_UNBLOCKING: // Unblocked prematurely, the cleanup was left to us.
        wait_remove_wait_entries(thread, EOK);
        atomic_store(&thread->state, THREAD_RUNNING);
        smp_put(); // Release the cpu from wait_block_setup().
        break;
    case THREAD_PRE_BLOCK:
        smp_put(); // Release the cpu from wait_block_setup().
        timer_notify_self(); // Enter the scheduler, which blocks us via wait_block_finalize().
        break;
    default:
        panic(NULL, "Invalid thread state %d in wait_block_commit()", state);
    }

    // We only get here once the thread is running again.
    if (thread->wait.err != EOK)
    {
        errno = thread->wait.err;
        return ERR;
    }
    return 0;
}
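
// Example (hypothetical, completing the event_t sketch): the intended caller
// pattern for the three-phase block API. The condition is re-checked between
// setup and commit: setup has already published our wait entries, so a signal
// arriving after the check still reaches us, and a premature wakeup makes
// wait_block_commit() return 0, after which the loop re-checks the condition.
//
// uint64_t event_wait(event_t* event, clock_t timeout)
// {
//     while (!atomic_load(&event->pending))
//     {
//         wait_queue_t* queue = &event->waitQueue;
//         if (wait_block_setup(&queue, 1, timeout) == ERR)
//         {
//             return ERR; // Invalid arguments or out of memory.
//         }
//
//         if (atomic_load(&event->pending))
//         {
//             wait_block_cancel(); // The condition became true before we blocked.
//             break;
//         }
//
//         if (wait_block_commit() == ERR)
//         {
//             return ERR; // errno holds ETIMEDOUT, EINTR or the waker's error.
//         }
//     }
//     return 0;
// }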