API examples

Setting things up

To get the example code, clone the ekmdevice repository:

git clone https://github.com/ekmmetering/ekmdevice.git

Install the ekmdevice Python package. It is highly recommended you create a virtualenv.

cd ekmdevice
python -m venv venv
. venv/bin/activate
python -m pip install .

Example code is in the ekmdevice/examples directory:

cd examples

Connect one or more meters and ioStacks to your computer using a USB-to-RS-485 adapter.

Once the adapter is plugged in, determine the name of the serial port.

These articles may be helpful:

Meter examples

In these examples the serial port will be /dev/ttyUSB0 and the meter is a v4 Omnimeter with address 300001285 (substitute the address of your own meter).

Note that meter version and address can be specified as a number or a string. So v4 can be 4, “4”, or “v4”, and address can be 30001, “30001”, or “000000030001”.

Example: Read A and B meter data

v4 meters have both A and B data which must be read separately. The read_data() function performs both if include_b is True.

meter_read.py

import json

from ekmdevice import ekmmeter, ekmserial

# Open the serial port configured for meters, read the meter's A and B data,
# and print the result as indented JSON.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as sp:
    data = ekmmeter.read_data(sp, 300001285, include_b=True)
    text = json.dumps(data, indent=2)
    print(text)

If you have a heterogeneous collection of devices connected to the RS-485 bus, it is generally a good idea to end the read session by setting end_session=True. This ensures the meter stops listening for command messages until the next read, which lessens the chance that serial traffic to or from other devices will confuse the meter:

# end_session=True ends the read session, so the meter stops listening for
# command messages until the next read.
data = ekmmeter.read_data(sp, 300001285, include_b=True, end_session=True)

Run the code:

$ python meter_read.py
{
  "Model": "1024",
  "Firmware": "45",
  "Meter_Address": "000000030001",
  "kWh_Tot": 2692.0,
  "Reactive_Energy_Tot": 1139.0,
  "Rev_kWh_Tot": 0.0,
  "kWh_Ln_1": 2693.0,
  "kWh_Ln_2": 0.0,
  "kWh_Ln_3": 0.0,
  "Rev_kWh_Ln_1": 0.0,
  "Rev_kWh_Ln_2": 0.0,
  "Rev_kWh_Ln_3": 0.0,
  "kWh_Rst": 0.0884,
  "Rev_kWh_Rst": 0.0,
  "RMS_Volts_Ln_1": 123.5,
  "RMS_Volts_Ln_2": 0.0,
  "RMS_Volts_Ln_3": 0.0,
  "Amps_Ln_1": 4.0,
  "Amps_Ln_2": 0.0,
  "Amps_Ln_3": 0.0,
  "RMS_Watts_Ln_1": 340,
  "RMS_Watts_Ln_2": 0,
  "RMS_Watts_Ln_3": 0,
  "RMS_Watts_Tot": 340,
  "Cos_Theta_Ln_1": "C087",
  "Cos_Theta_Ln_2": "C000",
  "Cos_Theta_Ln_3": "C000",
  "Reactive_Pwr_Ln_1": 180,
  "Reactive_Pwr_Ln_2": 0,
  "Reactive_Pwr_Ln_3": 0,
  "Reactive_Pwr_Tot": 180,
  "Line_Freq": 60.06,
  "Pulse_Cnt_1": 4851608,
  "Pulse_Cnt_2": 0,
  "Pulse_Cnt_3": 0,
  "State_Inputs": 4,
  "State_Watts_Dir": 1,
  "State_Out": 1,
  "kWh_Scale": 4,
  "Meter_Time": "24052904112724",
  "Request_Type": 1,
  "kWh_Tariff_1": 148.0,
  "kWh_Tariff_2": 481.0,
  "kWh_Tariff_3": 175.0,
  "kWh_Tariff_4": 1887.0,
  "Rev_kWh_Tariff_1": 0.0,
  "Rev_kWh_Tariff_2": 0.0,
  "Rev_kWh_Tariff_3": 0.0,
  "Rev_kWh_Tariff_4": 0.0,
  "RMS_Watts_Max_Demand": 170.0,
  "Max_Demand_Period": 4,
  "Pulse_Ratio_1": 1,
  "Pulse_Ratio_2": 1,
  "Pulse_Ratio_3": 1,
  "CT_Ratio": 2000,
  "Max_Demand_Rst": 4,
  "CF_Ratio": 5,
  "Meter_Status_Code_A": "00",
  "Meter_Status_Code_B": "01",
  "Meter_Status_Code_C": "00"
}

