API examples
Setting things up
To get the example code, clone the ekmdevice repository:
git clone https://github.com/ekmmetering/ekmdevice.git
Install the ekmdevice Python package. It is highly recommended you create a virtualenv.
cd ekmdevice
python -m venv venv
. venv/bin/activate
python -m pip install .
Example code is in the ekmdevice/examples directory:
cd examples
Connect one or more meters and iostacks to your computer using a USB-rs485 adapter.
Once the adapter is plugged in, determine the name of the serial port.
These articles may be helpful:
Meter examples
In these examples the serial port will be /dev/ttyUSB0 and the meter is a v4 Omnimeter with address 30001.
Note that meter version and address can be specified as a number or a string. So v4 can be 4, “4”, or “v4”, and address can be 30001, “30001”, or “000000030001”.
Example: Read A and B meter data
v4 meters have both A and B data, which must be read separately. The read_data() function performs both reads if include_b is True.
meter_read.py
import json
from ekmdevice import ekmmeter, ekmserial
# Open the serial port configured for meters.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as sp:
    # include_b=True reads both the A and the B data of the meter.
    data = ekmmeter.read_data(sp, 300001285, include_b=True)
    print(json.dumps(data, indent=2))
If you have a heterogeneous collection of devices connected to the RS-485 bus then it’s generally a good idea to end the read session by setting end_session=True. This will ensure the meter is no longer listening for command messages until the next read, which lessens the chance that serial traffic to/from other devices will confuse the meter:
data = ekmmeter.read_data(sp, 300001285, include_b=True, end_session=True)
Run the code:
$ python meter_read.py
{
"Model": "1024",
"Firmware": "45",
"Meter_Address": "000000030001",
"kWh_Tot": 2692.0,
"Reactive_Energy_Tot": 1139.0,
"Rev_kWh_Tot": 0.0,
"kWh_Ln_1": 2693.0,
"kWh_Ln_2": 0.0,
"kWh_Ln_3": 0.0,
"Rev_kWh_Ln_1": 0.0,
"Rev_kWh_Ln_2": 0.0,
"Rev_kWh_Ln_3": 0.0,
"kWh_Rst": 0.0884,
"Rev_kWh_Rst": 0.0,
"RMS_Volts_Ln_1": 123.5,
"RMS_Volts_Ln_2": 0.0,
"RMS_Volts_Ln_3": 0.0,
"Amps_Ln_1": 4.0,
"Amps_Ln_2": 0.0,
"Amps_Ln_3": 0.0,
"RMS_Watts_Ln_1": 340,
"RMS_Watts_Ln_2": 0,
"RMS_Watts_Ln_3": 0,
"RMS_Watts_Tot": 340,
"Cos_Theta_Ln_1": "C087",
"Cos_Theta_Ln_2": "C000",
"Cos_Theta_Ln_3": "C000",
"Reactive_Pwr_Ln_1": 180,
"Reactive_Pwr_Ln_2": 0,
"Reactive_Pwr_Ln_3": 0,
"Reactive_Pwr_Tot": 180,
"Line_Freq": 60.06,
"Pulse_Cnt_1": 4851608,
"Pulse_Cnt_2": 0,
"Pulse_Cnt_3": 0,
"State_Inputs": 4,
"State_Watts_Dir": 1,
"State_Out": 1,
"kWh_Scale": 4,
"Meter_Time": "24052904112724",
"Request_Type": 1,
"kWh_Tariff_1": 148.0,
"kWh_Tariff_2": 481.0,
"kWh_Tariff_3": 175.0,
"kWh_Tariff_4": 1887.0,
"Rev_kWh_Tariff_1": 0.0,
"Rev_kWh_Tariff_2": 0.0,
"Rev_kWh_Tariff_3": 0.0,
"Rev_kWh_Tariff_4": 0.0,
"RMS_Watts_Max_Demand": 170.0,
"Max_Demand_Period": 4,
"Pulse_Ratio_1": 1,
"Pulse_Ratio_2": 1,
"Pulse_Ratio_3": 1,
"CT_Ratio": 2000,
"Max_Demand_Rst": 4,
"CF_Ratio": 5,
"Meter_Status_Code_A": "00",
"Meter_Status_Code_B": "01",
"Meter_Status_Code_C": "00"
}
Example: Set meter relay
This example will close relay 1 for one second.
meter_set_relay.py
from ekmdevice import ekmmeter, ekmserial
# Open the serial port configured for meters.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as serial_port:
    # A read is required before a command message can be sent.
    ekmmeter.start_cmd_session(serial_port, 300001285)
    # Close relay 1 for one second.
    ekmmeter.set_relay(serial_port, ekmmeter.RELAY_1, ekmmeter.RELAY_CLOSED, 1)
