
Time based priority queue in Kotlin with coroutines

Introduction

How can we design a priority queue in Kotlin taking coroutines into account?

The code of this class can be found in my lib-p2p project: TimedPriorityQueue.kt

What are Priority Queues?

But first, what is a priority queue? A priority queue is a queue whose elements are sorted (prioritized). In a normal queue, elements are taken in the same order in which they were added. So, when you add elements A, B and C (in this order) to the queue, you also take first A, then B, then C from the queue. In a priority queue, every element has a certain weight, and when you take an element from the queue, the element with the highest weight is taken first. When adding the elements A$_3$, B$_9$ and C$_8$ (where the subscript denotes the weight of the element), first B is taken, then C and finally A. Note that the queue always reorders the elements such that the element with the highest priority is taken first.
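As a minimal sketch of this behaviour, the snippet below uses Java's `java.util.PriorityQueue` from Kotlin. The `Weighted` class and the `drainByWeight` helper are hypothetical names introduced only for this illustration; they are not part of the lib-p2p code.

```kotlin
import java.util.PriorityQueue

// Hypothetical element type: a name plus a weight (higher weight = higher priority).
data class Weighted(val name: String, val weight: Int)

// Adds all elements to a priority queue and takes them out again,
// returning the names in the order in which they leave the queue.
fun drainByWeight(elements: List<Weighted>): List<String> {
    // compareByDescending puts the element with the highest weight at the head.
    val queue = PriorityQueue<Weighted>(compareByDescending<Weighted> { it.weight })
    queue.addAll(elements)
    val order = mutableListOf<String>()
    while (queue.isNotEmpty()) {
        order += queue.poll().name
    }
    return order
}
```

Calling `drainByWeight` with A (weight 3), B (weight 9) and C (weight 8) should return the names in the order B, C, A, regardless of insertion order.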

So… Java already has a priority queue, can’t we use that in Kotlin? Well, yes, but that one is not coroutine-aware. If the queue is empty and you want to take an element, you want the coroutine to be suspended. Likewise, when you want to add an element and the queue is full, you want the coroutine to be suspended. Channels are great, but you cannot order their elements.

In this blog I discuss the design of a priority queue in Kotlin that is coroutines aware.

Time based priority queue

The elements in the priority queue that we are discussing here are prioritized on time. Every element in the queue has a “schedule time”, i.e. the time at which this element must be scheduled. The queue orders the elements according to their schedule time, and the “dequeue” method suspends (even when there are elements in the queue) until the schedule time of the top element has passed.

Class declaration

The TimedPriorityQueue class needs two arguments: a CoroutineScope and a capacity. The element type is generic, as long as it implements the TimedQueueElement interface.

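import java.time.Instant
import kotlinx.coroutines.CoroutineScope

// Every queue element exposes the time at which it becomes eligible for dequeuing.
interface TimedQueueElement {
    val scheduleTime: Instant
}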

internal class TimedPriorityQueue<T : TimedQueueElement>(
    scope: CoroutineScope,
    private val capacity: Int,
)

Three core elements

This design revolves around three core elements: the priority queue itself, an input and an output channel.

The priority queue is for prioritizing the elements:

private val priorityQueue = PriorityBlockingQueue(capacity, ::comparator)

The comparator method is for ordering (prioritizing) the elements:

private fun comparator(elementA: T, elementB: T): Int {
    return elementA.scheduleTime.compareTo(elementB.scheduleTime)
}

This sorts the elements in the priority queue by schedule time, so the element with the earliest schedule time is at the head of the queue.
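To see the ordering in isolation, here is a small sketch that needs no coroutines. The `Reminder` class and the `namesInScheduleOrder` helper are hypothetical and exist only for this example. Note that `PriorityBlockingQueue` is itself unbounded: its first constructor argument is an initial capacity, not a limit.

```kotlin
import java.time.Instant
import java.util.concurrent.PriorityBlockingQueue

// Hypothetical element type for illustration; only scheduleTime matters for ordering.
data class Reminder(val name: String, val scheduleTime: Instant)

// Returns the reminder names in the order in which the queue hands them out.
fun namesInScheduleOrder(reminders: List<Reminder>): List<String> {
    // 11 is only the initial capacity; the comparator orders by schedule time, earliest first.
    val queue = PriorityBlockingQueue<Reminder>(11, compareBy<Reminder> { it.scheduleTime })
    queue.addAll(reminders)
    // poll() returns null once the queue is empty, which ends the sequence.
    return generateSequence { queue.poll() }.map { it.name }.toList()
}
```

Whatever the insertion order, the reminder with the earliest `scheduleTime` comes out first.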

Input/output channels

These are needed for the suspend/resume behaviour, and to enqueue/dequeue elements to/from the priority queue.

private val inputChannel = Channel<T>(RENDEZVOUS)
private val outputChannel = Channel<T>(RENDEZVOUS)

The type of the channels is of course T, the type of the elements in the queue.

RENDEZVOUS is used for the channel capacity because RENDEZVOUS channels do not have a buffer. We do not want to keep elements in these channels, because we cannot sort those. There should be only one object that holds elements, and that is the priorityQueue. Yes, an element that a suspended sender is offering on the channel is not yet part of the prioritization process. This is fine, because this element is taken from the channel as soon as possible. Besides, we can always have the situation that a queue is completely full of lower priority elements when a higher priority element is presented.

General operation

As said, the inputChannel is used for feeding the priorityQueue, and the outputChannel is used to take elements from the queue and provide them to the user:

suspend fun queueElement(element: T) {
    inputChannel.send(element)
}

suspend fun dequeueElement(): T {
    return outputChannel.receive()
}

Note that neither channel has a buffer: queueElement suspends until the queue's coroutine receives the element from the inputChannel, and dequeueElement suspends until the queue's coroutine sends an element to the outputChannel.
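The hand-over behaviour of a rendezvous channel can be observed with a small experiment. This is only a sketch: `rendezvousDemo` is a hypothetical helper, and it assumes the kotlinx.coroutines library is on the classpath.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Returns a pair: (did trySend succeed without a receiver, did it succeed with a waiting receiver).
fun rendezvousDemo(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)

    // Nobody is receiving yet and there is no buffer, so the element cannot be handed over.
    val withoutReceiver = channel.trySend(1).isSuccess

    // Start a receiver and let it run until it suspends in receive().
    val receiver = launch { channel.receive() }
    yield()

    // Now a receiver is waiting, so the hand-over succeeds immediately.
    val withReceiver = channel.trySend(2).isSuccess

    receiver.join()
    withoutReceiver to withReceiver
}
```

The first `trySend` is expected to fail and the second to succeed, which is why an element never rests in the channel itself.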

During init a new coroutine is created, which essentially loops as long as the channels are open:

init {
    scope.launch(_context + CoroutineName("timed-prio-queue")) {
        while (!inputChannel.isClosedForReceive || !outputChannel.isClosedForSend) {
            try {
                receiveAndFillUpPriorityQueue()
                sendOneEligibleElement()
                if (outputChannel.isClosedForSend) {
                    // outputChannel is closed. Makes no sense for further processing.
                    inputChannel.cancel()
                    outputChannel.cancel()
                    priorityQueue.clear()
                } else if (inputChannel.isClosedForReceive && priorityQueue.isEmpty()) {
                    outputChannel.close()
                }
            } catch (e: CancellationException) {
                break
            } catch (e: Exception) {
                logger.warn { "Unexpected error occurred in timed priority queue: ${errorMessage(e)}" }
                throw e
            }
        }
    }
}

As long as the inputChannel is open for receive or the outputChannel is open for send, try to read elements from the inputChannel and fill up the queue as much as possible. Also, try to send the top element to the outputChannel.

If the outputChannel is closed (the user is no longer interested in updates), the inputChannel and priorityQueue are of no further use. Everything is cleaned up, and the while-loop stops. When the inputChannel is closed (there are no more elements the provider can give), we cannot close the outputChannel immediately, since we have to wait until the priorityQueue is empty (the user can still take elements from the queue until it is empty).

Filling up the queue

The method to fill up the priority queue is as follows:

// Try to read as many elements as possible and fill up the priority queue.
private suspend fun receiveAndFillUpPriorityQueue() {
    while (!inputChannel.isClosedForReceive) {
        if (priorityQueue.size == 0) {
            try {
                priorityQueue.add(inputChannel.receive())
            } catch (e: ClosedReceiveChannelException) {
                break
            }
        } else if (priorityQueue.size < capacity) {
            val element = inputChannel.tryReceive().getOrNull()
            if (element != null) {
                priorityQueue.add(element)
            } else {
                // Nothing on the inputChannel. Try to send something out.
                yield()
                break
            }
        } else {
            // The priorityQueue is full
            yield()
            break
        }
    }
}

As long as the inputChannel is not closed for receiving, do one of the following actions based on the size of the priority queue:

  • If the priorityQueue is empty, call receive() on the inputChannel and add the received element to the queue. Note that receiving from the inputChannel is a suspending operation: it suspends until an element can be taken from the channel. This is okay, since the priorityQueue is empty: no prioritizing needs to take place, and we cannot take elements from an empty queue anyway.

  • If there are elements in the priorityQueue, call tryReceive(). This is not a suspending operation, and its result yields either the element, or null (via getOrNull()) when no element is available. When an element is taken from the inputChannel, we add it to the queue and continue input processing. Filling up the priorityQueue takes precedence over sending elements out to the outputChannel. This is because the priorityQueue prioritizes elements and needs to be filled up as much as possible to prioritize them properly. If no more elements are on the inputChannel, we yield() and break the loop. In this case we have to give the output a chance to send out elements.

  • When the priorityQueue is full, we immediately yield() and break the loop. Input processing does not make sense when the queue is full.
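The non-suspending branch can be tried on its own with the following sketch. `drainWithoutSuspending` is a hypothetical helper written for this post, assuming kotlinx.coroutines is available; it takes whatever suspended senders are offering and stops as soon as `tryReceive()` comes back empty.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Starts `count` senders on a rendezvous channel and collects their elements
// using only the non-suspending tryReceive().
fun drainWithoutSuspending(count: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(Channel.RENDEZVOUS)
    val senders = List(count) { i -> launch { channel.send(i) } }

    // Let every sender run until it suspends in send().
    yield()

    val received = mutableListOf<Int>()
    while (true) {
        // getOrNull() is null when no sender is currently waiting: stop instead of suspending.
        val element = channel.tryReceive().getOrNull() ?: break
        received += element
    }

    senders.forEach { it.join() }
    received
}
```

With three waiting senders the loop collects three elements and then leaves without suspending, which mirrors the "nothing on the inputChannel" case in the method above.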

Taking elements from the queue

This method looks as follows:

// Try to send one element, if eligible. The reason we only try to send one element is that we
// try to fill up the priority queue as much as possible. The prioritization happens in this queue
// and the more elements it has, the better this prioritization works. If we sent more elements, a
// high priority element presented at the input channel might not be processed on time.
private suspend fun sendOneEligibleElement() {
    if (!outputChannel.isClosedForSend && priorityQueue.size > 0) {
        val element = priorityQueue.peek()
        val scheduleTime = element.scheduleTime
        val duration = Instant.now().until(scheduleTime, ChronoUnit.MILLIS)
        if (priorityQueue.size >= capacity || inputChannel.isClosedForReceive) {
            // The priority queue is full, which means that the order in the queue is fixed.
            // Or the inputChannel is closed, which means no more elements will be added to the priority queue
            // In both cases we can wait on the first element to expire and then send that element
            if (duration > 0) {
                delay(duration)
            }
            outputChannel.send(element)
            priorityQueue.poll()
        } else if (duration <= 0 && outputChannel.trySend(element).isSuccess) {
            // If the first is expired, try to send it to the outputChannel.
            // We succeeded in sending the element. Remove it from the queue.
            priorityQueue.poll()
        } else {
            yield()
        }
    }
}

First, only proceed when the outputChannel is not closed for sending and there are elements in the priorityQueue. Then, peek at the top element of the queue, take its schedule time and calculate the duration between now and the schedule time. If the priorityQueue is full, or the inputChannel is closed for receiving, the order of the elements in the queue is fixed. In this case we can sleep until the scheduleTime of the top element has passed (when the duration is zero or negative, the scheduleTime has already been reached, so we proceed immediately). Once the scheduleTime of the top element has passed, send the element to the outputChannel and remove it from the queue. Sending an element to the outputChannel is a suspending operation, but with a full queue or a closed inputChannel this is okay (no new elements can enter the queue, so there is nothing to re-prioritize).

Otherwise, when the scheduleTime of the top element has passed, try to send the element on the outputChannel. If this succeeds, remove it from the queue. If it does not succeed, we yield() and start all over (processing input, and trying to output the next eligible element). Note that we only peek at the element here, and do not immediately take it from the queue. This is because trySend can fail. In that case the element should be processed again in the next iteration. Hence, it should still be part of the queue.
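The duration calculation used in this method can be checked in isolation. In the sketch below, `millisUntil` and `delayNeeded` are hypothetical helpers; they take `now` as a parameter (instead of calling `Instant.now()`) purely to keep the example deterministic.

```kotlin
import java.time.Instant
import java.time.temporal.ChronoUnit

// Milliseconds from `now` until `scheduleTime`; zero or negative means the element is due.
fun millisUntil(scheduleTime: Instant, now: Instant): Long =
    now.until(scheduleTime, ChronoUnit.MILLIS)

// Hypothetical helper: how long the queue would have to delay before sending the element.
fun delayNeeded(scheduleTime: Instant, now: Instant): Long =
    millisUntil(scheduleTime, now).coerceAtLeast(0)
```

A schedule time 1.5 seconds in the future gives 1500, while one that lies 2 seconds in the past gives -2000 and therefore needs no delay.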

Selecting on input and output channel

Kotlin has the nice ability to await multiple suspending functions simultaneously and select the first one that becomes available.

Our time based priority queue also provides this functionality with these two properties:

val onSend: SelectClause2<T, SendChannel<T>>
    get() = inputChannel.onSend

val onReceive: SelectClause1<T>
    get() = outputChannel.onReceive

They can be used like this:

select {
    requestChannel.onReceive {
        handleRequest(it)
    }
    queue.onReceive {
        doDial(it)
    }
    responseChannel.onReceive {
        handleResponse(it)
    }
}

This awaits the requestChannel and responseChannel (not part of this discussion) and the priority queue. Whichever becomes available first is handled; in the queue's clause, it is the element taken from the queue.
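For readers who want to run a select expression without the rest of the project, here is a self-contained sketch. It uses two plain channels as stand-ins (`requests` and `dials` are hypothetical names, and `selectDemo` is not part of lib-p2p), and assumes kotlinx.coroutines is available.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Waits on two channels at once and reports which one delivered an element first.
fun selectDemo(): String = runBlocking {
    val requests = Channel<String>()   // stand-in for requestChannel; nothing is ever sent here
    val dials = Channel<String>()      // stand-in for the queue's output

    // Only the second channel gets an element.
    launch { dials.send("peer-1") }

    // select suspends until one of the clauses can proceed, then runs that clause's block.
    select<String> {
        requests.onReceive { "request: $it" }
        dials.onReceive { "dial: $it" }
    }
}
```

Since only `dials` ever receives an element, the second clause is the one that runs.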

Testing

See TimedPriorityQueueTest.kt for some unit tests.