Example: Set meter relay

This example will close relay 1 for one second.

meter_set_relay.py

from ekmdevice import ekmmeter, ekmserial

# Open the serial port configured for meters.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as port:
    # A read must precede any command message; start_cmd_session() performs
    # that read without unpacking the data.
    ekmmeter.start_cmd_session(port, 300001285)
    # Close relay 1 for one second.
    relay, state, seconds = ekmmeter.RELAY_1, ekmmeter.RELAY_CLOSED, 1
    ekmmeter.set_relay(port, relay, state, seconds)

Run the code:

python meter_set_relay.py

Note that meter commands require a read before a command message is sent. The start_cmd_session() function performs a read without unpacking the data, which uses slightly fewer resources than read_data().

If you would like to use the read data you can substitute read_data() for start_cmd_session():

meter_set_relay2.py

import json

from ekmdevice import ekmmeter, ekmserial

# Open the serial port configured for meters.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as port:
    # read_data() replaces start_cmd_session() here: it performs the read that
    # must precede a command and also returns the unpacked meter data.
    readings = ekmmeter.read_data(port, 300001285)
    # Close relay 1 for one second.
    ekmmeter.set_relay(port, ekmmeter.RELAY_1, ekmmeter.RELAY_CLOSED, 1)
    # Show the data that was read before the relay command was sent.
    text = json.dumps(readings, indent=2)
    print(text)

ioStack examples

In these examples the serial port will be /dev/ttyUSB0 and the ioStack will have the address 1.

ioStacks use a different serial protocol than what meters use, so the serial port must be reconfigured if meters are used on the same serial port.

Example: Get ioStack status

Getting the status using get_status():

iostack_get_status.py

import json

from ekmdevice import ekmiostack, ekmserial

# Open the serial port configured for ioStacks, request the status of the
# ioStack, and print it as indented JSON.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as port:
    status = ekmiostack.get_status(port, 1)
    text = json.dumps(status, indent=2)
    print(text)

Run the code:

$ python iostack_get_status.py
{
  "hw_type": 133,
  "model": 1,
  "version_major": 1,
  "version_minor": 2,
  "lifetime_ms": 10139455050,
  "chip_id": "0009ffffffffffff4e4553157009000e",
  "rtc_period": 60,
  "rtc_offset": 1,
  "device_time": "2024-05-28T18:52:10"
}

Example: Read ioStack data

Reading data using the read_a() API function.

iostack_read_a.py

import json

from ekmdevice import ekmiostack, ekmserial

# Open the serial port configured for ioStacks, perform an "A" read, and
# print the returned data as indented JSON.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as port:
    a_data = ekmiostack.read_a(port, 1)
    text = json.dumps(a_data, indent=2)
    print(text)

Run the code:

$ python iostack_read_a.py
{
  "hw_type": 133,
  "model": 1,
  "version_major": 99,
  "version_minor": 2,
  "device_time": "2024-05-28T18:46:54",
  "lifetime_ms": 10139138540,
  "ai_count": 4,
  "ai_values": [
    2405,
    1095,
    905,
    856
  ],
  "ai_watch": [
    0,
    0,
    0,
    0
  ],
  "di_count": 4,
  "di_levels": 0,
  "di_cwatch": [
    0,
    0,
    0,
    0
  ],
  "di_lwatch": [
    0,
    0,
    0,
    0
  ],
  "di_rwatch": [
    0,
    0,
    0,
    0
  ],
  "do_count": 4,
  "do_levels": 1,
  "do_owner": [
    16,
    0,
    0,
    0
  ],
  "ow_dio_count": 0,
  "ow_dio": [],
  "ow_thl_count": 4,
  "ow_thl": [
    {
      "slot": 1,
      "temp": 180,
      "humidity": 255,
      "lux": 65535
    },
    {
      "slot": 17,
      "temp": 186,
      "humidity": 255,
      "lux": 65535
    },
    {
      "slot": 33,
      "temp": 185,
      "humidity": 255,
      "lux": 65535
    },
    {
      "slot": 49,
      "temp": 185,
      "humidity": 255,
      "lux": 65535
    }
  ]
}

