Event-driven simulation using a priority queue implemented with a binary heap


I need to simulate the execution of a set of given tasks. This means keeping track of which tasks are active at any given point in time and removing them from the active list as they finish.

I need to use a priority queue for this problem, implemented with a binary heap.

The input consists of a set of tasks given in increasing order of start time, and each task has an associated duration. The first line is the number of tasks. For example:

3
2 5
4 23
7 4

This means that there are 3 tasks. The first one starts at time 2, and ends at 7 (2+5). Second starts at 4, ends at 27. Third starts at 7, ends at 11.

We can keep track of the number of active tasks:

Time       #tasks
0 - 2        0
2 - 4        1
4 - 11       2
11 - 27      1

I need to find:

  1. Max number of active tasks at any given time (2 in this case) and
  2. Average number of active tasks over the entire duration, computed here as:

[ 0*(2-0) + 1*(4-2) + 2*(11-4) + 1*(27-11) ] / 27
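
Plugging in the numbers: [0*2 + 1*2 + 2*7 + 1*16] / 27 = 32/27 ≈ 1.19 active tasks on average.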

I've written the following code to read in the time values into a structure:

#include "stdio.h"
#include "stdlib.h"

typedef struct
{
    long int start;
    long int end;
    int dur;
} task;

int main()
{
    long int num_tasks;
    scanf("%ld",&num_tasks);
    task *t = new task[num_tasks];
    for (int i=0;i<num_tasks;i++)
    {
        scanf("%ld %d",&t[i].start,&t[i].dur);
        t[i].end = t[i].start + t[i].dur;
    }
}

I would like to understand how this can be implemented with a heap-based priority queue and how to obtain the required outputs from it.

There are 2 answers.

Answer from user3386109 (accepted):

This problem can be solved with two heaps: one containing start times, the other containing end times. As you read the tasks, add each start time and end time to its respective heap. Then the algorithm looks like this (a C sketch that also tracks the maximum and the time-weighted average follows the pseudocode):

number_of_tasks = 0

while start_heap not empty
    if min_start_time < min_end_time
       pop min_start_time
       number_of_tasks += 1    
    else if min_end_time < min_start_time
       pop min_end_time
       number_of_tasks -= 1
    else 
       pop min_start_time
       pop min_end_time

while end_heap not empty
    pop min_end_time
    number_of_tasks -= 1
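
Here is a minimal C sketch of that sweep (my addition, not part of the original answer), with the bookkeeping for the maximum and the time-weighted average included. Since the input arrives sorted by start time, the "start heap" degenerates to an array index here; only the end times go into a small binary min-heap. The fixed capacity and the helper names (end_heap_push, end_heap_pop) are illustrative choices, not from any library.

#include <stdio.h>

#define MAX_TASKS 1000

static double end_heap[MAX_TASKS + 1];  /* 1-indexed binary min-heap of end times */
static int end_len = 0;

static void end_heap_push(double v) {
    int i = ++end_len;
    while (i > 1 && end_heap[i / 2] > v) {   /* sift up */
        end_heap[i] = end_heap[i / 2];
        i /= 2;
    }
    end_heap[i] = v;
}

static double end_heap_pop(void) {
    double top = end_heap[1];
    double last = end_heap[end_len--];
    int i = 1;
    for (;;) {                               /* sift down */
        int c = 2 * i;
        if (c > end_len) break;
        if (c + 1 <= end_len && end_heap[c + 1] < end_heap[c]) c++;
        if (end_heap[c] >= last) break;
        end_heap[i] = end_heap[c];
        i = c;
    }
    end_heap[i] = last;
    return top;
}

int main(void) {
    double starts[] = {2, 4, 7}, durs[] = {5, 23, 4};
    int n = 3, next_start = 0;
    int active = 0, max_active = 0;
    double now = 0.0, weighted = 0.0;        /* sum of active * elapsed time */

    while (next_start < n || end_len > 0) {
        /* the next event is whichever of "next start" and "earliest end" is
           smaller; on a tie the end is processed first, so the count never
           over-shoots, matching the net effect of the pseudocode's tie case */
        int is_start = next_start < n &&
                       (end_len == 0 || starts[next_start] < end_heap[1]);
        double t = is_start ? starts[next_start] : end_heap[1];

        weighted += active * (t - now);      /* accumulate time-weighted count */
        now = t;

        if (is_start) {
            end_heap_push(starts[next_start] + durs[next_start]);
            next_start++;
            if (++active > max_active) max_active = active;
        } else {
            end_heap_pop();
            active--;
        }
    }
    printf("max active tasks: %d\n", max_active);           /* 2 */
    printf("average active tasks: %f\n", weighted / now);   /* 32/27 = 1.185... */
    return 0;
}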

Answer from pjs:

Since you said pseudocode would be sufficient for you, I'm taking you at your word. The following is implemented in Ruby, which is like runnable pseudo-code. I've also commented it fairly extensively.

The approach outlined here only needs one priority queue. Your model conceptually revolves around two events - when a task starts, and when it ends. A very flexible discrete-event implementation mechanism is to use a priority queue to store event notices, ordered by the time the event will fire. Each event is implemented as a separate method/function which does whatever state transitions are associated with the event, and can schedule further events by putting their event notices on the priority queue. You then need an executive loop which keeps pulling event notices off the priority queue, updates the clock to the current event's time, and invokes the corresponding event method. For more information on this approach, see this paper. The paper implements these concepts in Java, but they can be (and are) implemented in lots of other languages.

Without further ado, here's a working implementation for your case:

# User needs to type "gem install simplekit" on the command line to
# snag a copy of this library from the public gem repository
require 'simplekit' # contains a PriorityQueue implementation

# define an "event notice" structure which stores the tag for an event method,
# the time the event should occur, and what arguments are to be passed to it.
EVENT_NOTICE = Struct.new(:event, :time, :args) {
  include Comparable
  def <=>(other)    # define a comparison ordering based on my time vs other's time
    return time - other.time  # comparison of two times is positive, zero, or negative
  end
}

@pq = PriorityQueue.new    # @ makes globally shared (not really, but close enough for illustration purposes)
@num_tasks = 0      # number of tasks currently active
@clock = 0          # current time in the simulation

# define a report method
def report()
  puts "#{@clock}: #{@num_tasks}"  # print current simulation time & num_tasks
end

# define an event for starting a task, that increments the @num_tasks counter
# and uses the @clock and task duration to schedule when this task will end
# by pushing a suitable EVENT_NOTICE onto the priority queue.
def start_task(current_task)
  @num_tasks += 1
  @pq.push(EVENT_NOTICE.new(:end_task, @clock + current_task.duration, nil))
  report()
end

# define an event for ending a task, which decrements the counter
def end_task(_)   # _ means ignore any argument
  @num_tasks -= 1
  report()
end

# define a task as a suitable structure containing start time and duration
task = Struct.new(:start, :duration)

# Create a set of three tasks.  I've wired them in, but they could
# be read in or generated dynamically.
task_set = [task.new(2, 5), task.new(4, 23), task.new(7, 4)]

# Add each of the task's start_task event to the priority queue, ordered
# by time of occurrence (as specified in EVENT_NOTICE)
for t in task_set
  @pq.push(EVENT_NOTICE.new(:start_task, t.start, t))
end

report()
# Keep popping EVENT_NOTICE's off the priority queue until you run out. For
# each notice, update the @clock and invoke the event contained in the notice
until @pq.empty?
  current_event = @pq.pop
  @clock = current_event.time
  send(current_event.event, current_event.args)
end

I used Ruby because while it looks like pseudocode, this actually runs and produces the following output:

0: 0
2: 1
4: 2
7: 1
7: 2
11: 1
27: 0
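
From this trace you can read off both quantities the question asks for: the maximum number of active tasks is 2, and accumulating count × interval length between consecutive event times and dividing by 27 reproduces the same 32/27 ≈ 1.19 average worked out from the question's formula above.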


C IMPLEMENTATION

I finally freed up some time to brush up on twenty-year-old skills and implement this in C. The structure is very similar to that of the Ruby version, but there are a lot more details that need to be managed. I've factored this into the model, the simulation engine, and the heap to show that the executive loop is distinct from the specifics of any particular model. Here is the model implementation itself, which illustrates the "events are functions" orientation of building a model.

model.c

#include <stdio.h>
#include <stdlib.h>
#include "sim_engine.h"

// define a task as a suitable structure containing start time and duration
typedef struct {
  double start;
  double duration;
} task;

// stamp out new tasks on demand
task* new_task(double start, double duration) {
  task* t = (task*) calloc(1, sizeof(task));
  t->start = start;
  t->duration = duration;
  return t;
}

// create state variables
static int num_tasks;

// provide reporting
void report() {
  // print current simulation time & num_tasks
  printf("%8.3lf: %d\n", sim_time(), num_tasks);
}

// define an event for ending a task, which decrements the counter
void end_task(void* current_task) {
  --num_tasks;
  free(current_task);
  report();
}

// define an event for starting a task, that increments the num_tasks counter
// and uses the task duration to schedule when this task will end.
void start_task(void* current_task) {
  ++num_tasks;
  schedule(end_task, ((task*) current_task)->duration, current_task);
  report();
}

// all event graphs must supply an initialize event to kickstart the process.
void initialize() {
  num_tasks = 0;      // number of tasks currently active
  // Create an initial set of three tasks.  I've wired them in, but they could
  // be read in or generated dynamically.
  task* task_set[] = {
    new_task(2.0, 5.0), new_task(4.0, 23.0), new_task(7.0, 4.0)
  };
  // Add each of the task's start_task event to the priority queue, ordered
  // by time of occurrence.  In general, though, events can be scheduled
  // dynamically from trigger events.
  for(int i = 0; i < 3; ++i) {
    schedule(start_task, task_set[i]->start, task_set[i]);
  }
  report();
}

int main() {
  run_sim();
  return 0;
}

Note the strong similarity in layout to the Ruby implementation. Other than having floating point time, the output is identical to the Ruby version. (Ruby would have given decimal places as well if they were needed, but with the trial tasks given by OP that wasn't necessary.)

Next are the simulation engine headers and implementation. Note that this is designed to isolate the model builder from direct use of the priority queue. The details are handled by a schedule() front-end to put things into the event list, and the executive loop to extract and run things at the correct point in time.

sim_engine.h

#ifndef SIM_ENGINE_H
#define SIM_ENGINE_H

typedef void (*event_p)(void*);

void initialize();
void schedule(event_p event, double delay, void* args);
void run_sim();
double sim_time();

#endif

sim_engine.c

#include <stdlib.h>
#include "sim_engine.h"
#include "heap.h"

typedef struct {
  double time;
  event_p event;
  void* args;
} event_notice;

static heap_t *event_list;
static double sim_clock;

// return the current simulated time on demand
double sim_time() {
  return sim_clock;
}

// schedule the specified event to occur after the specified delay, with args
void schedule(event_p event, double delay, void* args) {
  event_notice* en = (event_notice*) calloc(1, sizeof(event_notice));
  en->time = sim_clock + delay;
  en->event = event;
  en->args = args;
  push(event_list, en->time, en);
}

// simulation executive loop.
void run_sim() {
  event_list = (heap_t *) calloc(1, sizeof(heap_t));
  sim_clock = 0.0;     // initialize time in the simulation

  initialize();

  // Keep popping event_notice's off the priority queue until you run out. For
  // each notice, update the clock, invoke the event contained in the notice,
  // and cleanup.
  while(event_list->len > 0) {
    event_notice* current_event = pop(event_list);
    sim_clock = current_event->time;
    current_event->event(current_event->args);
    free(current_event);
  }
}

And finally, the priority queue implementation lifted whole-hog from Rosetta Code, refactored, and switched to use void* for the data payload rather than strings.

heap.h

#ifndef HEAP_H
#define HEAP_H

typedef struct {
    double priority;
    void *data;
} node_t;

typedef struct {
    node_t *nodes;
    int len;
    int size;
} heap_t;

void push(heap_t *h, double priority, void *data);
void *pop(heap_t *h);

#endif

heap.c

#include <stdlib.h>
#include "heap.h"

void push(heap_t *h, double priority, void *data) {
    if (h->len + 1 >= h->size) {
        h->size = h->size ? h->size * 2 : 4;
        h->nodes = (node_t *)realloc(h->nodes, h->size * sizeof (node_t));
    }
    int i = h->len + 1;
    int j = i / 2;
    while (i > 1 && h->nodes[j].priority > priority) {
        h->nodes[i] = h->nodes[j];
        i = j;
        j = j / 2;
    }
    h->nodes[i].priority = priority;
    h->nodes[i].data = data;
    h->len++;
}

void *pop(heap_t *h) {
    int i, j, k;
    if (!h->len) {
        return NULL;
    }
    void *data = h->nodes[1].data;

    h->nodes[1] = h->nodes[h->len];

    h->len--;

    i = 1;
    while (i!=h->len+1) {
        k = h->len+1;
        j = 2 * i;
        if (j <= h->len && h->nodes[j].priority < h->nodes[k].priority) {
            k = j;
        }
        if (j + 1 <= h->len && h->nodes[j + 1].priority < h->nodes[k].priority) {
            k = j + 1;
        }
        h->nodes[i] = h->nodes[k];
        i = k;
    }
    return data;
}
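
If you want to sanity-check the heap on its own before wiring it into the engine, here is a quick standalone test (my addition, not part of the original answer); compile it together with heap.c:

#include <stdio.h>
#include <stdlib.h>
#include "heap.h"

int main(void) {
    heap_t *h = (heap_t *) calloc(1, sizeof(heap_t));
    double prio[]  = {7.0, 2.0, 27.0, 4.0, 11.0};
    char  *label[] = {"end 1", "start 1", "end 2", "start 2", "end 3"};

    for (int i = 0; i < 5; ++i) {
        push(h, prio[i], label[i]);          /* insert out of order */
    }
    while (h->len > 0) {
        printf("%s\n", (char *) pop(h));     /* comes back in priority order */
    }
    free(h->nodes);
    free(h);
    return 0;
}

The full simulation builds the same way, by compiling the three translation units together, e.g. cc model.c sim_engine.c heap.c (compiler invocation assumed, adjust for your toolchain).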

Bottom line: This event scheduling approach is extremely flexible, and pretty straightforward to implement given implementations for a priority queue and the simulation engine. As you can see, the engine is actually pretty trivial.