Introduction
How can we design a priority queue in Kotlin taking coroutines into account?
The code of this class can be found in my lib-p2p project: TimedPriorityQueue.kt
What are Priority Queues?
But first, what is a priority queue? A priority queue is a queue whose elements are sorted (prioritized). In a normal queue, elements are taken in the same order in which they were added. So, when you add elements A, B and C (in this order) to the queue, you also take first A, then B, then C from the queue. In a priority queue, all elements have a certain weight, and when you take an element from the queue, the element with the highest weight is taken first. When adding the elements A$_3$, B$_9$ and C$_8$ (where the subscript denotes the weight of the element), first B is taken, then C and finally A. Note that the queue always reorders the elements such that the element with the highest priority is taken first.
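To make the difference concrete, here is a small sketch that takes the same three elements from a normal (FIFO) queue and from a priority queue. The Weighted type and the function names are made up for this illustration; only the JDK's LinkedList and PriorityQueue are used.

```kotlin
import java.util.LinkedList
import java.util.PriorityQueue
import java.util.Queue

// Hypothetical element type for this illustration: a name plus a weight.
data class Weighted(val name: String, val weight: Int)

// Takes all elements from a normal queue: the order of insertion is kept.
fun fifoOrder(elements: List<Weighted>): List<String> {
    val queue: Queue<Weighted> = LinkedList(elements)
    return generateSequence { queue.poll() }.map { it.name }.toList()
}

// Takes all elements from a priority queue: the highest weight comes first.
fun priorityOrder(elements: List<Weighted>): List<String> {
    val queue = PriorityQueue<Weighted>(compareByDescending<Weighted> { it.weight })
    queue.addAll(elements)
    return generateSequence { queue.poll() }.map { it.name }.toList()
}
```

With the elements A (weight 3), B (weight 9) and C (weight 8) added in that order, fifoOrder returns A, B, C and priorityOrder returns B, C, A.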
So… Java already has a priority queue, can’t we use that in Kotlin? Well, yes, but that one is not coroutines aware. If the queue is empty and you want to take an element, you want the coroutine to be suspended. Likewise, when you want to add an element and the queue is full, you want the coroutine to be suspended. Channels are great, but you cannot order their elements.
In this blog I discuss the design of a priority queue in Kotlin that is coroutines aware.
Time based priority queue
The elements in the priority queue that we are discussing here are prioritized on time. Every element in the queue has a “schedule time”, i.e. the time at which this element must be scheduled. The queue orders the elements according to their schedule time, and the “dequeue” method suspends (even when there are elements in the queue) until the schedule time of the top element has passed.
Class declaration
The TimedPriorityQueue needs two arguments: a CoroutineScope and a capacity. The element type is generic, as long as it implements the TimedQueueElement interface.
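A minimal sketch of what such a declaration could look like. The scheduleTime property, its java.time.Instant type and the capacity check are assumptions made for this illustration; the actual declaration lives in TimedPriorityQueue.kt.

```kotlin
import java.time.Instant
import kotlinx.coroutines.CoroutineScope

// Assumption: the interface exposes the schedule time as a property.
// The property name and the Instant type are guesses for illustration.
interface TimedQueueElement {
    val scheduleTime: Instant
}

// Sketch of the declaration: a scope for the internal coroutine, a capacity
// for the internal queue, and a type parameter bounded by the interface.
class TimedPriorityQueue<T : TimedQueueElement>(
    private val scope: CoroutineScope,
    private val capacity: Int,
) {
    init {
        // Not taken from the original class: a sanity check added for this sketch.
        require(capacity > 0) { "capacity must be positive" }
    }
}
```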
Three core elements
This design revolves around three core elements: the priority queue itself, an input and an output channel.
The priority queue holds and prioritizes the elements. Its comparator orders (prioritizes) the elements, so that the priority queue is sorted on schedule time.
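A sketch of how a queue sorted on schedule time could be set up with java.util.PriorityQueue. The scheduleTime property, the Task type and the helper names are assumptions for this illustration. Note that the capacity passed to java.util.PriorityQueue is only an initial size: the queue itself is unbounded, so an upper limit has to be enforced by the surrounding code.

```kotlin
import java.time.Instant
import java.util.PriorityQueue

// Assumed shape of the element interface; property name and type are guesses.
interface TimedQueueElement {
    val scheduleTime: Instant
}

// Hypothetical element used only for this illustration.
data class Task(val name: String, override val scheduleTime: Instant) : TimedQueueElement

// Orders elements by schedule time, earliest first.
val byScheduleTime: Comparator<TimedQueueElement> =
    compareBy<TimedQueueElement> { it.scheduleTime }

// Builds a queue that keeps the element with the earliest schedule time on top.
// The capacity is only the initial size of the backing array, not a hard limit.
fun <T : TimedQueueElement> newTimedQueue(capacity: Int): PriorityQueue<T> =
    PriorityQueue<T>(capacity, byScheduleTime)

// Adds all tasks and takes them out again in queue order.
fun namesInQueueOrder(tasks: List<Task>): List<String> {
    val queue = newTimedQueue<Task>(tasks.size.coerceAtLeast(1))
    queue.addAll(tasks)
    return generateSequence { queue.poll() }.map { it.name }.toList()
}
```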
Input/output channels
These are needed for the suspend/resume behaviour, and to enqueue/dequeue elements to/from the priority queue.
The type of the channels is of course T, the type of the elements in the queue.
RENDEZVOUS is used for the channel capacity because RENDEZVOUS channels do not have a buffer. We do not want to keep elements in these channels, because we cannot sort those. There should be only one object that holds elements, and that is the priorityQueue. Yes, one element can be waiting on a channel (held by the suspended sender) without being part of the prioritization process. This is fine, because this element is taken from the channel as soon as possible. Besides, we can always have the situation that the queue is completely full of lower-priority elements when a higher-priority element is presented.
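The following sketch shows what “no buffer” means in practice: on a RENDEZVOUS channel, trySend only succeeds when a receiver is already waiting, so the channel itself never stores an element. The function name is made up for this illustration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Returns (trySend result without a receiver, trySend result with a receiver, received value).
fun rendezvousDemo(): Triple<Boolean, Boolean, Int> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)

    // Nobody is receiving yet and there is no buffer, so the element is rejected.
    val withoutReceiver = channel.trySend(1).isSuccess

    // Start a receiver and yield so that it gets to suspend in receive().
    val receiver = async { channel.receive() }
    yield()

    // Now the element is handed over directly to the waiting receiver.
    val withReceiver = channel.trySend(2).isSuccess

    Triple(withoutReceiver, withReceiver, receiver.await())
}
```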
General operation
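As said, the inputChannel is used for feeding the priorityQueue, and the outputChannel is used to take elements from the queue and provide them to the user. The queue offers one method for each: one sends an element to the inputChannel, the other receives an element from the outputChannel.
Neither channel has a buffer, so these two methods suspend until the queue’s coroutine is ready to take the element from the inputChannel or to offer one on the outputChannel.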
During init a new coroutine is created, which essentially loops as long as the channels are open. As long as the inputChannel is open for receive and the outputChannel is open for send, it tries to read elements from the inputChannel and fill up the queue as much as possible. It also tries to send the top element to the outputChannel.
If the outputChannel is closed (the user is no longer interested in updates), there is no use for the inputChannel and the priorityQueue. In that case everything is cleaned up and the while-loop stops. When the inputChannel is closed (there are no more elements the provider can give), we cannot close the outputChannel immediately, since we have to wait until the priorityQueue is empty (the user can still take elements from the queue until it is empty).
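The life cycle described above could be sketched roughly as follows. This is one possible reading, not the code of the class: launchQueueLoop, fill and drain are hypothetical names, and the two steps are passed in as lambdas to keep the sketch short. The isClosedForReceive and isClosedForSend properties are marked as delicate or experimental in kotlinx.coroutines (depending on the version), hence the opt-in.

```kotlin
import java.util.PriorityQueue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs the fill/drain loop until one of the channels is closed.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun <T> CoroutineScope.launchQueueLoop(
    input: Channel<T>,
    output: Channel<T>,
    queue: PriorityQueue<T>,
    fill: suspend () -> Unit,   // moves elements from input into the queue
    drain: suspend () -> Unit,  // moves the top element from the queue to output
): Job = launch {
    while (!input.isClosedForReceive && !output.isClosedForSend) {
        fill()
        drain()
    }
    if (output.isClosedForSend) {
        // Nobody is listening anymore: clean up everything.
        input.cancel()
        queue.clear()
    } else {
        // The input is exhausted: hand out what is left, then close the output.
        while (queue.isNotEmpty() && !output.isClosedForSend) drain()
        output.close()
    }
}

// Demo with buffered channels so that the result is deterministic.
fun loopDemo(): List<Int> = runBlocking {
    val input = Channel<Int>(Channel.UNLIMITED)
    val output = Channel<Int>(Channel.UNLIMITED)
    val queue = PriorityQueue<Int>()
    listOf(3, 1, 2).forEach { input.send(it) }
    input.close()
    val job = launchQueueLoop(
        input, output, queue,
        fill = { while (true) { queue.add(input.tryReceive().getOrNull() ?: break) } },
        drain = { queue.poll()?.let { output.send(it) } },
    )
    job.join()
    val result = mutableListOf<Int>()
    for (value in output) result += value
    result
}
```

In this demo the queue orders plain integers, so the elements sent as 3, 1, 2 come out as 1, 2, 3 before the output channel is closed.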
Filling up the queue
The method that fills up the priority queue does, as long as the inputChannel is not closed for receiving, one of the following actions based on the size of the priority queue:
- If the priorityQueue is empty, call receive() on the inputChannel and add the element to the queue. Note that receiving from the inputChannel is a suspending operation: it suspends until an element can be taken from the channel. This is okay, since the priorityQueue is empty. Because of this, no prioritizing needs to take place, and we cannot take elements from an empty queue anyway.
- If there are elements in the priorityQueue, call tryReceive(). This is not a suspending operation; its result either holds the element or, when no element is available, is empty (getOrNull() then returns null). When an element is taken from the inputChannel, we add it to the queue and continue input processing. Filling up the priorityQueue takes precedence over sending elements out to the outputChannel. This is because the priorityQueue prioritizes elements and needs to be filled up as much as possible to prioritize them properly. If no more elements are on the inputChannel, we yield() and break the loop. In this case we have to give the output a chance to send out elements.
- When the priorityQueue is full, we immediately yield() and break the loop. Input processing does not make sense when the queue is full.
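The three cases above can be sketched as a standalone function. This is a sketch under assumptions, not the original method: fillQueue and its parameters are hypothetical names, and receiveCatching() is used instead of receive() so that a channel that gets closed while we wait does not throw.

```kotlin
import java.util.PriorityQueue
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: moves elements from the input channel into the queue.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T : Any> fillQueue(input: ReceiveChannel<T>, queue: PriorityQueue<T>, capacity: Int) {
    while (!input.isClosedForReceive) {
        when {
            queue.isEmpty() -> {
                // Empty queue: suspend until an element arrives (or the channel is closed).
                val element = input.receiveCatching().getOrNull() ?: return
                queue.add(element)
            }
            queue.size < capacity -> {
                // Partly filled queue: only take what is available right now.
                val element = input.tryReceive().getOrNull()
                if (element == null) {
                    yield() // nothing on the input: give the output a chance
                    return
                }
                queue.add(element)
            }
            else -> {
                // Full queue: input processing makes no sense.
                yield()
                return
            }
        }
    }
}

// Demo with a buffered channel: five elements are offered, the queue takes three.
fun fillDemo(): Pair<List<Int>, Int> = runBlocking {
    val input = Channel<Int>(Channel.UNLIMITED)
    listOf(5, 1, 4, 2, 3).forEach { input.send(it) }
    val queue = PriorityQueue<Int>()
    fillQueue(input, queue, capacity = 3)
    val leftOnInput = generateSequence { input.tryReceive().getOrNull() }.count()
    Pair(queue.sorted(), leftOnInput)
}
```

In the demo the first three elements (5, 1 and 4) end up in the queue, and the remaining two stay on the input channel until the queue has room again.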
Taking elements from the queue
This method works as follows.
Firstly, only proceed when the outputChannel is not closed for sending and there are elements in the priorityQueue. Then, peek the top element from the queue, take its schedule time and calculate the duration between now and the schedule time. If the priorityQueue is full, or the inputChannel is closed for receiving, the order of the elements in the queue is fixed. In this case we can sleep until the scheduleTime of the top element has passed (when the duration is negative, the scheduleTime is in the past, and we proceed immediately). When the scheduleTime of the top element has passed, send the element to the outputChannel and take it from the queue. Sending an element to the outputChannel is a suspending operation, but with a full queue or a closed inputChannel this is okay: no new elements can enter the queue, so the order cannot change while we wait.
Otherwise, when the scheduleTime of the top element has passed, try to send the element on the outputChannel. If this succeeds, remove it from the queue. If it does not succeed, we yield() and start all over (processing input, and trying to output the next eligible element). Note that we peek the element from the queue here, and do not immediately take it. This is because trySend can fail. In this case the element should be processed again in the following iteration. Hence, it should still be part of the queue.
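The two branches described above could be sketched like this. The names emitTop, Task and inputClosed are hypothetical, and the scheduleTime property with its java.time.Instant type is an assumption. What happens when the queue is not full and the top element is not yet due is not spelled out in the text; this sketch simply does nothing in that case.

```kotlin
import java.time.Duration
import java.time.Instant
import java.util.PriorityQueue
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Assumed shape of the element interface; property name and type are guesses.
interface TimedQueueElement {
    val scheduleTime: Instant
}

// Hypothetical element used only for this illustration.
data class Task(val name: String, override val scheduleTime: Instant) : TimedQueueElement

// Hypothetical helper: tries to hand the top element of the queue to the output channel.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
suspend fun <T : TimedQueueElement> emitTop(
    queue: PriorityQueue<T>,
    output: SendChannel<T>,
    capacity: Int,
    inputClosed: Boolean,
) {
    if (output.isClosedForSend || queue.isEmpty()) return
    val top = queue.peek() // peek only: the element stays queued until it is sent
    val waitMillis = Duration.between(Instant.now(), top.scheduleTime).toMillis()
    if (queue.size >= capacity || inputClosed) {
        // The order is fixed: wait until the element is due, then send (may suspend).
        if (waitMillis > 0) delay(waitMillis)
        output.send(top)
        queue.poll()
    } else if (waitMillis <= 0) {
        // The element is due, but new input may still arrive: never suspend on send.
        if (output.trySend(top).isSuccess) queue.poll() else yield()
    }
}

// Demo: both tasks are already due, so no waiting takes place.
fun emitDemo(): List<String> = runBlocking {
    val past = Instant.now().minusSeconds(60)
    val queue = PriorityQueue<Task>(compareBy<Task> { it.scheduleTime })
    queue.add(Task("later", past.plusSeconds(10)))
    queue.add(Task("sooner", past))
    val output = Channel<Task>(Channel.UNLIMITED)
    repeat(2) { emitTop(queue, output, capacity = 2, inputClosed = false) }
    output.close()
    val names = mutableListOf<String>()
    for (task in output) names += task.name
    names
}
```

The first call in the demo takes the “full queue” branch and the second call the trySend branch; the tasks come out ordered by schedule time.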
Selecting on input and output channel
Kotlin has the nice ability to await multiple suspending functions simultaneously and select the first one that becomes available.
Our time-based priority queue also provides this functionality through two methods, one for the input side and one for the output side.
They can be used in a select expression that awaits on requestChannel, responseChannel (not part of this discussion), and the priority queue. Whichever becomes available first is handled; for the priority queue, it is the element taken from the queue.
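A sketch of how such select support could be exposed, assuming the queue simply forwards the select clauses of its two channels. SelectableQueue and the property names onSend and onReceive are hypothetical stand-ins; the real class may name and implement them differently.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.SelectClause2
import kotlinx.coroutines.selects.select

// Hypothetical stand-in for the queue: it only exposes the select clauses
// of its input and output channel.
class SelectableQueue<T>(
    private val inputChannel: Channel<T>,
    private val outputChannel: Channel<T>,
) {
    // Clause for adding an element inside a select expression.
    val onSend: SelectClause2<T, SendChannel<T>> get() = inputChannel.onSend

    // Clause for taking an element inside a select expression.
    val onReceive: SelectClause1<T> get() = outputChannel.onReceive
}

// Demo: only the queue has something to offer, so its clause is selected.
fun selectDemo(): String = runBlocking {
    val requestChannel = Channel<String>(1)
    val output = Channel<String>(1)
    val queue = SelectableQueue(Channel<String>(1), output)
    output.send("element from queue")
    select<String> {
        requestChannel.onReceive { "request: $it" }
        queue.onReceive { "queue: $it" }
    }
}
```

The demo uses buffered channels only to get a deterministic result without extra coroutines.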
Testing
See TimedPriorityQueueTest.kt for some unit tests.