API examples
Setting things up
To get the example code, clone the ekmdevice repository:
git clone https://github.com/ekmmetering/ekmdevice.git
Install the ekmdevice Python package. It is highly recommended you create a virtualenv.
cd ekmdevice
python -m venv venv
. venv/bin/activate
python -m pip install .
Example code is in the ekmdevice/examples directory:
cd examples
Connect one or more meters and iostacks to your computer using a USB-rs485 adapter.
Once the adapter is plugged in, determine the name of the serial port.
These articles may be helpful:
Meter examples
In these examples the serial port will be /dev/ttyUSB0 and the meter is a v4 Omnimeter with address 30001.
Note that meter version and address can be specified as a number or a string. So v4 can be 4, “4”, or “v4”, and address can be 30001, “30001”, or “000000030001”.
Example: Read A and B meter data
v4 meters have both A and B data which must be read
separately. The read_data()
function performs
both if include_b is True.
meter_read.py
import json
from ekmdevice import ekmmeter, ekmserial
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as sp:
data = ekmmeter.read_data(sp, 300001285, include_b=True)
print(json.dumps(data, indent=2))
If you have a heterogenous collection of devices connected to the RS-485 bus then it’s generally a good idea to end the read session by setting end_session=True. This will ensure the meter is no longer listening for command messages until the next read, which lessens the chance that serial traffic to/from other devices will confuse the meter:
data = ekmmeter.read_data(sp, 4, 300001285, include_b=True, end_session=True)
Run the code:
$ python meter_read.py
{
"Model": "1024",
"Firmware": "45",
"Meter_Address": "000000030001",
"kWh_Tot": 2692.0,
"Reactive_Energy_Tot": 1139.0,
"Rev_kWh_Tot": 0.0,
"kWh_Ln_1": 2693.0,
"kWh_Ln_2": 0.0,
"kWh_Ln_3": 0.0,
"Rev_kWh_Ln_1": 0.0,
"Rev_kWh_Ln_2": 0.0,
"Rev_kWh_Ln_3": 0.0,
"kWh_Rst": 0.0884,
"Rev_kWh_Rst": 0.0,
"RMS_Volts_Ln_1": 123.5,
"RMS_Volts_Ln_2": 0.0,
"RMS_Volts_Ln_3": 0.0,
"Amps_Ln_1": 4.0,
"Amps_Ln_2": 0.0,
"Amps_Ln_3": 0.0,
"RMS_Watts_Ln_1": 340,
"RMS_Watts_Ln_2": 0,
"RMS_Watts_Ln_3": 0,
"RMS_Watts_Tot": 340,
"Cos_Theta_Ln_1": "C087",
"Cos_Theta_Ln_2": "C000",
"Cos_Theta_Ln_3": "C000",
"Reactive_Pwr_Ln_1": 180,
"Reactive_Pwr_Ln_2": 0,
"Reactive_Pwr_Ln_3": 0,
"Reactive_Pwr_Tot": 180,
"Line_Freq": 60.06,
"Pulse_Cnt_1": 4851608,
"Pulse_Cnt_2": 0,
"Pulse_Cnt_3": 0,
"State_Inputs": 4,
"State_Watts_Dir": 1,
"State_Out": 1,
"kWh_Scale": 4,
"Meter_Time": "24052904112724",
"Request_Type": 1,
"kWh_Tariff_1": 148.0,
"kWh_Tariff_2": 481.0,
"kWh_Tariff_3": 175.0,
"kWh_Tariff_4": 1887.0,
"Rev_kWh_Tariff_1": 0.0,
"Rev_kWh_Tariff_2": 0.0,
"Rev_kWh_Tariff_3": 0.0,
"Rev_kWh_Tariff_4": 0.0,
"RMS_Watts_Max_Demand": 170.0,
"Max_Demand_Period": 4,
"Pulse_Ratio_1": 1,
"Pulse_Ratio_2": 1,
"Pulse_Ratio_3": 1,
"CT_Ratio": 2000,
"Max_Demand_Rst": 4,
"CF_Ratio": 5,
"Meter_Status_Code_A": "00",
"Meter_Status_Code_B": "01",
"Meter_Status_Code_C": "00"
}
Example: Set meter relay
This example will close relay 1 for one second.
meter_set_relay.py
from ekmdevice import ekmmeter, ekmserial
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as serial_port:
ekmmeter.start_cmd_session(serial_port, 300001285)
ekmmeter.set_relay(serial_port, ekmmeter.RELAY_1, ekmmeter.RELAY_CLOSED, 1)
Run the code:
python meter_set_relay.py
Note that meter commands require a read before sending a command message.
The start_cmd_session()
function will perform a read without
unpacking the data which uses slightly less resources than read_data()
.
If you would like to use the read data you can substitute read_data()
for start_cmd_session()
:
meter_set_relay2.py
import json
from ekmdevice import ekmmeter, ekmserial
with ekmserial.create_serial_port('/dev/ttyUSB0', 'meter') as serial_port:
data = ekmmeter.read_data(serial_port, 300001285)
ekmmeter.set_relay(serial_port, ekmmeter.RELAY_1, ekmmeter.RELAY_CLOSED, 1)
print(json.dumps(data, indent=2))
ioStack examples
In these examples the serial port will be /dev/ttyUSB0 and the ioStack will have the address 10001.
ioStacks use a different serial protocol than what meters use, so the serial port must be reconfigured if meters are used on the same serial port.
Example: Get ioStack status
Getting the status using get_status()
:
iostack_get_status.py
import json
from ekmdevice import ekmiostack, ekmserial
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as sp:
data = ekmiostack.get_status(sp, 1)
print(json.dumps(data, indent=2))
Run the code:
$ python iostack_get_status.py
{
"hw_type": 133,
"model": 1,
"version_major": 1,
"version_minor": 2,
"lifetime_ms": 10139455050,
"chip_id": "0009ffffffffffff4e4553157009000e",
"rtc_period": 60,
"rtc_offset": 1,
"device_time": "2024-05-28T18:52:10"
}
Example: Read ioStack data
Reading data using the read_a()
API function.
iostack_read_a.py
import json
from ekmdevice import ekmiostack, ekmserial
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as sp:
data = ekmiostack.read_a(sp, 1)
print(json.dumps(data, indent=2))
Run the code:
$ python iostack_read_a.py
{
"hw_type": 133,
"model": 1,
"version_major": 99,
"version_minor": 2,
"device_time": "2024-05-28T18:46:54",
"lifetime_ms": 10139138540,
"ai_count": 4,
"ai_values": [
2405,
1095,
905,
856
],
"ai_watch": [
0,
0,
0,
0
],
"di_count": 4,
"di_levels": 0,
"di_cwatch": [
0,
0,
0,
0
],
"di_lwatch": [
0,
0,
0,
0
],
"di_rwatch": [
0,
0,
0,
0
],
"do_count": 4,
"do_levels": 1,
"do_owner": [
16,
0,
0,
0
],
"ow_dio_count": 0,
"ow_dio": [],
"ow_thl_count": 4,
"ow_thl": [
{
"slot": 1,
"temp": 180,
"humidity": 255,
"lux": 65535
},
{
"slot": 17,
"temp": 186,
"humidity": 255,
"lux": 65535
},
{
"slot": 33,
"temp": 185,
"humidity": 255,
"lux": 65535
},
{
"slot": 49,
"temp": 185,
"humidity": 255,
"lux": 65535
}
]
}
Example: Read ioStack data A and B with EKM field names
Read data using read_data()
which performs both
read_a()
and read_b()
,
and translates data to a flat dictionary of EKM field names/values
that matches the EKM REST API.
EKM field name information for ioStacks can be found here.
iostack_read_data.py
import json
from ekmdevice import ekmiostack, ekmserial
with ekmserial.create_serial_port('/dev/ttyUSB0', 'iostack') as sp:
data = ekmiostack.read_data(sp, 1)
print(json.dumps(data, indent=2))
Run the code:
$ python iostack_read_data.py
{
"Firmware": "99.2",
"Hardware_Type": 133,
"Model": 1,
"Device_Time": "2024-05-28T18:35:05",
"State_Inputs": 0,
"State_Out": 1,
"Analog_In_1": 2406,
"Analog_In_2": 1249,
"Analog_In_3": 948,
"Analog_In_4": 836,
"Pulse_Hold_ms_1": 8733567548,
"Pulse_Hold_ms_2": 8733567548,
"Pulse_Hold_ms_3": 8733567548,
"Pulse_Hold_ms_4": 8733567548,
"Pulse_Hi_Prev_ms_1": 0,
"Pulse_Hi_Prev_ms_2": 0,
"Pulse_Hi_Prev_ms_3": 0,
"Pulse_Hi_Prev_ms_4": 0,
"Pulse_Lo_Prev_ms_1": 0,
"Pulse_Lo_Prev_ms_2": 0,
"Pulse_Lo_Prev_ms_3": 0,
"Pulse_Lo_Prev_ms_4": 0,
"Pulse_Cnt_Rst_1": 0,
"Pulse_Cnt_Rst_2": 0,
"Pulse_Cnt_Rst_3": 0,
"Pulse_Cnt_Rst_4": 0,
"Pulse_Cnt_1": 0,
"Pulse_Cnt_2": 0,
"Pulse_Cnt_3": 0,
"Pulse_Cnt_4": 0,
"Pulse_Hi_Total_sec_1": 0,
"Pulse_Hi_Total_sec_2": 0,
"Pulse_Hi_Total_sec_3": 0,
"Pulse_Hi_Total_sec_4": 0,
"Pulse_Lo_Total_sec_1": 10138429,
"Pulse_Lo_Total_sec_2": 10138429,
"Pulse_Lo_Total_sec_3": 10138429,
"Pulse_Lo_Total_sec_4": 10138429,
"OW_1_1_degC": 18.0,
"OW_1_1_Humidity": 255,
"OW_1_1_Lux": 65535,
"OW_2_1_degC": 18.6,
"OW_2_1_Humidity": 255,
"OW_2_1_Lux": 65535,
"OW_3_1_degC": 18.4,
"OW_3_1_Humidity": 255,
"OW_3_1_Lux": 65535,
"OW_4_1_degC": 18.5,
"OW_4_1_Humidity": 255,
"OW_4_1_Lux": 65535
}
Reading multiple devices and storing data
When multiple devices are connected to a single RS485 serial bus.
Read loop
This example program continuously reads a set of devices (meters/iostacks) and stores the data in a SQL database (SQLite in this case). You can specify the serial port and list of devices in a TOML configuration file.
To run the program you will need to have installed the ekmdevice package already. To exit the program press Ctrl-C.
python readloop.py
An example configuration examples/readloop.toml:
[readloop]
# Name of serial port
port = "/dev/ttyUSB0"
# Path of SQLite database file
database = "ekmdata.db"
# Read cycle interval in seconds
min_interval = 10
# List of meters as a list of inline tables:
# { address = "<meter-address>", version = "<meter-version>" }
# version is optional, default is "v4"
devices.meter = [
{address = "000000020870", version = "v3"},
{address = "000300001285"},
{address = "000300001317"},
{address = "000350008932"},
]
# List of iostacks as a list of inline tables:
# { address = "<meter-address>", version = "<meter-version>" }
# version is optional, default is 1.
devices.iostack = [
{address = "000000000001"},
{address = "000000000002"},
{address = "000000000003"},
]
Source code:
readloop.py
#!/usr/bin/env python3
"""Example EKM device data read loop.
.. code::
python readloop.py --help
"""
from __future__ import annotations
import argparse
import contextlib
import datetime
import json
import pathlib
import sqlite3
import sys
import time
try:
import tomllib # type: ignore [import-not-found]
except ImportError:
import tomli as tomllib # python<3.11
import ekmdevice
import serial
from ekmdevice import ekmiostack, ekmmeter, ekmserial
# ruff: noqa: T201
_SQL_TABLE = """
CREATE TABLE IF NOT EXISTS
{device}_data(
address TEXT,
timestamp UNSIGNED BIG INT,
data TEXT
)
"""
_SQL_INSERT_DATA = """
INSERT INTO {device}_data VALUES(
:address,
:timestamp,
:data
)
"""
def main() -> None:
"""Program to read time series data and store it in a SQL database."""
parser = argparse.ArgumentParser()
parser.add_argument(
'confpath',
type=pathlib.Path,
nargs='?',
default='readloop.toml',
help='Configuration file path',
)
options = parser.parse_args()
with options.confpath.open('rb') as f:
config = tomllib.load(f)['readloop']
port: str = config['port']
min_interval: int = config.get('min_interval', 0)
database: str = config.get('database', 'ekmdata.db')
device_info: dict = config['devices']
with sqlite3.connect(database) as sql_con:
# Create tables if necessary
with contextlib.closing(sql_con.cursor()) as curs:
for device_type in device_info:
curs.execute(_SQL_TABLE.format(device=device_type))
sql_con.commit()
try:
# Continuously read devices until ^C (quit)
read_loop(port, device_info, min_interval, sql_con)
except KeyboardInterrupt:
print('Stopped')
def read_loop(
port: str,
device_info: dict,
min_interval: int,
sql_con: sqlite3.Connection,
) -> None:
"""Continuously read all devices every *interval* seconds."""
while True:
t_start = time.time()
for device_type, devices in device_info.items():
# Open serial port configured for the device type
with ekmserial.create_serial_port(port, device_type) as sp:
for device in devices:
read_device(sp, device_type, device, sql_con)
# Sleep for remaining interval
time.sleep(max(0, min_interval - (time.time() - t_start)))
def read_device(
sp: serial.Serial,
device_type: str,
device: dict,
sql_con: sqlite3.Connection,
) -> None:
"""Read device and store timestamped data in SQL database."""
address = device['address']
# Timestamp in milliseconds
timestamp = int(
datetime.datetime.now(tz=datetime.timezone.utc).timestamp() * 1000
)
data: dict | None = None
print(f'Reading {device_type}[{address}]')
try:
if device_type == 'meter':
version = device.get('version')
data = ekmmeter.read_data(sp, address, version=version, include_b=True)
elif device_type == 'iostack':
data = ekmiostack.read_data(sp, address, include_b=True)
except ekmdevice.EKMDeviceError as e:
# Note error and continue to read devices
print(f'{device_type}[{address}]: {e}', file=sys.stderr)
if data:
# Store read data in SQL database
with contextlib.closing(sql_con.cursor()) as curs:
curs.execute(
_SQL_INSERT_DATA.format(device=device_type),
{
'address': address,
'timestamp': timestamp,
'data': json.dumps(data),
},
)
sql_con.commit()
if __name__ == '__main__':
main()
Getting stored data
If you have SQLite installed, start a SQLite session with the file path name of the database created by readloop.py:
$ sqlite3 ekmdata.db
SQLite version 3.37.2 2022-01-06 13:25:41
Enter ".help" for usage hints.
sqlite>
Set the output mode:
sqlite> .mode column
Try the following query to get the most recent value of the meter field kWh_Tot for meter 300001317 (use your own meter address):
sqlite> select datetime(timestamp/1000, 'unixepoch', 'localtime') as time, json_extract(data, '$.kWh_Tot') as kWh from meter_data where address = '000300001317' order by timestamp desc limit 1;
time kWh
------------------- ------
2024-06-02 14:06:26 1724.0
A simple web form
This is a very simple web server that will query the SQL database created by readloop.py and displays the last stored value of a device field (a meter “kWh_Tot” field by default).
To run:
cd examples
python readserv.py
Open a web browser and type this url in the address bar (or just click on the link): http://localhost:8000
Enter your device type, address, and field name in the displayed form. Then click on the Query button.
Source code:
readserv.py
#!/usr/bin/env python3
"""Simple HTTP server showing field values for meter or iostack.
See: readloop.py
"""
from __future__ import annotations
import contextlib
import http.server
import sqlite3
import urllib.parse
import ekmdevice
# ruff: noqa: T201 # ignore print()
PORT = 8000
_HTML_BODY = """
<!DOCTYPE HTML>
<html lang="en">
<head><meta charset="utf-8"><title>EKM Device Data</title></head>
<body style="font-family: sans-serif; padding: 1rem 1rem;">
<h1>EKM Device Data</h1>
<form method="GET" style="margin-bottom: 3rem">
Device <input name="device" value="{device}"/>
Address <input name="address" value="{address}"/>
Field <input name="field" value="{field}"/>
<input type="submit" value="Query"/>
</form>
<h3>{field}</h3>
<table>
{result}
</table>
</body>
</html>
"""
_HTML_ROW = """
<tr><td>{time}</td><td style="width: 8rem; text-align: right">{value}</td></tr>
"""
# Note: SQLite does not allow placeholders for table names or functions
# so field and device values should be validated.
_SQL_QUERY = (
'SELECT datetime(timestamp/1000, \'unixepoch\', \'localtime\'), '
'json_extract(data, \'$.{field}\') FROM {device}_data '
'WHERE address = :address ORDER BY timestamp DESC LIMIT 10;'
)
class ReqHandler(http.server.BaseHTTPRequestHandler):
"""Simple HTTP request handler class."""
def do_GET(self) -> None: # noqa: N802 # invalid name
"""Handle HTTP GET request."""
q = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
try:
database = q.get('db', ['ekmdata.db'])[0]
device = q.get('device', ['meter'])[0]
field = q.get('field', ['kWh_Tot'])[0]
address = q.get('address', [''])[0]
if address:
address = ekmdevice.normalize_address_str(address)
result_html = get_content(database, device, address, field)
else:
result_html = f'Please specify {device} address.'
except (KeyError, IndexError):
result_html = 'Error: invalid query'
except (OSError, sqlite3.Error) as e:
result_html = f'Error: {e}'
body = _HTML_BODY.format(
device=device, address=address, field=field, result=result_html
)
# Write HTTP headers
self.send_response(http.HTTPStatus.OK)
self.send_header('Content-type', 'text/html; charset=utf-8')
self.send_header('Content-Length', str(len(body)))
self.end_headers()
# Write HTML content
self.wfile.write(body.encode('utf-8', 'replace'))
def get_content(database: str, device: str, address: str, field: str) -> str:
"""Get last max ten rows of device data rendered as HTML list."""
with sqlite3.connect(database) as con, contextlib.closing(
con.cursor()
) as curs:
query = _SQL_QUERY.format(field=field, device=device)
curs.execute(query, {'address': address})
result = curs.fetchall()
if result and result[0][1] is not None:
html = [
_HTML_ROW.format(time=row[0], value=row[1]) for row in result
]
return '\n'.join(html)
raise sqlite3.Error('data not found')
def main() -> None:
"""Create and start simple HTTP server."""
httpd = http.server.ThreadingHTTPServer(('', PORT), ReqHandler)
print(f'Serving on port {PORT}')
try:
httpd.serve_forever()
except KeyboardInterrupt:
print('Stopped.')
if __name__ == '__main__':
main()