1 | |
2 | |
3 | |
4 | |
5 | |
6 | |
7 | |
8 | |
9 | |
10 | |
11 | |
12 | |
13 | |
14 | |
15 | |
16 | |
17 | #include "apr.h" |
18 | |
19 | #if APR_HAVE_STDIO_H1 |
20 | #include <stdio.h> |
21 | #endif |
22 | #if APR_HAVE_STDLIB_H1 |
23 | #include <stdlib.h> |
24 | #endif |
25 | #if APR_HAVE_UNISTD_H1 |
26 | #include <unistd.h> |
27 | #endif |
28 | |
29 | #include "apu.h" |
30 | #include "apr_portable.h" |
31 | #include "apr_thread_mutex.h" |
32 | #include "apr_thread_cond.h" |
33 | #include "apr_errno.h" |
34 | #include "apr_queue.h" |
35 | |
36 | #if APR_HAS_THREADS1 |
37 | |
38 | |
39 | |
40 | |
41 | |
42 | |
43 | struct apr_queue_t { |
44 | void **data; |
45 | unsigned int nelts; |
46 | unsigned int in; |
47 | unsigned int out; |
48 | unsigned int bounds; |
49 | unsigned int full_waiters; |
50 | unsigned int empty_waiters; |
51 | apr_thread_mutex_t *one_big_mutex; |
52 | apr_thread_cond_t *not_empty; |
53 | apr_thread_cond_t *not_full; |
54 | int terminated; |
55 | }; |
56 | |
57 | #ifdef QUEUE_DEBUG |
58 | static void Q_DBG(char*msg, apr_queue_t *q) { |
59 | fprintf(stderrstderr, "%ld\t#%d in %d out %d\t%s\n", |
60 | apr_os_thread_current(), |
61 | q->nelts, q->in, q->out, |
62 | msg |
63 | ); |
64 | } |
65 | #else |
66 | #define Q_DBG(x,y) |
67 | #endif |
68 | |
69 | |
70 | |
71 | |
72 | |
73 | #define apr_queue_full(queue)((queue)->nelts == (queue)->bounds) ((queue)->nelts == (queue)->bounds) |
74 | |
75 | |
76 | |
77 | |
78 | |
79 | #define apr_queue_empty(queue)((queue)->nelts == 0) ((queue)->nelts == 0) |
80 | |
81 | |
82 | |
83 | |
84 | |
85 | static apr_status_t queue_destroy(void *data) |
86 | { |
87 | apr_queue_t *queue = data; |
88 | |
89 | |
90 | |
91 | apr_thread_cond_destroy(queue->not_empty); |
92 | apr_thread_cond_destroy(queue->not_full); |
93 | apr_thread_mutex_destroy(queue->one_big_mutex); |
94 | |
95 | return APR_SUCCESS0; |
96 | } |
97 | |
98 | |
99 | |
100 | |
101 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_create(apr_queue_t **q, |
102 | unsigned int queue_capacity, |
103 | apr_pool_t *a) |
104 | { |
105 | apr_status_t rv; |
106 | apr_queue_t *queue; |
107 | queue = apr_palloc(a, sizeof(apr_queue_t)); |
108 | *q = queue; |
109 | |
110 | |
111 | rv = apr_thread_mutex_create(&queue->one_big_mutex, |
112 | APR_THREAD_MUTEX_UNNESTED0x2, |
113 | a); |
114 | if (rv != APR_SUCCESS0) { |
115 | return rv; |
116 | } |
117 | |
118 | rv = apr_thread_cond_create(&queue->not_empty, a); |
119 | if (rv != APR_SUCCESS0) { |
120 | return rv; |
121 | } |
122 | |
123 | rv = apr_thread_cond_create(&queue->not_full, a); |
124 | if (rv != APR_SUCCESS0) { |
125 | return rv; |
126 | } |
127 | |
128 | |
129 | queue->data = apr_palloc(a, queue_capacity * sizeof(void*)); |
130 | if (!queue->data) return APR_ENOMEM12; |
131 | memset(queue->data, 0, queue_capacity * sizeof(void*)); |
132 | queue->bounds = queue_capacity; |
133 | queue->nelts = 0; |
134 | queue->in = 0; |
135 | queue->out = 0; |
136 | queue->terminated = 0; |
137 | queue->full_waiters = 0; |
138 | queue->empty_waiters = 0; |
139 | |
140 | apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null); |
141 | |
142 | return APR_SUCCESS0; |
143 | } |
144 | |
145 | |
146 | |
147 | |
148 | |
149 | |
150 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_push(apr_queue_t *queue, void *data) |
151 | { |
152 | apr_status_t rv; |
153 | |
154 | if (queue->terminated) { |
155 | return APR_EOF((20000 + 50000) + 14); |
156 | } |
157 | |
158 | rv = apr_thread_mutex_lock(queue->one_big_mutex); |
159 | if (rv != APR_SUCCESS0) { |
160 | return rv; |
161 | } |
162 | |
163 | if (apr_queue_full(queue)((queue)->nelts == (queue)->bounds)) { |
164 | if (!queue->terminated) { |
165 | queue->full_waiters++; |
166 | rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex); |
167 | queue->full_waiters--; |
168 | if (rv != APR_SUCCESS0) { |
169 | apr_thread_mutex_unlock(queue->one_big_mutex); |
170 | return rv; |
171 | } |
172 | } |
173 | |
174 | if (apr_queue_full(queue)((queue)->nelts == (queue)->bounds)) { |
175 | Q_DBG("queue full (intr)", queue); |
176 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
177 | if (rv != APR_SUCCESS0) { |
178 | return rv; |
179 | } |
180 | if (queue->terminated) { |
181 | return APR_EOF((20000 + 50000) + 14); |
182 | } |
183 | else { |
184 | return APR_EINTR4; |
185 | } |
186 | } |
187 | } |
188 | |
189 | queue->data[queue->in] = data; |
190 | queue->in = (queue->in + 1) % queue->bounds; |
191 | queue->nelts++; |
192 | |
193 | if (queue->empty_waiters) { |
194 | Q_DBG("sig !empty", queue); |
195 | rv = apr_thread_cond_signal(queue->not_empty); |
196 | if (rv != APR_SUCCESS0) { |
197 | apr_thread_mutex_unlock(queue->one_big_mutex); |
198 | return rv; |
199 | } |
200 | } |
201 | |
202 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
203 | return rv; |
204 | } |
205 | |
206 | |
207 | |
208 | |
209 | |
210 | |
211 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_trypush(apr_queue_t *queue, void *data) |
212 | { |
213 | apr_status_t rv; |
214 | |
215 | if (queue->terminated) { |
216 | return APR_EOF((20000 + 50000) + 14); |
217 | } |
218 | |
219 | rv = apr_thread_mutex_lock(queue->one_big_mutex); |
220 | if (rv != APR_SUCCESS0) { |
221 | return rv; |
222 | } |
223 | |
224 | if (apr_queue_full(queue)((queue)->nelts == (queue)->bounds)) { |
225 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
| Value stored to 'rv' is never read |
226 | return APR_EAGAIN11; |
227 | } |
228 | |
229 | queue->data[queue->in] = data; |
230 | queue->in = (queue->in + 1) % queue->bounds; |
231 | queue->nelts++; |
232 | |
233 | if (queue->empty_waiters) { |
234 | Q_DBG("sig !empty", queue); |
235 | rv = apr_thread_cond_signal(queue->not_empty); |
236 | if (rv != APR_SUCCESS0) { |
237 | apr_thread_mutex_unlock(queue->one_big_mutex); |
238 | return rv; |
239 | } |
240 | } |
241 | |
242 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
243 | return rv; |
244 | } |
245 | |
246 | |
247 | |
248 | |
249 | APU_DECLARE(unsigned int)unsigned int apr_queue_size(apr_queue_t *queue) { |
250 | return queue->nelts; |
251 | } |
252 | |
253 | |
254 | |
255 | |
256 | |
257 | |
258 | |
259 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_pop(apr_queue_t *queue, void **data) |
260 | { |
261 | apr_status_t rv; |
262 | |
263 | if (queue->terminated) { |
264 | return APR_EOF((20000 + 50000) + 14); |
265 | } |
266 | |
267 | rv = apr_thread_mutex_lock(queue->one_big_mutex); |
268 | if (rv != APR_SUCCESS0) { |
269 | return rv; |
270 | } |
271 | |
272 | |
273 | if (apr_queue_empty(queue)((queue)->nelts == 0)) { |
274 | if (!queue->terminated) { |
275 | queue->empty_waiters++; |
276 | rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); |
277 | queue->empty_waiters--; |
278 | if (rv != APR_SUCCESS0) { |
279 | apr_thread_mutex_unlock(queue->one_big_mutex); |
280 | return rv; |
281 | } |
282 | } |
283 | |
284 | if (apr_queue_empty(queue)((queue)->nelts == 0)) { |
285 | Q_DBG("queue empty (intr)", queue); |
286 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
287 | if (rv != APR_SUCCESS0) { |
288 | return rv; |
289 | } |
290 | if (queue->terminated) { |
291 | return APR_EOF((20000 + 50000) + 14); |
292 | } |
293 | else { |
294 | return APR_EINTR4; |
295 | } |
296 | } |
297 | } |
298 | |
299 | *data = queue->data[queue->out]; |
300 | queue->nelts--; |
301 | |
302 | queue->out = (queue->out + 1) % queue->bounds; |
303 | if (queue->full_waiters) { |
304 | Q_DBG("signal !full", queue); |
305 | rv = apr_thread_cond_signal(queue->not_full); |
306 | if (rv != APR_SUCCESS0) { |
307 | apr_thread_mutex_unlock(queue->one_big_mutex); |
308 | return rv; |
309 | } |
310 | } |
311 | |
312 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
313 | return rv; |
314 | } |
315 | |
316 | |
317 | |
318 | |
319 | |
320 | |
321 | |
322 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_pop_timeout(apr_queue_t *queue, void **data, apr_interval_time_t timeout) |
323 | { |
324 | apr_status_t rv; |
325 | |
326 | if (queue->terminated) { |
327 | return APR_EOF((20000 + 50000) + 14); |
328 | } |
329 | |
330 | rv = apr_thread_mutex_lock(queue->one_big_mutex); |
331 | if (rv != APR_SUCCESS0) { |
332 | return rv; |
333 | } |
334 | |
335 | |
336 | if (apr_queue_empty(queue)((queue)->nelts == 0)) { |
337 | if (!queue->terminated) { |
338 | queue->empty_waiters++; |
339 | rv = apr_thread_cond_timedwait(queue->not_empty, queue->one_big_mutex, timeout); |
340 | queue->empty_waiters--; |
341 | |
342 | if (rv != APR_SUCCESS0) { |
343 | apr_thread_mutex_unlock(queue->one_big_mutex); |
344 | return rv; |
345 | } |
346 | } |
347 | |
348 | if (apr_queue_empty(queue)((queue)->nelts == 0)) { |
349 | Q_DBG("queue empty (intr)", queue); |
350 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
351 | if (rv != APR_SUCCESS0) { |
352 | return rv; |
353 | } |
354 | if (queue->terminated) { |
355 | return APR_EOF((20000 + 50000) + 14); |
356 | } |
357 | else { |
358 | return APR_EINTR4; |
359 | } |
360 | } |
361 | } |
362 | |
363 | *data = queue->data[queue->out]; |
364 | queue->nelts--; |
365 | |
366 | queue->out = (queue->out + 1) % queue->bounds; |
367 | if (queue->full_waiters) { |
368 | Q_DBG("signal !full", queue); |
369 | rv = apr_thread_cond_signal(queue->not_full); |
370 | if (rv != APR_SUCCESS0) { |
371 | apr_thread_mutex_unlock(queue->one_big_mutex); |
372 | return rv; |
373 | } |
374 | } |
375 | |
376 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
377 | return rv; |
378 | } |
379 | |
380 | |
381 | |
382 | |
383 | |
384 | |
385 | |
386 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_trypop(apr_queue_t *queue, void **data) |
387 | { |
388 | apr_status_t rv; |
389 | |
390 | if (queue->terminated) { |
391 | return APR_EOF((20000 + 50000) + 14); |
392 | } |
393 | |
394 | rv = apr_thread_mutex_lock(queue->one_big_mutex); |
395 | if (rv != APR_SUCCESS0) { |
396 | return rv; |
397 | } |
398 | |
399 | if (apr_queue_empty(queue)((queue)->nelts == 0)) { |
400 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
401 | return APR_EAGAIN11; |
402 | } |
403 | |
404 | *data = queue->data[queue->out]; |
405 | queue->nelts--; |
406 | |
407 | queue->out = (queue->out + 1) % queue->bounds; |
408 | if (queue->full_waiters) { |
409 | Q_DBG("signal !full", queue); |
410 | rv = apr_thread_cond_signal(queue->not_full); |
411 | if (rv != APR_SUCCESS0) { |
412 | apr_thread_mutex_unlock(queue->one_big_mutex); |
413 | return rv; |
414 | } |
415 | } |
416 | |
417 | rv = apr_thread_mutex_unlock(queue->one_big_mutex); |
418 | return rv; |
419 | } |
420 | |
421 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_interrupt_all(apr_queue_t *queue) |
422 | { |
423 | apr_status_t rv; |
424 | Q_DBG("intr all", queue); |
425 | if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS0) { |
426 | return rv; |
427 | } |
428 | apr_thread_cond_broadcast(queue->not_empty); |
429 | apr_thread_cond_broadcast(queue->not_full); |
430 | |
431 | if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS0) { |
432 | return rv; |
433 | } |
434 | |
435 | return APR_SUCCESS0; |
436 | } |
437 | |
438 | APU_DECLARE(apr_status_t)apr_status_t apr_queue_term(apr_queue_t *queue) |
439 | { |
440 | apr_status_t rv; |
441 | |
442 | if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS0) { |
443 | return rv; |
444 | } |
445 | |
446 | |
447 | |
448 | |
449 | |
450 | queue->terminated = 1; |
451 | if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS0) { |
452 | return rv; |
453 | } |
454 | return apr_queue_interrupt_all(queue); |
455 | } |
456 | |
457 | #endif /* APR_HAS_THREADS */ |