pylibschc: A python wrapper for libSCHC

https://github.com/anr-bmbf-pivot/pylibschc/actions/workflows/test.yml/badge.svg https://codecov.io/gh/anr-bmbf-pivot/pylibschc/branch/main/graph/badge.svg?token=KPOQ0ERP9H PyPI - Status PyPI - Python Version

This provides a pythonic wrapper for libSCHC.

Installation

You can use pip to install the package once from PyPI:

pip install pylibschc

Usage

More documentation can be found here.

Rules

Rules are managed using a pydantic model, i.e., they can be loaded from a correctly typed dictionary (e.g. generated from a JSON or YAML file) using the pylibschc.rules module:

>>> import json
>>> from pylibschc.rules import Config
>>>
>>> with open("tests/artifacts/rules_example.json", encoding="utf-8") as f:
...    rules = Config(**json.load(f))
...    config = rules.deploy()

Do not forget to call the deploy() method if you change any rules to re-deploy the rules with libSCHC.

The header file for the rules, so they can be used with libSCHC on an embedded device, can be generated using

>>> with open("rule_config.h", "w", encoding="utf-8") as f:
...     written = f.write(rules.to_c_header())

An example for such a dictionary is provided in rules_example.json as JSON, the documentation of the concrete pydantic model you can find its API reference.

Compression/Decompression

Both compression and decompression can be achieved by using the pylibschc.compressor.Decompressor class. We use scapy in our example to construct a valid CoAP over IPv6 packet for compression for which the output() method is called:

>>> from scapy.all import IPv6, UDP, raw
>>> from scapy.contrib.coap import CoAP
>>> import pylibschc.compressor
>>>
>>> comp_dec = pylibschc.compressor.CompressorDecompressor(device=config.devices[0])
>>> pkt = raw(
...     IPv6(hlim=64, src="2001:db8::1", dst="2001:db8:1::2")
...     / UDP(
...         sport=5683,
...         dport=61618,
...     )
...     / CoAP(
...         ver=1,
...         code="2.05 Content",
...         type="NON",
...         msg_id=0x23B3,
...         token=b"\x32\x3a\xf3\xa3",
...         paymark=b"\xff",
...     )
...     / (
...         b'[{"id":1, "name":"CJ.H.L.(Joe) Lecomte) Heliport","code":"YOY","country":"CA"}]'
...     )
... )
>>> res, bit_array = comp_dec.output(pkt, direction=pylibschc.rules.Direction.UP)
>>> res
<CompressionResult.COMPRESSED: 1>
>>> bit_array.buffer
b'\x01\t3#\xaf:5\xb7\xb2&\x96B#\xa3\x12\xc2\x02&\xe6\x16\xd6R#\xa2$4\xa2\xe4\x82\xe4\xc2\xe2\x84\xa6\xf6R\x92\x04\xc6V6\xf6\xd7FR\x92\x04\x86V\xc6\x97\x06\xf7\'B"\xc2&6\xf6FR#\xa2%\x94\xf5\x92"\xc2&6\xf7V\xe7G\'\x92#\xa2$4\x12\'\xd5\xd0'

For decompression, the input() method is called:

>>> comp_dec.input(bit_array, direction=pylibschc.rules.Direction.UP)
b'`\x00\x00\x00\x00`\x11@ \x01\r\xb8\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01 \x01\r\xb8\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x163\xf0\xb2\x00`r\xf2TE#\xb32:\xf3\xa3\xff[{"id":1, "name":"CJ.H.L.(Joe) Lecomte) Heliport","code":"YOY","country":"CA"}]'
>>> pkt == comp_dec.input(bit_array, direction=pylibschc.rules.Direction.UP)
True

Both input() and output() take either BitArray- or bytes-typed variables as input.

Fragmentation/Reassembly

For fragmentation, call the output() method of a pylibschc.fragmenter.FragmenterReassembler object. To actually send then from the, a send function needs to be registered for the device of the fragmenter. For reassembly or to handle acknowledgments, call the input() method of a pylibschc.fragmenter.FragmenterReassembler object. Both input() and output(), take either BitArray- or bytes-typed variables as input.

>>> import asyncio
>>> import logging
>>> import pylibschc.fragmenter
>>>
>>> fragmenter_queue = None
>>> loop = None
>>> timer_tasks = {}
>>> reassembly_buffer = None
>>> # shorten waiting times for this example
>>> config.devices[0].duty_cycle_ms = 500
>>>
>>> def send(buffer):
...     fragmenter_queue.put_nowait({"cmd": "send", "data": buffer})
...     return len(buffer)
...
>>> def post_timer_task(conn, timer_task, delay_sec, arg):
...     if conn in timer_tasks:
...         remove_timer_entry(conn)
...     timer_tasks[conn] = loop.call_later(delay_sec, timer_task, arg)
...
>>> def remove_timer_entry(conn):
...     if conn in timer_tasks:
...         timer_tasks[conn].cancel()
...         del timer_tasks[conn]
...
>>> def end_rx(conn):
...     reassembly_buffer.set_result(conn.mbuf)
...
>>> def end_tx(conn):
...     fragmenter_queue.put_nowait({"cmd": "end_tx"})
...
>>> async def asyncized_input(reassembler, buffer):
...     return reassembler.input(buffer)
...
>>> async def fragment_and_reassemble():
...     # just making sure these variables are initialized in the same loop
...     global fragmenter_queue
...     global loop
...     global reassembly_buffer
...
...     fragmenter_queue = asyncio.Queue()
...     loop = asyncio.get_running_loop()
...     reassembly_buffer = loop.create_future()
...     fragmenter = pylibschc.fragmenter.FragmenterReassembler(
...         device=config.devices[0],
...         mode=pylibschc.fragmenter.FragmentationMode.NO_ACK,
...         post_timer_task=post_timer_task,
...         end_tx=end_tx,
...         remove_timer_entry=remove_timer_entry,
...     )
...     fragmenter.register_send(send)
...     reassembler = pylibschc.fragmenter.FragmenterReassembler(
...         device=config.devices[0],
...         post_timer_task=post_timer_task,
...         end_rx=end_rx,
...         remove_timer_entry=remove_timer_entry,
...     )
...     print("fragmenter.output ->", fragmenter.output(bit_array))
...     cmd = {}
...     while cmd.get("cmd") != "end_tx":
...         cmd = await asyncio.wait_for(fragmenter_queue.get(), timeout=2)
...         if cmd["cmd"] == "send":
...             print(
...                 "reassembler.input ->",
...                 await asyncized_input(reassembler, cmd["data"])
...             )
...     return await asyncio.wait_for(reassembly_buffer, timeout=5)
...
>>> asyncio.run(fragment_and_reassemble()) == bit_array.buffer
fragmenter.output -> FragmentationResult.SUCCESS
reassembler.input -> ReassemblyStatus.ONGOING
reassembler.input -> ReassemblyStatus.COMPLETED
True

While this example uses asyncio to parallelize timer calls, any method to establish concurrency can be used (see test for a threaded fragmenter/reassembler for an example using the threading module) as long as the access to libSCHC (including calls to timer tasks) is synchronized.

License

This code is published under the GNU General Public License Version 3 (GPLv3). Please keep in mind, that libSCHC is dual licensed for non-open source use. For more, have a look at the license information over at libSCHC.