Scala - TCP Packet frame using Akka


Is there any way in Akka to get packet framing like Erlang's {packet,4}? A packet looks something like this:

4 bytes length in big endian | body...

For example:

00 00 00 05 H E L L O 00 00 00 05 W O R L D

This would be two packets, "HELLO" and "WORLD", but they are received as one.

Or

00 00 00 05 H E L L

Here Akka receives these 8 bytes, but one more byte is still missing and will only arrive in the next call to "receive".

The problem is that my actor's receive is called with whatever bytes have arrived, which may be a partial request or more than one request, but I would like to get only the "body" part in receive, and only once it has been fully received.

So all that is needed is to first read those 4 bytes, then wait until the other N bytes have been read (N = the length in the 4-byte header), and only THEN send the message to my actor. Is this possible somehow?

My server code:

import java.net.InetSocketAddress

import akka.actor.{Props, Actor}
import akka.io.Tcp.Bind
import akka.io.{IO, Tcp}

class Server extends Actor{
    import context.system
    import Tcp._
    IO(Tcp) ! Bind(self, new InetSocketAddress("0.0.0.0", 1234))

    def receive ={
        case bound @ Bound(localAddr) =>
            println("Server is bound to "+localAddr.toString())
        case failed @ CommandFailed(_ : Bind) =>
            context stop self
        case connected @ Connected(remote, local) =>
            val handler = context.actorOf(Props[ClientHandler])
            val connection = sender()
            println(remote.toString + " connected to " + local.toString())

            connection ! Register(handler)
    }
}
1 answer

Answer by David Weber

As far as I am aware, there is no library function for this in Akka or Scala. Akka uses ByteString for reading and writing, so I put together a trait that does what you request. You pass it the ByteString that was sent to your actor, and it splits the stream into packets according to the lengths in the headers. It is stateless, so it returns a tuple containing the list of extracted packets and any unused data from the TCP stream as a ByteString. You then concatenate new TCP data onto that unused remainder, as shown in the usage example below. Note that the trait as written reads a 2-byte length header; for the 4-byte header in your question, set headerSize to 4 and read the length with getInt instead of getShort.

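import java.nio.ByteOrder

import akka.util.ByteString

import scala.annotation.tailrec

trait Buffering {

  // getShort needs an implicit ByteOrder; big endian is assumed here, as in the question
  implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN

  val MAX_PACKET_LEN: Short = 10000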

  /**
   * Extracts complete packets from the raw data, preserving any remainder.
   * If there is no complete packet, an empty list is returned. If multiple
   * packets are available, all of them are extracted. Any remaining data
   * is returned to the caller for later resubmission.
   * @param data Raw bytes from the TCP stream, starting at a packet header
   * @return The extracted packets in order of receipt, plus any remaining buffer data not consumed
   */
  def getPacket(data: ByteString): (List[ByteString], ByteString) = {

    val headerSize = 2

    @tailrec
    def multiPacket(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) = {
      if (current.length < headerSize) {
        (packets.reverse, current)
      } else {
        val len = current.iterator.getShort
        if (len > MAX_PACKET_LEN || len < 0) throw new RuntimeException(s"Invalid packet length: $len")
        if (current.length < len + headerSize) {
          (packets.reverse, current)
        } else {
          val rem = current drop headerSize // Pop off header
          val (front, back) = rem.splitAt(len) // Front contains a completed packet, back contains the remaining data
          // Pull off the packet and recurse to see if there is another packet available
          multiPacket(front :: packets, back)
        }
      }
    }
    multiPacket(List[ByteString](), data)
  }
}

Usage from an actor that mixes in Buffering is as follows:

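def receive = buffer(CompactByteString())

def buffer(buf: ByteString): Receive = {
  // Messages inbound from the network
  case Received(data) =>
    // Prepend whatever was left over from the previous read
    val (packets, remainder) = getPacket(buf ++ data)
    // Do something with your packets (the list is empty if no packet is complete yet)
    context become buffer(remainder)
  case _: ConnectionClosed =>
    // Other messages are handled as usual, e.g. stop once the connection is closed
    context stop self
}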
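
For the 4-byte big-endian header from the question, the same approach might look roughly like the sketch below. It also runs the two byte sequences from the question through the function. The object name LengthPrefixed, the frame helper, and the MaxPacketLen limit are names I made up for illustration; they are not part of Akka.

```scala
import java.nio.ByteOrder

import akka.util.ByteString

import scala.annotation.tailrec

// Hypothetical helper object: a 4-byte-header variant of the Buffering trait above
object LengthPrefixed {
  implicit val byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN

  val HeaderSize = 4
  val MaxPacketLen = 10000 // assumed sanity limit, same value as MAX_PACKET_LEN above

  /** Splits data into complete packet bodies plus the unconsumed remainder. */
  def getPackets(data: ByteString): (List[ByteString], ByteString) = {
    @tailrec
    def loop(packets: List[ByteString], current: ByteString): (List[ByteString], ByteString) =
      if (current.length < HeaderSize) {
        (packets.reverse, current) // header itself is incomplete
      } else {
        val len = current.iterator.getInt // 4-byte big-endian length
        if (len < 0 || len > MaxPacketLen)
          throw new RuntimeException(s"Invalid packet length: $len")
        if (current.length < HeaderSize + len) {
          (packets.reverse, current) // body is incomplete, wait for more data
        } else {
          val (body, rest) = current.drop(HeaderSize).splitAt(len)
          loop(body :: packets, rest)
        }
      }
    loop(Nil, data)
  }

  /** Prepends the 4-byte length header to a body (useful for writing and for tests). */
  def frame(body: ByteString): ByteString =
    ByteString.newBuilder.putInt(body.length).result() ++ body
}

object LengthPrefixedDemo extends App {
  import LengthPrefixed._

  // Case 1: two packets arrive in a single read
  val (both, rest1) = getPackets(frame(ByteString("HELLO")) ++ frame(ByteString("WORLD")))
  println(both.map(_.utf8String)) // both bodies, nothing left over in rest1

  // Case 2: only the first 8 bytes of a 9-byte frame have arrived
  val (none, rest2) = getPackets(frame(ByteString("HELLO")).take(8))
  println(none.isEmpty) // no complete packet yet, rest2 keeps the 8 bytes

  // The missing byte arrives in the next read and completes the packet
  val (one, rest3) = getPackets(rest2 ++ ByteString("O"))
  println(one.map(_.utf8String))
}
```

The actor side stays the same as in the usage example: keep the returned remainder in the behaviour and prepend it to the next Received(data).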