Run the code:
python meter_set_relay.py
Note that meter commands require a read before sending a command message. The start_cmd_session() function performs a read without unpacking the data, which uses slightly fewer resources than read_data(). If you would like to use the read data, you can substitute read_data() for start_cmd_session():
meter_set_relay2.py
import json
from ekmdevice import ekmmeter, ekmserial
# Open the serial port configured for meters.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as serial_port:
    # read_data() takes the place of start_cmd_session() and also returns
    # the unpacked meter data.
    data = ekmmeter.read_data(serial_port, 300001285)
    # Close relay 1 for one second.
    ekmmeter.set_relay(serial_port, ekmmeter.RELAY_1, ekmmeter.RELAY_CLOSED, 1)
    print(json.dumps(data, indent=2))
ioStack examples
In these examples the serial port will be /dev/ttyUSB0 and the ioStack will have the address 10001.
ioStacks use a different serial protocol than what meters use, so the serial port must be reconfigured if meters are used on the same serial port.
Example: Get ioStack status
Getting the status using get_status()
:
iostack_get_status.py
import json
from ekmdevice import ekmiostack, ekmserial
# Open the serial port configured for ioStacks.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as sp:
    # Query the status of the ioStack at address 1.
    data = ekmiostack.get_status(sp, 1)
    print(json.dumps(data, indent=2))
Run the code:
$ python iostack_get_status.py
{
"hw_type": 133,
"model": 1,
"version_major": 1,
"version_minor": 2,
"lifetime_ms": 10139455050,
"chip_id": "0009ffffffffffff4e4553157009000e",
"rtc_period": 60,
"rtc_offset": 1,
"device_time": "2024-05-28T18:52:10"
}
Example: Read ioStack data
Reading data using the read_a()
API function.
iostack_read_a.py
import json
from ekmdevice import ekmiostack, ekmserial
# Open the serial port configured for ioStacks.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as sp:
    # Read the A data of the ioStack at address 1.
    data = ekmiostack.read_a(sp, 1)
    print(json.dumps(data, indent=2))
Run the code:
$ python iostack_read_a.py
{
"hw_type": 133,
"model": 1,
"version_major": 99,
"version_minor": 2,
"device_time": "2024-05-28T18:46:54",
"lifetime_ms": 10139138540,
"ai_count": 4,
"ai_values": [
2405,
1095,
905,
856
],
"ai_watch": [
0,
0,
0,
0
],
"di_count": 4,
"di_levels": 0,
"di_cwatch": [
0,
0,
0,
0
],
"di_lwatch": [
0,
0,
0,
0
],
"di_rwatch": [
0,
0,
0,
0
],
"do_count": 4,
"do_levels": 1,
"do_owner": [
16,
0,
0,
0
],
"ow_dio_count": 0,
"ow_dio": [],
"ow_thl_count": 4,
"ow_thl": [
{
"slot": 1,
"temp": 180,
"humidity": 255,
"lux": 65535
},
{
"slot": 17,
"temp": 186,
"humidity": 255,
"lux": 65535
},
{
"slot": 33,
"temp": 185,
"humidity": 255,
"lux": 65535
},
{
"slot": 49,
"temp": 185,
"humidity": 255,
"lux": 65535
}
]
}
Example: Read ioStack data A and B with EKM field names
Read data using read_data()
which performs both
read_a()
and read_b()
,
and translates data to a flat dictionary of EKM field names/values
that matches the EKM REST API.
EKM field name information for ioStacks can be found here.
iostack_read_data.py
import json
from ekmdevice import ekmiostack, ekmserial
# Open the serial port configured for ioStacks.
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as sp:
    # Read the ioStack at address 1 and get a flat dict of EKM field names.
    data = ekmiostack.read_data(sp, 1)
    print(json.dumps(data, indent=2))