Example: Read ioStack data A and B with EKM field names

Read data using read_data() which performs both read_a() and read_b(), and translates data to a flat dictionary of EKM field names/values that matches the EKM REST API.

EKM field name information for ioStacks can be found here.

iostack_read_data.py

import json

from ekmdevice import ekmiostack, ekmserial

# Open the serial port configured for ioStacks, read the data translated to
# EKM field names, and print it as indented JSON.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as port:
    fields = ekmiostack.read_data(port, 1)
    text = json.dumps(fields, indent=2)
    print(text)

Run the code:

$ python iostack_read_data.py
{
  "Firmware": "99.2",
  "Hardware_Type": 133,
  "Model": 1,
  "Device_Time": "2024-05-28T18:35:05",
  "State_Inputs": 0,
  "State_Out": 1,
  "Analog_In_1": 2406,
  "Analog_In_2": 1249,
  "Analog_In_3": 948,
  "Analog_In_4": 836,
  "Pulse_Hold_ms_1": 8733567548,
  "Pulse_Hold_ms_2": 8733567548,
  "Pulse_Hold_ms_3": 8733567548,
  "Pulse_Hold_ms_4": 8733567548,
  "Pulse_Hi_Prev_ms_1": 0,
  "Pulse_Hi_Prev_ms_2": 0,
  "Pulse_Hi_Prev_ms_3": 0,
  "Pulse_Hi_Prev_ms_4": 0,
  "Pulse_Lo_Prev_ms_1": 0,
  "Pulse_Lo_Prev_ms_2": 0,
  "Pulse_Lo_Prev_ms_3": 0,
  "Pulse_Lo_Prev_ms_4": 0,
  "Pulse_Cnt_Rst_1": 0,
  "Pulse_Cnt_Rst_2": 0,
  "Pulse_Cnt_Rst_3": 0,
  "Pulse_Cnt_Rst_4": 0,
  "Pulse_Cnt_1": 0,
  "Pulse_Cnt_2": 0,
  "Pulse_Cnt_3": 0,
  "Pulse_Cnt_4": 0,
  "Pulse_Hi_Total_sec_1": 0,
  "Pulse_Hi_Total_sec_2": 0,
  "Pulse_Hi_Total_sec_3": 0,
  "Pulse_Hi_Total_sec_4": 0,
  "Pulse_Lo_Total_sec_1": 10138429,
  "Pulse_Lo_Total_sec_2": 10138429,
  "Pulse_Lo_Total_sec_3": 10138429,
  "Pulse_Lo_Total_sec_4": 10138429,
  "OW_1_1_degC": 18.0,
  "OW_1_1_Humidity": 255,
  "OW_1_1_Lux": 65535,
  "OW_2_1_degC": 18.6,
  "OW_2_1_Humidity": 255,
  "OW_2_1_Lux": 65535,
  "OW_3_1_degC": 18.4,
  "OW_3_1_Humidity": 255,
  "OW_3_1_Lux": 65535,
  "OW_4_1_degC": 18.5,
  "OW_4_1_Humidity": 255,
  "OW_4_1_Lux": 65535
}

Reading multiple devices and storing data

The following examples apply when multiple devices are connected to a single RS-485 serial bus.

Read loop

This example program continuously reads a set of devices (meters/iostacks) and stores the data in a SQL database (SQLite in this case). You can specify the serial port and list of devices in a TOML configuration file.

To run the program you will need to have installed the ekmdevice package already. To exit the program press Ctrl-C.

python readloop.py

An example configuration examples/readloop.toml:

[readloop]

# Name of serial port
port = "/dev/ttyUSB0"

# Path of SQLite database file
database = "ekmdata.db"

# Read cycle interval in seconds
min_interval = 10

# List of meters as a list of inline tables:
# { address = "<meter-address>", version = "<meter-version>" }
# version is optional, default is "v4"
devices.meter = [
    {address = "000000020870", version = "v3"},
    {address = "000300001285"},
    {address = "000300001317"},
    {address = "000350008932"},
]

# List of iostacks as a list of inline tables:
# { address = "<iostack-address>", version = "<iostack-version>" }
# version is optional, default is 1.
devices.iostack = [
    {address = "000000000001"},
    {address = "000000000002"},
    {address = "000000000003"},
]

