April 2024 - This site, and Kamaelia are being updated. There is significant work needed, and PRs are welcome.

Kamaelia.Protocol.Framing

Simple data Framing and chunking

A simple set of components for framing data and chunking it, and for reversing the process.

The Framer component frames messages as string, prefixed with a tag (eg. sequence number) and their length. The Chunker component inserts markers into the data stream to identify the start of chunks (eg. frames).

The DeChunker and DeFramer reverse the process.

Example Usage

Framing messages for transport over a stream based connection (eg, TCP):

Pipeline(MessageSource(...),   # emits message
         DataChunker(),
         TCPClient("<server ip>", 1500),
        ).activate()

And on the server:

Pipeline(SingleServer(1500),
         DataDeChunker(),
         MessageReceiver(...)
        ).activate()

Packing data for transport over a link that may loose packets:

Pipeline(DataSource(...),     # emits (sequence_number, data) pairs
         Framer(),
         Chunker(),
         UnreliableTransportMechanismSender(),
        ).activate()

At the receiver:

Pipeline(UnreliableTransportMechanismReceiver(),
         DeChunker(),
         DeFramer(),
         DataHandler()        # receives (sequence_number, data) pairs
        ).activate()

Note that this example doesn't attempt to fix errors in the stream, just detect them.

How does it work?

Framer / DeFramer

Framer/DeFramer frame and deframe data pairs of the form (tag,data). 'data' should be the main payload, and 'tag' is suitable for something like a frame sequence number.

Both tag and data are treated as strings. 'data' can contain any data. 'tag' should not contain newline or any whitespace character(s).

The framed data has the format "<tag> <length> <data>" where 'length' is the length of the 'data' string.

The SimpleFrame class performs the actual framing and deframing of the data.

These components terminate if they receive a producerFinished() message on their "control" inbox. They pass the message onto their "signal" outbox before terminating.

DataChunker / DataDeChunker

The DataChunker/DataDeChunker components chunk and dechunk the data by inserting 'sync' sequences of characters to delimit chunks of data. Each message received by DataChunker on its "inbox" inbox is considered a chunk.

DataChunker prefixes each chunk with the 'sync' message sequence and escapes any occurrences of that sequence within the data itself. The result is output on its "outbox" outbox.

DataDeChunker does the reverse process. If data is received without a preceeding 'sync' sequence then there is no way to tell if that chunk is complete (whole) and it will be discarded. Once the internal buffer contains a full chunk of data with a 'sync' sequence before and after it, that chunk is output from its "outbox" outbox. The 'sync' sequences are removed and any escaped occurrences of the 'sync' message within the data are un-escaped again.

Note that DataDeChunker buffers chunks until it knows they have been fully received. If a final chunk is not followed by a occurence of the 'sync' message then it will never be output.

However DataDeChunker can be told to flush the remaining contents of its buffer by sending any message to its "flush" inbox.

These components terminate if they receive a producerFinished() message on their "control" inbox. They pass the message onto their "signal" outbox before terminating.


Kamaelia.Protocol.Framing.DataChunker

class DataChunker(Axon.Component.component)

DataChunker([syncmessage]) -> new DataChunker component.

Delineates messages by prefixing them with a 'sync' sequence, allowing a receiver to synchronise to the chunks in the stream. Any occurrences of the sequence within the message itself are escaped to prevent misinterpretation.

Keyword arguments:

  • syncmessage -- string to use as 'sync' sequence (default="XXXXXXXXXXXXXXXXXXXXXXXX")

Inboxes

  • control : shutdown messages (producerFinished)
  • inbox : message strings to be chunked

Outboxes

  • outbox : chunked message strings
  • signal : producerFinished shutdown messages

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

__init__(self[, syncmessage])

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

encodeChunk(self, message)

Returns the message with the 'sync' message prefix and escaping done.

escapeSyncMessage(self, message)

Returns the message, with occurrences of 'sync' message escaped.

main(self)

Main loop.

shutdown(self)

Shutdown on producerFinished message arriving at "control" inbox.

Kamaelia.Protocol.Framing.DataDeChunker

class DataDeChunker(Axon.Component.component)

DataDeChunker([syncmessage]) -> new DataDeChunker component.

Synchronises to a stream of string data, delimited into chunks by a 'sync' sequence. Chunks are buffered until the next 'sync sequence is received and are then passed on.

Keyword arguments:

  • syncmessage -- string to use as 'sync' sequence (default="XXXXXXXXXXXXXXXXXXXXXXXX")

Inboxes

  • control : shutdown messages (producerFinished)
  • inbox : partial message chunk strings
  • flush : instructions to flush the internal buffer

Outboxes

  • outbox : dechunked message strings
  • signal : producerFinished shutdown messages

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

__init__(self[, syncmessage])

x.__init__(...) initializes x; see x.__class__.__doc__ for signature

decodeChunk(self, chunk)

unEscape and return the chunk, sans the 'sync' sequence prefix, or raise IncompleteChunk if the chunk isn't prefixed with the 'sync' sequence (can't guarantee the chunk is whole).

main(self)

Main loop.

shouldFlush(self)

Returns non-zero if a message has been received on the "flush" inbox

shutdown(self)

Shutdown on producerFinished message arriving at "control" inbox.

unEscapeSyncMessage(self, message)

Returns message with escaped occurrences of the 'sync' message unescaped again.

Kamaelia.Protocol.Framing.DeFramer

class DeFramer(Axon.Component.component)

DeFramer -> new DeFramer component.

Converts string that were framed using the Framer component back into (tag, data) pairs.

Inboxes

  • control : shutdown messages (producerFinished)
  • inbox : framed data strings

Outboxes

  • outbox : deframed (tag, data) pairs
  • signal : producerFinished shutdown messages

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

main(self)

Main loop.

shutdown(self)

Shutdown on producerFinished message arriving at "control" inbox.

Kamaelia.Protocol.Framing.Framer

class Framer(Axon.Component.component)

Framer() -> new Framer component.

Frames (tag, data) pairs into strings containing the same data.

Inboxes

  • control : shutdown messages (producerFinished)
  • inbox : (tag, data) pairs to be framed

Outboxes

  • outbox : framed data strings
  • signal : producerFinished shutdown messages

Methods defined here

Warning!

You should be using the inbox/outbox interface, not these methods (except construction). This documentation is designed as a roadmap as to their functionalilty for maintainers and new component developers.

main(self)

Main loop.

shutdown(self)

Shutdown on producerFinished message arriving at "control" inbox.

Feedback

Got a problem with the documentation? Something unclear that could be clearer? Want to help improve it? Constructive criticism is very welcome - especially if you can suggest a better rewording!

Please leave you feedback here in reply to the documentation thread in the Kamaelia blog.

-- Automatic documentation generator, 05 Jun 2009 at 03:01:38 UTC/GMT