Skip to content

Serial-IP

This example showcases the functionality of a serial-to-IP converter. Any messages that arrive on the serial port are sent over TCP to the remote server/client.

It consists of two files:

conftest.py - Implements two session-scoped fixtures: one that provides a SerialInterface() instance and one that provides a TCPServer() instance.

test_serial_ip.py - Contains a test that sends a message over serial to the DUT and receives the same message back from the DUT over TCP, then sends a message over TCP and reads the same message back over serial.

import logging
import pytest

from hil_sdk.interfaces.network.tcp import TCPServer
from hil_sdk.interfaces.serial.serial_interface import SerialInterface

# Configure the root logger with INFO as its level so the fixtures'
# setup messages are emitted.
logging.basicConfig(level=logging.INFO)

@pytest.fixture(scope="session")
def serial_interface_fixture():
    """Provide a session-scoped ``SerialInterface`` on ``/dev/ttyUSB0``.

    The interface is created once per test session, yielded to the tests,
    and closed during fixture teardown.

    Yields:
        The ``SerialInterface`` instance created for ``/dev/ttyUSB0`` with
        the value ``115200`` (presumably the baud rate -- confirm against
        the ``SerialInterface`` signature).

    Raises:
        Exception: Whatever ``SerialInterface(...)`` raises during setup is
            logged and re-raised unchanged.
    """
    # Setup is kept outside the try/finally below: if construction fails
    # there is nothing to close, and referencing the unbound name in a
    # ``finally`` would raise UnboundLocalError and mask the real error.
    try:
        serial = SerialInterface("/dev/ttyUSB0", 115200)
    except Exception as e:
        logging.error(f"Failed to setup Serial interface: {e}")
        raise

    try:
        yield serial
    finally:
        # Always release the port once the session is done with it.
        serial.close()

@pytest.fixture(scope="session")
def tcp_server_fixture():
    """Provide a session-scoped ``TCPServer`` created for ``0.0.0.0:1234``.

    The server object is created once per test session, yielded to the
    tests, and closed during fixture teardown.

    Yields:
        The ``TCPServer`` instance constructed with ``"0.0.0.0"`` and
        ``1234``.

    Raises:
        Exception: Whatever ``TCPServer(...)`` raises during setup is
            logged and re-raised unchanged.
    """
    # Setup is kept outside the try/finally below: if construction fails
    # there is nothing to close, and referencing the unbound name in a
    # ``finally`` would raise UnboundLocalError and mask the real error.
    try:
        server = TCPServer("0.0.0.0", 1234)
    except Exception as e:
        logging.error(f"Failed to setup TCP server: {e}")
        raise

    try:
        yield server
    finally:
        # Always release the server once the session is done with it.
        server.close()
import logging
import time

# Configure the root logger with INFO as its level so the test's
# progress messages are emitted.
logging.basicConfig(level=logging.INFO)

def test_serial_ip(serial_interface_fixture, tcp_server_fixture):
    """Check that the DUT relays data between serial and TCP in both directions.

    First ``b'ping'`` is written to the serial interface and must be
    received unchanged from the TCP server; then ``b'pong'`` is sent through
    the TCP server and must be read back unchanged from the serial
    interface. The serial output is flushed at the end regardless of the
    outcome.
    """
    serial_port = serial_interface_fixture
    tcp_link = tcp_server_fixture

    logging.info("Connecting to a TCP client...")
    tcp_link.connect()

    try:
        # --- Leg 1: serial -> TCP ---
        ping = b'ping'
        logging.info(f"Sending {ping.decode('utf-8')} via serial...")
        serial_port.write(ping)
        # Pause before reading; presumably gives the DUT time to forward
        # the data -- TODO confirm the required delay.
        time.sleep(1)

        tcp_reply = tcp_link.receive()
        assert tcp_reply == ping
        logging.info(f"Received {tcp_reply.decode('utf-8')} via TCP")
        time.sleep(1)

        # --- Leg 2: TCP -> serial ---
        pong = b'pong'
        expected_len = len(pong)
        logging.info(f"Sending {pong.decode('utf-8')} via TCP...")
        tcp_link.send(pong)
        time.sleep(1)

        # Read exactly as many bytes as were sent over TCP.
        serial_reply = serial_port.read(expected_len)
        assert len(serial_reply) == expected_len
        assert serial_reply == pong
        logging.info(f"Received {bytes.decode(serial_reply).strip()} via serial")
    finally:
        serial_port.flush_output()