Source code:

readloop.py

#!/usr/bin/env python3
"""Example EKM device data read loop.

.. code::

        python readloop.py --help
"""

from __future__ import annotations

import argparse
import contextlib
import datetime
import json
import pathlib
import sqlite3
import sys
import time

try:
    import tomllib  # type: ignore [import-not-found]
except ImportError:
    import tomli as tomllib  # python<3.11

import ekmdevice
import serial
from ekmdevice import ekmiostack, ekmmeter, ekmserial

# ruff: noqa: T201

# Schema of the per-device-type table.  ``{device}`` is replaced with the
# device type name via str.format() before the statement is executed, giving
# one ``<device>_data`` table per device type.  ``timestamp`` holds the read
# time in milliseconds and ``data`` holds the reading serialized as JSON text
# (see read_device()).
_SQL_TABLE = """
CREATE TABLE IF NOT EXISTS
{device}_data(
    address TEXT,
    timestamp UNSIGNED BIG INT,
    data TEXT
)
"""

# Insert statement for one reading.  ``{device}`` is filled in with
# str.format(); the row values are bound through the named placeholders
# ``:address``, ``:timestamp`` and ``:data``.
_SQL_INSERT_DATA = """
INSERT INTO {device}_data VALUES(
    :address,
    :timestamp,
    :data
)
"""


def main() -> None:
    """Read devices continuously and store the readings in a SQLite database.

    The configuration is loaded from the TOML file named by the optional
    positional command line argument (default ``readloop.toml``).  Its
    ``[readloop]`` table must provide ``port`` and ``devices``; ``min_interval``
    (default 0) and ``database`` (default ``ekmdata.db``) are optional.

    One ``<device_type>_data`` table is created per configured device type if
    it does not exist yet, then the devices are read until the program is
    interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'confpath',
        type=pathlib.Path,
        nargs='?',
        default='readloop.toml',
        help='Configuration file path',
    )
    options = parser.parse_args()
    with options.confpath.open('rb') as f:
        config = tomllib.load(f)['readloop']

    port: str = config['port']
    min_interval: int = config.get('min_interval', 0)
    database: str = config.get('database', 'ekmdata.db')
    device_info: dict = config['devices']

    # A sqlite3 connection used directly as a context manager only commits or
    # rolls back; it does not close the connection.  contextlib.closing()
    # closes the database when the loop ends.  Nothing is lost by dropping the
    # implicit commit: table creation is committed below and read_device()
    # commits after every insert.
    with contextlib.closing(sqlite3.connect(database)) as sql_con:
        # Create tables if necessary
        with contextlib.closing(sql_con.cursor()) as curs:
            for device_type in device_info:
                curs.execute(_SQL_TABLE.format(device=device_type))
            sql_con.commit()

        try:
            # Continuously read devices until ^C (quit)
            read_loop(port, device_info, min_interval, sql_con)
        except KeyboardInterrupt:
            print('Stopped')


def read_loop(
    port: str,
    device_info: dict,
    min_interval: int,
    sql_con: sqlite3.Connection,
) -> None:
    """Read all devices forever, starting cycles *min_interval* seconds apart.

    :param port: Name of the serial port the devices are connected to.
    :param device_info: Mapping of device type name to the list of device
        configuration dicts of that type.
    :param min_interval: Minimum number of seconds between the starts of two
        read cycles.  A cycle that takes longer is followed immediately by the
        next one.
    :param sql_con: Open database connection passed on to read_device().

    This function never returns; it ends only through an exception such as
    KeyboardInterrupt.
    """
    while True:
        # Use the monotonic clock so that a wall-clock step (NTP correction,
        # manual change) cannot stretch or shorten the pause between cycles.
        cycle_start = time.monotonic()
        for device_type, devices in device_info.items():
            # Open serial port configured for the device type
            with ekmserial.create_serial_port(port, device_type) as sp:
                for device in devices:
                    read_device(sp, device_type, device, sql_con)
        # Sleep for remaining interval
        elapsed = time.monotonic() - cycle_start
        time.sleep(max(0, min_interval - elapsed))


def read_device(
    sp: serial.Serial,
    device_type: str,
    device: dict,
    sql_con: sqlite3.Connection,
) -> None:
    """Read one device and save the result as a timestamped database row.

    A device error is reported on stderr and the device is skipped, so one
    failing device does not stop the remaining devices from being read.
    Nothing is stored when the read yields no data.
    """
    address = device['address']
    # Milliseconds since the epoch, taken just before the read starts
    now_utc = datetime.datetime.now(tz=datetime.timezone.utc)
    timestamp = int(now_utc.timestamp() * 1000)

    print(f'Reading {device_type}[{address}]')
    data: dict | None = None
    try:
        if device_type == 'meter':
            data = ekmmeter.read_data(
                sp, address, version=device.get('version'), include_b=True
            )
        elif device_type == 'iostack':
            data = ekmiostack.read_data(sp, address, include_b=True)
    except ekmdevice.EKMDeviceError as e:
        # Report the problem and carry on with the next device
        print(f'{device_type}[{address}]: {e}', file=sys.stderr)

    if not data:
        return

    # Persist the reading as JSON text in the table of this device type
    with contextlib.closing(sql_con.cursor()) as curs:
        statement = _SQL_INSERT_DATA.format(device=device_type)
        row = {
            'address': address,
            'timestamp': timestamp,
            'data': json.dumps(data),
        }
        curs.execute(statement, row)
        sql_con.commit()


if __name__ == '__main__':
    main()

Getting stored data

If you have SQLite installed, start a SQLite session with the file path name of the database created by readloop.py:

$ sqlite3 ekmdata.db
SQLite version 3.37.2 2022-01-06 13:25:41
Enter ".help" for usage hints.
sqlite>

Set the output mode:

sqlite> .mode column

Try the following query to get the most recent value of the meter field kWh_Tot for meter 300001317 (use your own meter address):

sqlite> select datetime(timestamp/1000, 'unixepoch', 'localtime') as time, json_extract(data, '$.kWh_Tot') as kWh from meter_data where address = '000300001317' order by timestamp desc limit 1;
time                 kWh
-------------------  ------
2024-06-02 14:06:26  1724.0

A simple web form

This is a very simple web server that queries the SQL database created by readloop.py and displays the most recently stored values of a device field (a meter “kWh_Tot” field by default).

To run:

cd examples
python readserv.py

Open a web browser and type this url in the address bar (or just click on the link): http://localhost:8000

Screenshot of browser showing example form.

What you should see when navigating to http://localhost:8000

Enter your device type, address, and field name in the displayed form. Then click on the Query button.

Screenshot of browser showing example form and results.

Example of what you should see after filling in meter address and clicking on “Query”.

Source code:

readserv.py

#!/usr/bin/env python3
"""Simple HTTP server showing field values for meter or iostack.

