Skip to content
This repository was archived by the owner on Aug 19, 2021. It is now read-only.

Add set of event queue composition functions #13

Merged
merged 4 commits into from
Aug 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions EventQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,20 @@ void EventQueue::cancel(int id) {
return equeue_cancel(&_equeue, id);
}

void EventQueue::background(Callback<void(int)> update) {
_update = update;

if (_update) {
equeue_background(&_equeue, &Callback<void(int)>::thunk, &_update);
} else {
equeue_background(&_equeue, 0, 0);
}
}

void EventQueue::chain(EventQueue *target) {
if (target) {
equeue_chain(&_equeue, &target->_equeue);
} else {
equeue_chain(&_equeue, 0);
}
}
24 changes: 24 additions & 0 deletions EventQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,29 @@ class EventQueue {
*/
void cancel(int id);

/** Background an event queue onto a single-shot timer
*
* The provided update function will be called to indicate when the queue
* should be dispatched. A negative timeout will be passed to the update
* function when the timer is no longer needed. A null update function
* will disable the existing timer.
*
* @param update Function called to indicate when the queue should be
* dispatched
*/
void background(mbed::Callback<void(int)> update);

/** Chain an event queue onto another event queue
*
* After chaining a queue to a target, calling dispatch on the target
* queue will also dispatch events from this queue. The queues will use
* their own buffers and events are handled independently. A null queue
* as the target will unchain the queue.
*
* @param target Queue to chain onto
*/
void chain(EventQueue *target);

/** Post an event to the queue
*
* @param f Function to call on event dispatch
Expand Down Expand Up @@ -221,6 +244,7 @@ class EventQueue {
void break_();

struct equeue _equeue;
mbed::Callback<void(int)> _update;

template <typename F, typename A0, typename A1, typename A2, typename A3, typename A4>
struct Context5 {
Expand Down
75 changes: 75 additions & 0 deletions equeue/equeue.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
q->generation = 0;
q->breaks = 0;

q->background.active = false;
q->background.update = 0;
q->background.timer = 0;

int err;
err = equeue_sema_create(&q->eventsema);
if (err < 0) {
Expand Down Expand Up @@ -68,6 +72,10 @@ void equeue_destroy(equeue_t *q) {
}
}

if (q->background.update) {
q->background.update(q->background.timer, -1);
}

equeue_mutex_destroy(&q->memlock);
equeue_mutex_destroy(&q->queuelock);
equeue_sema_destroy(&q->eventsema);
Expand Down Expand Up @@ -199,6 +207,11 @@ static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
*p = e;
e->ref = p;

if ((q->background.update && q->background.active) &&
(q->queue == e && !e->sibling)) {
q->background.update(q->background.timer, ms);
}

equeue_mutex_unlock(&q->queuelock);

return id;
Expand Down Expand Up @@ -312,6 +325,7 @@ void equeue_break(equeue_t *q) {
void equeue_dispatch(equeue_t *q, int ms) {
unsigned tick = equeue_tick();
unsigned timeout = tick + ms;
q->background.active = false;

while (1) {
// collect all the available events and next deadline
Expand Down Expand Up @@ -344,6 +358,16 @@ void equeue_dispatch(equeue_t *q, int ms) {
if (ms >= 0) {
deadline = equeue_tickdiff(timeout, tick);
if (deadline <= 0) {
// update background timer if necessary
if (q->background.update) {
equeue_mutex_lock(&q->queuelock);
if (q->background.update && q->queue) {
q->background.update(q->background.timer,
equeue_tickdiff(q->queue->target, tick));
}
q->background.active = true;
equeue_mutex_unlock(&q->queuelock);
}
return;
}
}
Expand Down Expand Up @@ -441,3 +465,54 @@ int equeue_call_every(equeue_t *q, int ms, void (*cb)(void*), void *data) {
e->data = data;
return equeue_post(q, ecallback_dispatch, e);
}

// backgrounding
void equeue_background(equeue_t *q,
void (*update)(void *timer, int ms), void *timer) {
equeue_mutex_lock(&q->queuelock);
if (q->background.update) {
q->background.update(q->background.timer, -1);
}

q->background.update = update;
q->background.timer = timer;

if (q->background.update && q->queue) {
q->background.update(q->background.timer,
equeue_tickdiff(q->queue->target, equeue_tick()));
}
q->background.active = true;
equeue_mutex_unlock(&q->queuelock);
}

struct equeue_chain_context {
equeue_t *q;
equeue_t *target;
int id;
};

static void equeue_chain_dispatch(void *p) {
equeue_dispatch((equeue_t *)p, 0);
}

static void equeue_chain_update(void *p, int ms) {
struct equeue_chain_context *c = (struct equeue_chain_context *)p;
equeue_cancel(c->target, c->id);

if (ms >= 0) {
c->id = equeue_call_in(c->target, ms, equeue_chain_dispatch, c->q);
} else {
equeue_dealloc(c->target, c);
}
}

void equeue_chain(equeue_t *q, equeue_t *target) {
struct equeue_chain_context *c = equeue_alloc(q,
sizeof(struct equeue_chain_context));

c->q = q;
c->target = target;
c->id = 0;

equeue_background(q, equeue_chain_update, c);
}
23 changes: 23 additions & 0 deletions equeue/equeue.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ typedef struct equeue {
unsigned char *data;
} slab;

struct equeue_background {
bool active;
void (*update)(void *timer, int ms);
void *timer;
} background;

equeue_sema_t eventsema;
equeue_mutex_t queuelock;
equeue_mutex_t memlock;
Expand Down Expand Up @@ -136,6 +142,23 @@ int equeue_post(equeue_t *queue, void (*cb)(void *), void *event);
// stop a currently executing event
void equeue_cancel(equeue_t *queue, int event);

// Background an event queue onto a single-shot timer
//
// The provided update function will be called to indicate when the queue
// should be dispatched. A negative timeout will be passed to the update
// function when the timer is no longer needed. A null update function
// will disable the existing timer.
void equeue_background(equeue_t *queue,
void (*update)(void *timer, int ms), void *timer);

// Chain an event queue onto another event queue
//
// After chaining a queue to a target, calling equeue_dispatch on the
// target queue will also dispatch events from this queue. The queues
// will use their own buffers and events are handled independently.
// A null queue as the target will unchain this queue.
void equeue_chain(equeue_t *queue, equeue_t *target);


#ifdef __cplusplus
}
Expand Down
70 changes: 70 additions & 0 deletions equeue/tests/tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,74 @@ void multithread_test(void) {
equeue_destroy(&q);
}

void background_func(void *p, int ms) {
*(unsigned *)p = ms;
}

void background_test(void) {
equeue_t q;
int err = equeue_create(&q, 2048);
test_assert(!err);

int id = equeue_call_in(&q, 20, pass_func, 0);
test_assert(id);

unsigned ms;
equeue_background(&q, background_func, &ms);
test_assert(ms == 20);

id = equeue_call_in(&q, 10, pass_func, 0);
test_assert(id);
test_assert(ms == 10);

id = equeue_call(&q, pass_func, 0);
test_assert(id);
test_assert(ms == 0);

equeue_dispatch(&q, 0);
test_assert(ms == 10);

equeue_destroy(&q);
test_assert(ms == -1);
}

void chain_test(void) {
equeue_t q1;
int err = equeue_create(&q1, 2048);
test_assert(!err);

equeue_t q2;
err = equeue_create(&q2, 2048);
test_assert(!err);

equeue_chain(&q2, &q1);

int touched = 0;

int id1 = equeue_call_in(&q1, 20, simple_func, &touched);
int id2 = equeue_call_in(&q2, 20, simple_func, &touched);
test_assert(id1 && id2);

id1 = equeue_call(&q1, simple_func, &touched);
id2 = equeue_call(&q2, simple_func, &touched);
test_assert(id1 && id2);

id1 = equeue_call_in(&q1, 5, simple_func, &touched);
id2 = equeue_call_in(&q2, 5, simple_func, &touched);
test_assert(id1 && id2);

equeue_cancel(&q1, id1);
equeue_cancel(&q2, id2);

id1 = equeue_call_in(&q1, 10, simple_func, &touched);
id2 = equeue_call_in(&q2, 10, simple_func, &touched);
test_assert(id1 && id2);

equeue_dispatch(&q1, 30);

test_assert(touched == 6);
}

// Barrage tests
void simple_barrage_test(int N) {
equeue_t q;
Expand Down Expand Up @@ -589,6 +657,8 @@ int main() {
test_run(period_test);
test_run(nested_test);
test_run(sloth_test);
test_run(background_test);
test_run(chain_test);
test_run(multithread_test);
test_run(simple_barrage_test, 20);
test_run(fragmenting_barrage_test, 20);
Expand Down