Run the code:
$ python iostack_read_data.py
{
"Firmware": "99.2",
"Hardware_Type": 133,
"Model": 1,
"Device_Time": "2024-05-28T18:35:05",
"State_Inputs": 0,
"State_Out": 1,
"Analog_In_1": 2406,
"Analog_In_2": 1249,
"Analog_In_3": 948,
"Analog_In_4": 836,
"Pulse_Hold_ms_1": 8733567548,
"Pulse_Hold_ms_2": 8733567548,
"Pulse_Hold_ms_3": 8733567548,
"Pulse_Hold_ms_4": 8733567548,
"Pulse_Hi_Prev_ms_1": 0,
"Pulse_Hi_Prev_ms_2": 0,
"Pulse_Hi_Prev_ms_3": 0,
"Pulse_Hi_Prev_ms_4": 0,
"Pulse_Lo_Prev_ms_1": 0,
"Pulse_Lo_Prev_ms_2": 0,
"Pulse_Lo_Prev_ms_3": 0,
"Pulse_Lo_Prev_ms_4": 0,
"Pulse_Cnt_Rst_1": 0,
"Pulse_Cnt_Rst_2": 0,
"Pulse_Cnt_Rst_3": 0,
"Pulse_Cnt_Rst_4": 0,
"Pulse_Cnt_1": 0,
"Pulse_Cnt_2": 0,
"Pulse_Cnt_3": 0,
"Pulse_Cnt_4": 0,
"Pulse_Hi_Total_sec_1": 0,
"Pulse_Hi_Total_sec_2": 0,
"Pulse_Hi_Total_sec_3": 0,
"Pulse_Hi_Total_sec_4": 0,
"Pulse_Lo_Total_sec_1": 10138429,
"Pulse_Lo_Total_sec_2": 10138429,
"Pulse_Lo_Total_sec_3": 10138429,
"Pulse_Lo_Total_sec_4": 10138429,
"OW_1_1_degC": 18.0,
"OW_1_1_Humidity": 255,
"OW_1_1_Lux": 65535,
"OW_2_1_degC": 18.6,
"OW_2_1_Humidity": 255,
"OW_2_1_Lux": 65535,
"OW_3_1_degC": 18.4,
"OW_3_1_Humidity": 255,
"OW_3_1_Lux": 65535,
"OW_4_1_degC": 18.5,
"OW_4_1_Humidity": 255,
"OW_4_1_Lux": 65535
}
Reading multiple devices and storing data
This section applies when multiple devices are connected to a single RS-485 serial bus.
Read loop
This example program continuously reads a set of devices (meters/iostacks) and stores the data in a SQL database (SQLite in this case). You can specify the serial port and list of devices in a TOML configuration file.
To run the program you will need to have installed the ekmdevice package already. To exit the program press Ctrl-C.
python readloop.py
An example configuration examples/readloop.toml:
[readloop]
# Name of serial port
port = "/dev/ttyUSB0"
# Path of SQLite database file
database = "ekmdata.db"
# Read cycle interval in seconds
min_interval = 10
# List of meters as a list of inline tables:
# { address = "<meter-address>", version = "<meter-version>" }
# version is optional, default is "v4"
devices.meter = [
{address = "000000020870", version = "v3"},
{address = "000300001285"},
{address = "000300001317"},
{address = "000350008932"},
]
# List of iostacks as a list of inline tables:
# { address = "<iostack-address>", version = "<iostack-version>" }
# version is optional, default is 1.
devices.iostack = [
{address = "000000000001"},
{address = "000000000002"},
{address = "000000000003"},
]
Source code:
readloop.py
#!/usr/bin/env python3
"""Example EKM device data read loop.
.. code::
python readloop.py --help
"""
from __future__ import annotations
import argparse
import contextlib
import datetime
import json
import pathlib
import sqlite3
import sys
import time
try:
import tomllib # type: ignore [import-not-found]
except ImportError:
import tomli as tomllib # python<3.11
import serial
import ekmdevice
from ekmdevice import ekmiostack, ekmmeter, ekmserial
# ruff: noqa: T201
# SQL template creating the table for one device type if it does not exist.
# {device} is filled in with str.format() before the statement is executed,
# giving a table named "<device>_data".
_SQL_TABLE = """
CREATE TABLE IF NOT EXISTS
{device}_data(
address TEXT,
timestamp UNSIGNED BIG INT,
data TEXT
)
"""
# SQL template inserting one reading into "<device>_data". {device} is filled
# in with str.format(); :address, :timestamp and :data are named placeholders
# bound from a dict when the statement is executed.
_SQL_INSERT_DATA = """
INSERT INTO {device}_data VALUES(
:address,
:timestamp,
:data
)
"""
def main() -> None:
    """Program to read time series data and store it in a SQL database.

    The configuration is loaded from the TOML file given as the optional
    command line argument (default ``readloop.toml``).  Its ``[readloop]``
    table supplies the serial port name, the minimum read interval, the
    SQLite database path and the device lists.  Devices are read until the
    program is interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'confpath',
        type=pathlib.Path,
        nargs='?',
        default='readloop.toml',
        help='Configuration file path',
    )
    options = parser.parse_args()
    with options.confpath.open('rb') as f:
        config = tomllib.load(f)['readloop']
    port: str = config['port']
    min_interval: int = config.get('min_interval', 0)
    database: str = config.get('database', 'ekmdata.db')
    device_info: dict = config['devices']
    # A sqlite3 connection used directly as a context manager only commits
    # or rolls back the transaction; closing() ensures the connection itself
    # is closed when the program stops.
    with contextlib.closing(sqlite3.connect(database)) as sql_con:
        # Create tables if necessary (one table per configured device type)
        with contextlib.closing(sql_con.cursor()) as curs:
            for device_type in device_info:
                curs.execute(_SQL_TABLE.format(device=device_type))
        sql_con.commit()
        try:
            # Continuously read devices until ^C (quit)
            read_loop(port, device_info, min_interval, sql_con)
        except KeyboardInterrupt:
            print('Stopped')
def read_loop(
    port: str,
    device_info: dict,
    min_interval: int,
    sql_con: sqlite3.Connection,
) -> None:
    """Continuously read all devices, one cycle per *min_interval* seconds.

    :param port: Name of the serial port the devices are connected to.
    :param device_info: Mapping of device type to a list of device tables.
    :param min_interval: Minimum duration of one read cycle in seconds; a
        cycle that takes longer is followed immediately by the next one.
    :param sql_con: Open database connection passed on to read_device().

    This function loops forever; it ends only when an exception (for
    example KeyboardInterrupt) propagates out of it.
    """
    while True:
        # Use the monotonic clock so that wall-clock adjustments cannot
        # stretch or shorten the computed sleep time.
        cycle_start = time.monotonic()
        for device_type, devices in device_info.items():
            # Open serial port configured for the device type
            with ekmserial.create_serial_port(port, device_type) as sp:
                for device in devices:
                    read_device(sp, device_type, device, sql_con)
        # Sleep for remaining interval
        elapsed = time.monotonic() - cycle_start
        time.sleep(max(0, min_interval - elapsed))
def read_device(
    sp: serial.Serial,
    device_type: str,
    device: dict,
    sql_con: sqlite3.Connection,
) -> None:
    """Read one device and store its timestamped data in the SQL database.

    :param sp: Open serial port configured for *device_type*.
    :param device_type: ``'meter'`` or ``'iostack'``; any other value
        results in no read and nothing being stored.
    :param device: Device table; ``address`` is required, and ``version``
        is passed on for meters when present.
    :param sql_con: Database connection used to insert and commit the row.

    A device error is reported on stderr and the device is skipped, so the
    caller can carry on with the remaining devices.
    """
    address = device['address']
    # Milliseconds since the Unix epoch, taken before the read starts
    now_utc = datetime.datetime.now(tz=datetime.timezone.utc)
    timestamp = int(now_utc.timestamp() * 1000)
    data: dict | None = None
    print(f'Reading {device_type}[{address}]')
    try:
        if device_type == 'meter':
            data = ekmmeter.read_data(
                sp, address, version=device.get('version'), include_b=True
            )
        elif device_type == 'iostack':
            data = ekmiostack.read_data(sp, address, include_b=True)
    except ekmdevice.EKMDeviceError as e:
        # Report the failure and leave the remaining devices to the caller
        print(f'{device_type}[{address}]: {e}', file=sys.stderr)
        return
    if not data:
        # Nothing was read, so there is nothing to store
        return
    with contextlib.closing(sql_con.cursor()) as curs:
        statement = _SQL_INSERT_DATA.format(device=device_type)
        row = {
            'address': address,
            'timestamp': timestamp,
            'data': json.dumps(data),
        }
        curs.execute(statement, row)
    sql_con.commit()
# Run the read loop only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Getting stored data
If you have SQLite installed, start a SQLite session with the file path name of the database created by readloop.py:
$ sqlite3 ekmdata.db
SQLite version 3.37.2 2022-01-06 13:25:41
Enter ".help" for usage hints.
sqlite>
Set the output mode:
sqlite> .mode column
Try the following query to get the most recent value of the meter field kWh_Tot for meter 300001317 (use your own meter address):
sqlite> select datetime(timestamp/1000, 'unixepoch', 'localtime') as time, json_extract(data, '$.kWh_Tot') as kWh from meter_data where address = '000300001317' order by timestamp desc limit 1;
time kWh
------------------- ------
2024-06-02 14:06:26 1724.0
A simple web form
This is a very simple web server that queries the SQL database created by readloop.py and displays the most recently stored values (up to ten) of a device field (a meter “kWh_Tot” field by default).
To run:
cd examples
python readserv.py
Open a web browser and type this url in the address bar (or just click on the link): http://localhost:8000

What you should see when navigating to http://localhost:8000
Enter your device type, address, and field name in the displayed form. Then click on the Query button.

Example of what you should see after filling in meter address and clicking on “Query”.
Source code:
readserv.py
#!/usr/bin/env python3
"""Simple HTTP server showing field values for meter or iostack.
See: readloop.py
"""
from __future__ import annotations
import contextlib
import html
import http.server
import sqlite3
import urllib.parse

import ekmdevice
# ruff: noqa: T201 # ignore print()
# TCP port the HTTP server listens on.
PORT = 8000
# Page template filled in with str.format(): {device}, {address} and {field}
# pre-fill the query form (and the heading), {result} receives the rendered
# result rows or a message.
_HTML_BODY = """
<!DOCTYPE HTML>
<html lang="en">
<head><meta charset="utf-8"><title>EKM Device Data</title></head>
<body style="font-family: sans-serif; padding: 1rem 1rem;">
<h1>EKM Device Data</h1>
<form method="GET" style="margin-bottom: 3rem">
Device <input name="device" value="{device}"/>
Address <input name="address" value="{address}"/>
Field <input name="field" value="{field}"/>
<input type="submit" value="Query"/>
</form>
<h3>{field}</h3>
<table>
{result}
</table>
</body>
</html>
"""
# Template for one table row of the result: {time} is the reading time and
# {value} the field value, both filled in with str.format().
_HTML_ROW = """
<tr><td>{time}</td><td style="width: 8rem; text-align: right">{value}</td></tr>
"""
# Query template returning the ten most recent values of one field for one
# device address, newest first, with the millisecond timestamp converted to
# local time.
# Note: SQLite does not allow placeholders for table names or functions
# so field and device values should be validated.
# {field} and {device} are substituted with str.format() and therefore end
# up in the SQL text itself; only :address is a bound parameter.
_SQL_QUERY = (
'SELECT datetime(timestamp/1000, \'unixepoch\', \'localtime\'), '
'json_extract(data, \'$.{field}\') FROM {device}_data '
'WHERE address = :address ORDER BY timestamp DESC LIMIT 10;'
)
class ReqHandler(http.server.BaseHTTPRequestHandler):
    """Simple HTTP request handler class.

    Every GET request is answered with the query form page; when an
    address is given in the query string, the page also shows the stored
    values of the requested device field.
    """

    def do_GET(self) -> None:  # noqa: N802 # invalid name
        """Handle HTTP GET request.

        Recognised query parameters are ``db`` (database path, default
        ``ekmdata.db``), ``device`` (default ``meter``), ``field``
        (default ``kWh_Tot``) and ``address``.  Lookup errors are shown
        in the page instead of being raised.
        """
        q = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        # Read the parameters before the try block so that they are always
        # bound when the page is rendered below.
        # NOTE(review): 'db' lets the client choose the database file path;
        # consider removing it if the server is reachable by untrusted users.
        database = q.get('db', ['ekmdata.db'])[0]
        device = q.get('device', ['meter'])[0]
        field = q.get('field', ['kWh_Tot'])[0]
        address = q.get('address', [''])[0]
        try:
            if address:
                address = ekmdevice.normalize_address_str(address)
                result_html = get_content(database, device, address, field)
            else:
                result_html = html.escape(f'Please specify {device} address.')
        except (KeyError, IndexError):
            result_html = 'Error: invalid query'
        except (OSError, sqlite3.Error) as e:
            result_html = html.escape(f'Error: {e}')
        # Query values come from the client, so escape them before they are
        # placed into attribute values and element content.
        page = _HTML_BODY.format(
            device=html.escape(device),
            address=html.escape(address),
            field=html.escape(field),
            result=result_html,
        )
        body = page.encode('utf-8', 'replace')
        # Write HTTP headers
        self.send_response(http.HTTPStatus.OK)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        # Content-Length counts bytes of the encoded body, not characters
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        # Write HTML content
        self.wfile.write(body)
def get_content(database: str, device: str, address: str, field: str) -> str:
    """Get the last (at most ten) values of a device field as HTML table rows.

    :param database: Path of the SQLite database file.
    :param device: Device type; selects the ``<device>_data`` table.
    :param address: Device address the rows are filtered by.
    :param field: Name of the field extracted from the stored JSON data.
    :returns: One rendered table row per stored value, newest first.
    :raises sqlite3.Error: If *device* or *field* contains characters other
        than ASCII letters, digits and underscore, if the query fails, or
        if no data is found.
    """
    # device and field are substituted into the SQL text itself, so accept
    # only plain identifier characters to rule out SQL injection.
    for label, value in (('device', device), ('field', field)):
        is_plain = bool(value) and all(
            ch.isascii() and (ch.isalnum() or ch == '_') for ch in value
        )
        if not is_plain:
            raise sqlite3.Error(f'invalid {label} name')
    query = _SQL_QUERY.format(field=field, device=device)
    # closing() is needed because a sqlite3 connection used directly as a
    # context manager is not closed on exit.
    with contextlib.closing(
        sqlite3.connect(database)
    ) as con, contextlib.closing(con.cursor()) as curs:
        curs.execute(query, {'address': address})
        result = curs.fetchall()
    # A NULL value in the newest row means the field is not in the data
    if result and result[0][1] is not None:
        return '\n'.join(
            _HTML_ROW.format(
                time=html.escape(str(row[0])),
                value=html.escape(str(row[1])),
            )
            for row in result
        )
    raise sqlite3.Error('data not found')
def main() -> None:
    """Create and start simple HTTP server.

    Serves requests on PORT on all interfaces until interrupted with
    Ctrl-C; the listening socket is closed on the way out.
    """
    # Using the server as a context manager closes its socket on exit.
    with http.server.ThreadingHTTPServer(('', PORT), ReqHandler) as httpd:
        print(f'Serving on port {PORT}')
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print('Stopped.')


if __name__ == '__main__':
    main()