See: readloop.py
"""
from __future__ import annotations

import contextlib
import http.server
import sqlite3
import urllib.parse

import ekmdevice

# ruff: noqa: T201 # ignore print()

# TCP port the HTTP server listens on.
PORT = 8000

# Page template.  The placeholders {device}, {address}, {field} and {result}
# are filled in with str.format() by ReqHandler.do_GET(); {result} receives
# either the table rows or a message.
_HTML_BODY = """
<!DOCTYPE HTML>
<html lang="en">
  <head><meta charset="utf-8"><title>EKM Device Data</title></head>
  <body style="font-family: sans-serif; padding: 1rem 1rem;">
    <h1>EKM Device Data</h1>
    <form method="GET" style="margin-bottom: 3rem">
      Device <input name="device" value="{device}"/>
      Address <input name="address" value="{address}"/>
      Field <input name="field" value="{field}"/>
      <input type="submit" value="Query"/>
    </form>
    <h3>{field}</h3>
    <table>
      {result}
    </table>
  </body>
</html>
"""

# Template for one table row: the read time and the field value.
_HTML_ROW = """
<tr><td>{time}</td><td style="width: 8rem; text-align: right">{value}</td></tr>
"""

# Query for the ten most recent values of one field of one device.
# {field} and {device} are inserted with str.format(); only the address is
# bound as the named parameter :address.  The stored timestamp is divided by
# 1000 before it is converted to local time.
# Note: SQLite does not allow placeholders for table names or functions
# so field and device values should be validated.
_SQL_QUERY = (
    'SELECT datetime(timestamp/1000, \'unixepoch\', \'localtime\'), '
    'json_extract(data, \'$.{field}\') FROM {device}_data '
    'WHERE address = :address ORDER BY timestamp DESC LIMIT 10;'
)


class ReqHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler that renders the query form and its results."""

    def do_GET(self) -> None:  # noqa: N802  # invalid name
        """Handle HTTP GET request.

        Reads the query parameters ``db``, ``device``, ``field`` and
        ``address`` (all optional) and always answers with status 200 and an
        HTML page.  When an address is given the page shows the rows returned
        by get_content(); otherwise, or when the lookup fails, it shows a
        message instead.
        """
        import html  # local import: only needed for escaping in this method

        q = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        # Bind the defaults first so the page can be rendered even if an
        # exception is raised before all values have been read.
        device, field, address = 'meter', 'kWh_Tot', ''
        try:
            # NOTE(review): 'db' lets the client choose which database file is
            # opened; restrict it if the server is reachable by untrusted users.
            database = q.get('db', ['ekmdata.db'])[0]
            device = q.get('device', ['meter'])[0]
            field = q.get('field', ['kWh_Tot'])[0]
            address = q.get('address', [''])[0]
            if address:
                address = ekmdevice.normalize_address_str(address)
                result_html = get_content(database, device, address, field)
            else:
                result_html = html.escape(f'Please specify {device} address.')
        except (KeyError, IndexError):
            result_html = 'Error: invalid query'
        except (OSError, sqlite3.Error) as e:
            # The message may contain request data, so escape it
            result_html = html.escape(f'Error: {e}')
        except (ValueError, ekmdevice.EKMDeviceError) as e:
            # NOTE(review): presumably a malformed address is rejected with one
            # of these exceptions - confirm against normalize_address_str().
            result_html = html.escape(f'Error: {e}')

        # Request values are echoed into the page (including attribute
        # values), so they must be escaped to prevent HTML/script injection.
        body = _HTML_BODY.format(
            device=html.escape(device),
            address=html.escape(str(address)),
            field=html.escape(field),
            result=result_html,
        )
        # Content-Length counts bytes, not characters: encode before measuring
        payload = body.encode('utf-8', 'replace')

        # Write HTTP headers
        self.send_response(http.HTTPStatus.OK)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()

        # Write HTML content
        self.wfile.write(payload)


