EDS to RJN data pipeline
Date: 25 April 2025
Query 0:
I have a ProjectManager class for directory management.
I also have a function in calls.py, called get_points_live(), to get the value and timestamp for live sensor values.
I need to store the live values and timestamps to a JSON file every five minutes.
I also have an ingestion.py file, to be run every hour, that takes the stored data from the live queries, packages it into 12 values per sensor, and generates a final export JSON.
From there, I need to call a third-party API each hour and POST the aggregate data.
ChatGPT 0:
Sounds like you've got a well-structured data pipeline in place! Here's how you could design and connect the parts together, based on your needs:
Your Components So Far
- ProjectManager: Handles paths and directory structure.
- get_points_live() in calls.py: Grabs live values + timestamps.
- ingestion.py: Aggregates 5-min interval JSONs into hourly summaries.
- Hourly POST to 3rd-party API: Needs to happen after aggregation.
GOAL
- Every 5 minutes: Query live data → Save to JSON
- Every hour: Read 12 x 5-min JSONs → Aggregate → Save → POST to API
Step-by-Step Architecture
1. ProjectManager updates
from pathlib import Path
import datetime

class ProjectManager:
    def __init__(self, base_dir):
        self.base = Path(base_dir)
        self.data_dir = self.base / "data"
        self.live_dir = self.data_dir / "live_snapshots"
        self.export_dir = self.data_dir / "hourly_exports"
        self.live_dir.mkdir(parents=True, exist_ok=True)
        self.export_dir.mkdir(parents=True, exist_ok=True)

    def get_live_snapshot_path(self, timestamp=None):
        ts = timestamp or datetime.datetime.utcnow()
        return self.live_dir / f"live_{ts.strftime('%Y%m%d_%H%M')}.json"

    def get_hourly_export_path(self, timestamp=None):
        ts = timestamp or datetime.datetime.utcnow()
        return self.export_dir / f"export_{ts.strftime('%Y%m%d_%H')}.json"
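As a quick illustration (not from the original exchange), instantiating the class yields paths like these; the base directory and timestamps are placeholders:
pm = ProjectManager("/path/to/rjn-clarity")
print(pm.get_live_snapshot_path())   # e.g. .../data/live_snapshots/live_20250425_0805.json
print(pm.get_hourly_export_path())   # e.g. .../data/hourly_exports/export_20250425_08.json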
2. Script for 5-minute Live Snapshot (snapshot.py)
import json
import datetime
from calls import get_points_live
from project_manager import ProjectManager

pm = ProjectManager("/path/to/rjn-clarity")

data = get_points_live()
timestamp = datetime.datetime.utcnow()

output = {
    "timestamp": timestamp.isoformat(),
    "data": data
}

with open(pm.get_live_snapshot_path(timestamp), "w") as f:
    json.dump(output, f, indent=2)
You can run this every 5 minutes with a cron job or the schedule module.
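If you go the schedule route, a minimal sketch might look like the following; it assumes the third-party schedule package is installed, and run_snapshot()/run_ingestion() are hypothetical wrappers around the two scripts described here:
import time
import schedule  # third-party package: pip install schedule

def run_snapshot():
    ...  # the snapshot.py logic above, wrapped in a function

def run_ingestion():
    ...  # the ingestion.py logic below, wrapped in a function

schedule.every(5).minutes.do(run_snapshot)          # fire every 5 minutes
schedule.every().hour.at(":00").do(run_ingestion)   # fire at the top of each hour

while True:
    schedule.run_pending()
    time.sleep(1)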
3. Hourly Ingestion + API Push (ingestion.py)
import json
import datetime
from pathlib import Path
from project_manager import ProjectManager
import requests

pm = ProjectManager("/path/to/rjn-clarity")
now = datetime.datetime.utcnow()
hour_start = now.replace(minute=0, second=0, microsecond=0)

# Collect last 12 5-min snapshots
snapshots = []
for i in range(12):
    ts = hour_start - datetime.timedelta(minutes=5 * i)
    path = pm.get_live_snapshot_path(ts)
    if path.exists():
        with open(path) as f:
            snapshots.append(json.load(f))
snapshots.reverse()  # ensure chronological order

# Aggregate data by sensor
aggregated = {}
for snap in snapshots:
    for sensor, val in snap["data"].items():
        aggregated.setdefault(sensor, []).append(val)

final_export = {
    "timestamp": hour_start.isoformat(),
    "aggregated_data": aggregated
}

# Save export JSON
export_path = pm.get_hourly_export_path(hour_start)
with open(export_path, "w") as f:
    json.dump(final_export, f, indent=2)

# Post to 3rd-party API
response = requests.post("https://third-party.api/endpoint", json=final_export)
print(f"Posted to API, status code: {response.status_code}")
Scheduling
- Run snapshot.py every 5 minutes.
- Run ingestion.py every hour (at HH:00).
Use cron, systemd, or a schedule loop for local runs.
Would you like help generating the cron job setup, or a main.py that runs both tasks based on time with a scheduler?
Query 1:
I have a Windows system - I think I should leverage Task Scheduler to call PowerShell on a clock, to launch the Python code if it is not running.
Also, the program can run continuously. Would this be computationally expensive, to call three sensor values every five minutes and then transmit them each hour?
ChatGPT 1:
You're spot-on, George: on Windows, you'd use Task Scheduler instead of cron. And you're absolutely right: you can use Task Scheduler to run a PowerShell script every 5 minutes or hourly to launch your Python code only if needed.
Answering Your Main Concern:
Would this be computationally expensive?
Nope, not at all.
You're only:
- Calling 3 sensor values every 5 minutes
- Writing a small JSON file
- Aggregating & posting once per hour
This is very lightweight, especially if:
- You're not querying a massive database
- The sensor query is via HTTP, Modbus, or a local API
- Your computer isn't severely underpowered (like a Raspberry Pi Zero)
Even continuously running a Python loop with sleep() and occasional I/O would use minimal CPU and memory, especially if the process is I/O-bound (waiting on sensor response or writing files).
Alternatives
You have two options:
A. Scheduled Tasks with PowerShell
Let Python quit after running.
- 5-min task → snapshot.py
- 60-min task → ingestion.py
Example PowerShell command:
python "C:\path\to\rjn-clarity\snapshot.py"
Schedule that in Task Scheduler with:
- Trigger: Daily → repeat every 5 minutes
- Action: Start a program → PowerShell with -Command set as above
B. Python Daemon That Runs All Day
Write a loop that:
- Sleeps until the next 5-minute mark
- Runs get_points_live() and saves a snapshot
- Once an hour (e.g., on minute == 0), runs the aggregation + POST
This gives you a single running program, which is super clean if you're deploying it on a local machine or VM and want log control.
Want Help With Either?
- A sample PowerShell command + scheduler XML?
- Or a version of your app that runs as a loop with internal timers?
Let me know your preference and I'll build you the right setup.
Query 2:
Let's go with the Python daemon, which is operating-system agnostic, so that it can ultimately be set up on a server.
I would also like PowerShell commands to check whether the Python daemon is running or not, and to start it if it is not.
ChatGPT 2:
Perfect: a Python daemon is a great way to make your system cross-platform and self-contained. Here's how we can structure it:
Python Daemon: rjn_daemon.py
This script runs indefinitely:
- Every 5 minutes: call get_points_live() and save a JSON
- Every hour (on the dot): aggregate and POST to API
rjn_daemon.py (basic version)
import json
import time
import datetime
from calls import get_points_live
from ingestion import run_hourly_ingestion
from project_manager import ProjectManager

pm = ProjectManager("/path/to/rjn-clarity")

def log(msg):
    print(f"[{datetime.datetime.now().isoformat()}] {msg}")

def run_daemon():
    last_hour = None
    while True:
        now = datetime.datetime.utcnow()
        minute = now.minute
        second = now.second

        # Every 5 min snapshot
        if minute % 5 == 0 and second < 5:
            log("Running 5-minute snapshot")
            snapshot = {
                "timestamp": now.isoformat(),
                "data": get_points_live()
            }
            with open(pm.get_live_snapshot_path(now), "w") as f:
                json.dump(snapshot, f, indent=2)

        # Hourly aggregation
        if now.minute == 0 and now.hour != last_hour:
            last_hour = now.hour
            log("Running hourly ingestion and API push")
            run_hourly_ingestion(pm)

        time.sleep(5)  # Check every 5 seconds
Make sure ingestion.py exposes a function like:
def run_hourly_ingestion(pm: ProjectManager):
    ...  # your ingestion logic from earlier
Then use:
if __name__ == "__main__":
    run_daemon()
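As written, the loop wakes up every 5 seconds around the clock. An alternative (a sketch, not part of the original code) is to sleep straight to the next clock-aligned 5-minute mark, so the daemon only wakes when there is work to do:
import datetime
import time

def sleep_until_next_interval(minutes: int = 5):
    """Sleep until the next clock-aligned boundary (e.g. :00, :05, :10, ...)."""
    now = datetime.datetime.utcnow()
    seconds_into_interval = (now.minute % minutes) * 60 + now.second
    time.sleep(minutes * 60 - seconds_into_interval)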
PowerShell Utilities
1. Check if rjn_daemon.py is running
A Python script shows up as a python.exe process, so match on the process command line rather than the executable path (the CommandLine property is exposed on Get-Process output in PowerShell 7+; on Windows PowerShell 5.1, query Win32_Process with Get-CimInstance instead):
Get-Process python -ErrorAction SilentlyContinue | Where-Object { $_.CommandLine -like "*rjn_daemon.py*" }
2. Start the daemon if not running
$daemonRunning = Get-Process python -ErrorAction SilentlyContinue | Where-Object { $_.CommandLine -like "*rjn_daemon.py*" }
if (-not $daemonRunning) {
    Start-Process -NoNewWindow -FilePath "python.exe" -ArgumentList "C:\path\to\rjn-clarity\rjn_daemon.py"
    Write-Output "Daemon started."
} else {
    Write-Output "Daemon is already running."
}
You could save this check/start script as check_daemon.ps1.
Optional Enhancements
- Add a PID file or lock file for more robust daemon tracking (see the sketch after this list).
- Log to a file with the logging module.
- Consider graceful shutdown (e.g., catch Ctrl+C to clean up resources).
- Run it as a service (Windows: NSSM, Linux: systemd).
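For the PID-file idea above, a minimal Python sketch (the helper name and the file location are assumptions):
import atexit
import os

def write_pid_file(path: str = "rjn_daemon.pid") -> None:
    """Record this process's PID so external scripts can tell whether the daemon is alive."""
    with open(path, "w") as f:
        f.write(str(os.getpid()))
    atexit.register(os.remove, path)  # best-effort cleanup on normal exit
Call it once at the top of run_daemon(); the PowerShell check above can then read the file and test whether that PID is still running.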
Want me to write a full rjn_daemon.py + ingestion.py with importable structure and logging?
Query 3:
Sure, show me what you've got, with rigorous comments and a clear jargon glossary, targeted at a person who has never set up a daemon.
The purpose of this daemon is to retrieve data from the EDS server(s) every five minutes, and then POST it to RJN.
I want the code to be modular, not specific to the APIs mentioned, so it works with yet-unknown APIs down the line.
Using a name like RJNCaptureDaemon in the code would be overly specific. Make it agnostic.
Also leverage the existing ProjectManager class. Here is that code:
(paste, 8:05 AM, 25 April 2025, entire projectmanager.py)
ChatGPT 3:
Here's your clean, modular, and extensible Python daemon framework, ready to drop into your project. Key points:
How to Use This
- Implement your live query as retrieve_snapshot_data() → return a dictionary like {"sensor_1": {"value": 5.2, "unit": "mg/L"}, "sensor_2": {...}} (a mock example follows this list)
- Implement your ingestion logic as run_hourly_ingestion(pm), e.g.:
  - Load the past 12 JSON snapshots
  - Compile them into one
  - POST to an external API
- Run the daemon: python rjn_daemon.py
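For reference, a mock retrieve_snapshot_data() matching that dictionary shape might look like the sketch below; the sensor names and randomized values are purely illustrative:
import datetime
import random

def retrieve_snapshot_data() -> dict:
    """Mock live query returning fake sensor readings in the shape described above."""
    return {
        "timestamp": datetime.datetime.utcnow().isoformat(),
        "sensor_1": {"value": round(random.uniform(0.0, 10.0), 2), "unit": "mg/L"},
        "sensor_2": {"value": round(random.uniform(0.0, 10.0), 2), "unit": "mg/L"},
    }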
Why This Design?
- Modular: Works with any project and future APIs
- Time-based control: 5-min and hourly triggers
- Clean separation: ProjectManager handles paths, the daemon handles timing, external code handles logic
- Portable: No system dependencies, cross-platform
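Note that the daemon framework itself is not reproduced in this log; the later __main__ examples construct DataCaptureDaemon(project_manager=..., snapshot_hook=..., hourly_hook=...) and call run(). A minimal sketch of what such a class might look like, assuming the same 5-minute/hourly timing used earlier:
import datetime
import json
import time
from typing import Callable

class DataCaptureDaemon:
    """Generic capture loop: calls snapshot_hook every 5 minutes and hourly_hook once per hour."""

    def __init__(self, project_manager, snapshot_hook: Callable[[], dict],
                 hourly_hook: Callable[[object], None]):
        self.pm = project_manager
        self.snapshot_hook = snapshot_hook
        self.hourly_hook = hourly_hook

    def run(self):
        last_hour = None
        while True:
            now = datetime.datetime.utcnow()
            if now.minute % 5 == 0 and now.second < 5:
                # Persist whatever the project-specific hook returns
                with open(self.pm.get_live_snapshot_path(now), "w") as f:
                    json.dump(self.snapshot_hook(), f, indent=2)
            if now.minute == 0 and now.hour != last_hour:
                last_hour = now.hour
                self.hourly_hook(self.pm)  # aggregate + POST, supplied by the project
            time.sleep(5)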
If you'd like, I can build:
- Example snapshot data (mock sensor values)
- Ingestion logic that pulls the last 12 files
- API POST stub (with requests)
Want that next?
Query 4:
Excellent, I like that you left the retrieve_snapshot_data() function to be developed by me. Based on the rest of my code base, it might be appropriate to do a dynamic import (at startup of the daemon, not continuously) from the active project directory. projects/<project>/scripts is where the main script lives for the current API hookup.
Is this poor design?
# These should be implemented elsewhere in your project to stay modular
def retrieve_snapshot_data() -> dict:
    """Return live data as a dictionary.
    Should include timestamp and sensor data.

    Pseudocode: import the script from the current project, i.e. the
    hypomain.py file at pm.get_scripts_dir().
    """
    raise NotImplementedError("You must implement retrieve_snapshot_data()")
ChatGPT 4:
That's actually a smart and flexible design: using a dynamic import from projects/<project>/scripts/hypomain.py makes the daemon reusable across multiple projects without hardcoding logic.
Here's how you can cleanly and safely do it using importlib, so the daemon dynamically loads the correct hook at runtime:
Modify the Daemon Startup
Replace the hardcoded retrieve_snapshot_data function with a dynamic loader:
import importlib.util
import os
from typing import Callable

def load_snapshot_hook(pm: ProjectManager) -> Callable[[], dict]:
    """
    Dynamically load retrieve_snapshot_data() from the project's hypomain.py script.
    """
    script_path = pm.get_scripts_file_path("hypomain.py")
    if not os.path.exists(script_path):
        raise FileNotFoundError(f"No hypomain.py found at: {script_path}")

    spec = importlib.util.spec_from_file_location("hypomain", script_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    if not hasattr(module, "retrieve_snapshot_data"):
        raise AttributeError(f"'retrieve_snapshot_data()' not found in {script_path}")

    return getattr(module, "retrieve_snapshot_data")
Update your daemon's __main__ block:
if __name__ == "__main__":
    project_name = "eds_to_stantec_2025"
    pm = ProjectManager(project_name)
    snapshot_hook = load_snapshot_hook(pm)

    daemon = DataCaptureDaemon(
        project_manager=pm,
        snapshot_hook=snapshot_hook,
        hourly_hook=run_hourly_ingestion  # You can keep this modular too if you want
    )
    daemon.run()
Why this works well:
- Keeps the daemon generic
- Your per-project logic lives in hypomain.py, so swapping projects means no daemon changes
- Easy to test each piece independently
- Has error checking for missing files or functions
Let me know if you'd like to extend this pattern to run_hourly_ingestion() as well!
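If you do extend the pattern, a hedged sketch of a generalized loader (the helper name load_project_hook is hypothetical) that can pull any named hook, including run_hourly_ingestion, out of hypomain.py:
import importlib.util
import os
from typing import Callable

def load_project_hook(pm: ProjectManager, func_name: str) -> Callable:
    """Load any named function from the project's hypomain.py (generalizes load_snapshot_hook)."""
    script_path = pm.get_scripts_file_path("hypomain.py")
    if not os.path.exists(script_path):
        raise FileNotFoundError(f"No hypomain.py found at: {script_path}")
    spec = importlib.util.spec_from_file_location("hypomain", script_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    if not hasattr(module, func_name):
        raise AttributeError(f"'{func_name}()' not found in {script_path}")
    return getattr(module, func_name)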
Query 5:
Excellent.
I want to make the main block more modular by having it pull the project name from a TOML file, called default_project.toml, which is found in the "projects" directory and can point to any of the project directories found there.
This is the TOML structure I have chosen:
[default project]
project = "eds_to_rjn"
I need to add functionality to the ProjectManager class:
def identify_default_project(self):
    # look here for the toml file, default_project.toml
    self.get_projects_dir()

def get_projects_dir(self):
    return os.path.join(self.base_dir, 'projects')

def get_project_dir(self):
    pass
ChatGPT 5:
Perfect approach: using a default_project.toml file to decouple hardcoded project names is clean and extensible. Let's flesh this out by:
- Adding TOML parsing to load the default project.
- Adding get_projects_dir() and get_project_dir() methods to ProjectManager.
- Updating your __main__ logic to be driven by that TOML file.
Full ProjectManager Updates (Additions Only)
import toml  # third-party parser; on Python 3.11+ the stdlib tomllib can be used instead

class ProjectManager:
    # ... existing methods ...

    def get_projects_dir(self):
        return os.path.join(self.base_dir, 'projects')

    def get_project_dir(self):
        return os.path.join(self.get_projects_dir(), self.project_name)

    @classmethod
    def identify_default_project(cls):
        """
        Class method that reads default_project.toml to identify the default project.
        """
        base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
        projects_dir = os.path.join(base_dir, 'projects')
        default_toml_path = os.path.join(projects_dir, 'default_project.toml')

        if not os.path.exists(default_toml_path):
            raise FileNotFoundError(f"Missing default_project.toml in {projects_dir}")

        with open(default_toml_path, 'r') as f:
            config = toml.load(f)

        try:
            return config['default project']['project']
        except KeyError as e:
            raise KeyError(f"Missing key in default_project.toml: {e}")
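On Python 3.11+, the same read can also be done with the standard-library tomllib instead of the third-party toml package. A sketch (note that tomllib requires the file to be opened in binary mode; the helper name read_default_project is hypothetical):
import os
import tomllib  # standard library on Python 3.11+

def read_default_project(projects_dir: str) -> str:
    """Read the default project name from default_project.toml using the stdlib parser."""
    with open(os.path.join(projects_dir, "default_project.toml"), "rb") as f:
        config = tomllib.load(f)
    return config["default project"]["project"]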
Example default_project.toml
[default project]
project = "eds_to_rjn"
Cleaner __main__ Block Using the Default Project
if __name__ == "__main__":
    # Dynamically identify the default project from TOML
    project_name = ProjectManager.identify_default_project()
    pm = ProjectManager(project_name)

    # Dynamically load the hook from the default project's script
    snapshot_hook = load_snapshot_hook(pm)

    # Pass all into the daemon
    daemon = DataCaptureDaemon(
        project_manager=pm,
        snapshot_hook=snapshot_hook,
        hourly_hook=run_hourly_ingestion
    )
    daemon.run()
Benefits Recap:
- Easily switch the active project via TOML, no code change needed
- ProjectManager is now aware of its parent projects/ structure
- Clear separation of config (TOML) and logic (Python)
Let me know if you want to enforce .toml schema validation or include project-specific logging next.
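If you do want light schema validation later, a minimal sketch (the helper name validate_default_project is hypothetical) that checks the required keys and that the named project directory actually exists:
import os

def validate_default_project(config: dict, projects_dir: str) -> str:
    """Check that the parsed TOML has the expected keys and points at an existing project directory."""
    try:
        project = config["default project"]["project"]
    except KeyError as e:
        raise KeyError(f"default_project.toml is missing key: {e}")
    project_dir = os.path.join(projects_dir, project)
    if not os.path.isdir(project_dir):
        raise FileNotFoundError(f"Project directory not found: {project_dir}")
    return project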