def get_content(database: str, device: str, address: str, field: str) -> str:
    """Return the newest rows of a device field rendered as HTML table rows.

    :param database: Path of the SQLite database file.
    :param device: Device type name; selects the ``<device>_data`` table.
    :param address: Device address the rows are filtered by.
    :param field: Name of the field extracted from the stored JSON data.
    :returns: The rows returned by the query (at most ten, newest first), one
        ``<tr>`` element per row, joined with newlines.
    :raises sqlite3.Error: If *device* or *field* is not a plain name, if the
        query fails, or if no value is found.
    """
    import html as html_lib  # local import: only needed for escaping here

    # device and field are inserted into the SQL text itself (they cannot be
    # bound as parameters), so accept only ASCII letters, digits and
    # underscores.  The rejected value is deliberately left out of the message
    # because the caller displays it.
    for name in (device, field):
        if not (name.isascii() and name.replace('_', '').isalnum()):
            raise sqlite3.Error('invalid device or field name')

    # closing() is required to close the connection: using the connection
    # itself as a context manager only commits or rolls back.
    with contextlib.closing(sqlite3.connect(database)) as con:
        with contextlib.closing(con.cursor()) as curs:
            query = _SQL_QUERY.format(field=field, device=device)
            curs.execute(query, {'address': address})
            result = curs.fetchall()

    # A NULL value in the newest row means the field is not in the data
    if result and result[0][1] is not None:
        return '\n'.join(
            _HTML_ROW.format(
                time=html_lib.escape(str(row[0])),
                value=html_lib.escape(str(row[1])),
            )
            for row in result
        )
    raise sqlite3.Error('data not found')


def main() -> None:
    """Create and start simple HTTP server.

    The server listens on PORT on all interfaces and handles requests with
    ReqHandler until it is interrupted with Ctrl-C.
    """
    # Using the server as a context manager closes the listening socket on
    # exit, including when the server is stopped with Ctrl-C.
    with http.server.ThreadingHTTPServer(('', PORT), ReqHandler) as httpd:
        print(f'Serving on port {PORT}')
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print('Stopped.')


if __name__ == '__main__':
    main()