EDS to RJN data pipeline

Date: 25 April 2025

Query 0:

I have a ProjectManager class for directory management.

I also have a function in calls.py, called get_points_live(), that gets the value and timestamp for live sensor values.

I need to store the live values and timestamps to a JSON file every five minutes.

I also have an ingestion.py file, to be run every hour, that takes the stored data from the live queries, packages it into 12 values per sensor, and generates a final export JSON.

From there, I need to call a third-party API each hour and POST the aggregate data.

ChatGPT 0:

Sounds like you've got a well-structured data pipeline in place! Here's how you could design and connect the parts together, based on your needs:


🧱 Your Components So Far

  1. ProjectManager: Handles paths and directory structure.

  2. get_points_live() in calls.py: Grabs live values + timestamps.

  3. ingestion.py: Aggregates 5-min interval JSONs into hourly summaries.

  4. Hourly POST to 3rd-party API: Needs to happen after aggregation.


βœ… GOAL

Capture live sensor values every five minutes, bundle the most recent twelve snapshots into an hourly export, and POST that export to the third-party API each hour.

🧩 Step-by-Step Architecture

1. ProjectManager updates

from pathlib import Path
import datetime

class ProjectManager:
    def __init__(self, base_dir):
        self.base = Path(base_dir)
        self.data_dir = self.base / "data"
        self.live_dir = self.data_dir / "live_snapshots"
        self.export_dir = self.data_dir / "hourly_exports"
        self.live_dir.mkdir(parents=True, exist_ok=True)
        self.export_dir.mkdir(parents=True, exist_ok=True)

    def get_live_snapshot_path(self, timestamp=None):
        ts = timestamp or datetime.datetime.utcnow()
        return self.live_dir / f"live_{ts.strftime('%Y%m%d_%H%M')}.json"

    def get_hourly_export_path(self, timestamp=None):
        ts = timestamp or datetime.datetime.utcnow()
        return self.export_dir / f"export_{ts.strftime('%Y%m%d_%H')}.json"

2. Script for 5-minute Live Snapshot (snapshot.py)

import json
import datetime
from calls import get_points_live
from project_manager import ProjectManager

pm = ProjectManager("/path/to/rjn-clarity")
data = get_points_live()
timestamp = datetime.datetime.utcnow()

output = {
    "timestamp": timestamp.isoformat(),
    "data": data
}

with open(pm.get_live_snapshot_path(timestamp), "w") as f:
    json.dump(output, f, indent=2)

You can run this every 5 minutes with a cron job or schedule module.
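
For the schedule-module route, a minimal sketch (assuming the snapshot logic above is wrapped in a take_snapshot() function; the third-party package is installed with pip install schedule; the cron equivalent would be a crontab entry along the lines of `*/5 * * * * python /path/to/rjn-clarity/snapshot.py`):

import time
import schedule  # third-party: pip install schedule

def take_snapshot():
    # the snapshot.py logic above: call get_points_live() and write the JSON file
    ...

schedule.every(5).minutes.do(take_snapshot)

while True:
    schedule.run_pending()
    time.sleep(1)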


3. Hourly Ingestion + API Push (ingestion.py)

import json
import datetime
from pathlib import Path
from project_manager import ProjectManager
import requests

pm = ProjectManager("/path/to/rjn-clarity")
now = datetime.datetime.utcnow()
hour_start = now.replace(minute=0, second=0, microsecond=0)

# Collect last 12 5-min snapshots
snapshots = []
for i in range(12):
    ts = hour_start - datetime.timedelta(minutes=5 * i)
    path = pm.get_live_snapshot_path(ts)
    if path.exists():
        with open(path) as f:
            snapshots.append(json.load(f))
snapshots.reverse()  # ensure chronological order

# Aggregate data by sensor
aggregated = {}
for snap in snapshots:
    for sensor, val in snap["data"].items():
        aggregated.setdefault(sensor, []).append(val)

final_export = {
    "timestamp": hour_start.isoformat(),
    "aggregated_data": aggregated
}

# Save export JSON
export_path = pm.get_hourly_export_path(hour_start)
with open(export_path, "w") as f:
    json.dump(final_export, f, indent=2)

# Post to 3rd-party API
response = requests.post("https://third-party.api/endpoint", json=final_export)
print(f"Posted to API, status code: {response.status_code}")

πŸ•’ Scheduling

Use cron, systemd, or a schedule loop for local runs.


Would you like help generating the cron job setup, or a main.py that runs both tasks based on time with a scheduler?

Query 1:

I have a Windows system - I think I should leverage Task Scheduler to call PowerShell on a clock, to launch the Python code if it is not already running.

Also, the program could run continuously. Would this be computationally expensive, to poll three sensor values every five minutes and then transmit them each hour?

ChatGPT 1:

You're spot-on, George β€” on Windows, you'd use Task Scheduler instead of cron. And you're absolutely right: you can use Task Scheduler to run a PowerShell script every 5 minutes or hourly to launch your Python code only if needed.


βœ… Answering Your Main Concern:

Would this be computationally expensive?

Nope, not at all.

You're only:

  • Querying three sensor values every five minutes
  • Writing one small JSON snapshot each time
  • Bundling twelve snapshots and making a single HTTP POST each hour

This is very lightweight, especially if:

  • The snapshot files stay small
  • The API responds promptly

Even continuously running a Python loop with sleep() and occasional I/O would use minimal CPU and memory β€” especially if the process is I/O-bound (waiting on sensor response or writing files).


βœ… Alternatives

You have two options:

A. Scheduled Tasks with PowerShell

Let Python quit after running.

Example PowerShell command:

python "C:\path\to\rjn-clarity\snapshot.py"

Schedule that command in Task Scheduler with a trigger that repeats every 5 minutes for the snapshot script, and a separate hourly trigger for the ingestion script.

B. Python Daemon That Runs All Day

Write a loop that takes a snapshot every five minutes, runs the hourly ingestion and API POST at the top of each hour, and sleeps in between checks.

This gives you a single running program β€” super clean if you’re deploying it on a local machine or VM and want log control.


Want Help With Either?

Let me know your preference and I’ll build you the right setup.

Query 2:

Let's go with the Python daemon, since it is operating-system agnostic and can ultimately be set up on a server.

I would also like PowerShell commands to check whether the Python daemon is running, and to start it if it is not.

ChatGPT 2:

Perfect β€” a Python daemon is a great way to make your system cross-platform and self-contained. Here's how we can structure it:


🐍 Python Daemon: rjn_daemon.py

This script runs indefinitely, taking a snapshot every five minutes and running the hourly ingestion at the top of each hour:

βœ… rjn_daemon.py (basic version)

import json
import time
import datetime
from calls import get_points_live
from ingestion import run_hourly_ingestion
from project_manager import ProjectManager

pm = ProjectManager("/path/to/rjn-clarity")

def log(msg):
    print(f"[{datetime.datetime.now().isoformat()}] {msg}")

def run_daemon():
    last_hour = None
    while True:
        now = datetime.datetime.utcnow()
        minute = now.minute
        second = now.second

        # Every 5 min snapshot
        if minute % 5 == 0 and second < 5:
            log("Running 5-minute snapshot")
            snapshot = {
                "timestamp": now.isoformat(),
                "data": get_points_live()
            }
            with open(pm.get_live_snapshot_path(now), "w") as f:
                json.dump(snapshot, f, indent=2)

        # Hourly aggregation
        if now.minute == 0 and now.hour != last_hour:
            last_hour = now.hour
            log("Running hourly ingestion and API push")
            run_hourly_ingestion(pm)

        time.sleep(5)  # Check every 5 seconds

Make sure ingestion.py exposes a function like:

def run_hourly_ingestion(pm: ProjectManager):
    ...  # your ingestion logic from earlier
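
For reference, a minimal sketch of that function, adapted from the ingestion script in the previous step (the endpoint URL remains a placeholder, and pm is assumed to return pathlib.Path objects as the earlier ProjectManager does):

import json
import datetime
import requests
from project_manager import ProjectManager

def run_hourly_ingestion(pm: ProjectManager):
    """Bundle the last 12 snapshots into one export, save it, and POST it."""
    hour_start = datetime.datetime.utcnow().replace(minute=0, second=0, microsecond=0)

    # Collect the last 12 five-minute snapshots
    snapshots = []
    for i in range(12):
        ts = hour_start - datetime.timedelta(minutes=5 * i)
        path = pm.get_live_snapshot_path(ts)
        if path.exists():
            with open(path) as f:
                snapshots.append(json.load(f))
    snapshots.reverse()  # chronological order

    # Aggregate values per sensor
    aggregated = {}
    for snap in snapshots:
        for sensor, val in snap["data"].items():
            aggregated.setdefault(sensor, []).append(val)

    final_export = {"timestamp": hour_start.isoformat(), "aggregated_data": aggregated}

    with open(pm.get_hourly_export_path(hour_start), "w") as f:
        json.dump(final_export, f, indent=2)

    requests.post("https://third-party.api/endpoint", json=final_export, timeout=30)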

Then use:

if __name__ == "__main__":
    run_daemon()

πŸͺŸ PowerShell Utilities

βœ… 1. Check if rjn_daemon.py is running

Get-Process only exposes the executable path (python.exe), not the script it was launched with, so filter on the full process command line instead:

Get-CimInstance Win32_Process -Filter "Name = 'python.exe'" |
    Where-Object { $_.CommandLine -like "*rjn_daemon.py*" }

βœ… 2. Start the daemon if not running

$daemonRunning = Get-CimInstance Win32_Process -Filter "Name = 'python.exe'" |
    Where-Object { $_.CommandLine -like "*rjn_daemon.py*" }

if (-not $daemonRunning) {
    Start-Process -NoNewWindow -FilePath "python.exe" -ArgumentList "C:\path\to\rjn-clarity\rjn_daemon.py"
    Write-Output "Daemon started."
} else {
    Write-Output "Daemon is already running."
}

You could save this check/start script as check_daemon.ps1.
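
Since the end goal is an operating-system agnostic setup, a cross-platform equivalent of that check/start logic could use the third-party psutil package (an assumption on my part; it isn't part of the project yet):

import subprocess
import psutil  # third-party: pip install psutil

def daemon_running(script_name="rjn_daemon.py"):
    """Return True if any running process has the daemon script on its command line."""
    for proc in psutil.process_iter(["cmdline"]):
        cmdline = proc.info.get("cmdline") or []
        if any(script_name in part for part in cmdline):
            return True
    return False

if not daemon_running():
    subprocess.Popen(["python", r"C:\path\to\rjn-clarity\rjn_daemon.py"])
    print("Daemon started.")
else:
    print("Daemon is already running.")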


πŸ’‘ Optional Enhancements


Want me to write a full rjn_daemon.py + ingestion.py with importable structure and logging?

Query 3:

Sure, show me what you've got, with rigorous comments and a clear jargon glossary, targeted at a person who has never set up a daemon.

The purpose of this daemon is to retrieve data from the EDS server(s) every five minutes, and then POST it to RJN.

I want the code to be modular and non-specific to the APIs mentioned, so it will work with yet-unknown APIs down the line.

Using a name like RJNCaptureDaemon in the code would be overly specific. Make it agnostic.

Also leverage the existing ProjectManager class. Here is that code:

(paste, 8:05 AM, 25 April 2025, entire projectmanager.py)

ChatGPT 3:

Here's your clean, modular, and extensible Python daemon framework, ready to drop into your project.


πŸ”§ How to Use This

  1. Implement your live query as retrieve_snapshot_data() β€” return a dictionary like:

    {"sensor_1": {"value": 5.2, "unit": "mg/L"}, "sensor_2": {...}}
    
  2. Implement your ingestion logic as run_hourly_ingestion(pm) β€” e.g.:

    • Load past 12 JSON snapshots

    • Compile them into one

    • POST to an external API

  3. Run the daemon:

    python rjn_daemon.py
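
The framework class itself isn't reproduced in this log; for orientation, here is a minimal sketch that matches how it is constructed in the __main__ block further below. The names DataCaptureDaemon, snapshot_hook, and hourly_hook come from that block; the poll interval and the get_live_snapshot_path() call are assumptions carried over from the earlier examples.

import json
import time
import datetime

class DataCaptureDaemon:
    """Generic capture loop: a snapshot hook every 5 minutes, an hourly hook each hour."""

    def __init__(self, project_manager, snapshot_hook, hourly_hook, poll_seconds=5):
        self.pm = project_manager            # resolves where snapshot files are written
        self.snapshot_hook = snapshot_hook   # callable returning a dict of live data
        self.hourly_hook = hourly_hook       # callable(pm) that aggregates and POSTs
        self.poll_seconds = poll_seconds
        self._last_hour = None

    def run(self):
        while True:
            now = datetime.datetime.utcnow()

            # 5-minute snapshot window
            if now.minute % 5 == 0 and now.second < self.poll_seconds:
                snapshot = {"timestamp": now.isoformat(), "data": self.snapshot_hook()}
                with open(self.pm.get_live_snapshot_path(now), "w") as f:
                    json.dump(snapshot, f, indent=2)

            # Hourly aggregation and push, once per hour
            if now.minute == 0 and now.hour != self._last_hour:
                self._last_hour = now.hour
                self.hourly_hook(self.pm)

            time.sleep(self.poll_seconds)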
    

🧠 Why This Design?

The daemon itself stays agnostic: it only knows to call a snapshot hook every five minutes and an hourly hook once an hour, while the project-specific API logic lives in the functions you implement and ProjectManager keeps all the paths in one place.

If you'd like, I can build out the remaining pieces. Want that next?

Query 4:

Excellent. I like that you left the retrieve_snapshot_data() function to be developed by me. Based on the rest of my code base, it might be appropriate to do a dynamic import (at startup of the daemon, not continuously) from the active project directory. The main script for the current API hookup lives in

projects/<project>/scripts

Is this poor design?

# These should be implemented elsewhere in your project to stay modular
def retrieve_snapshot_data() -> dict:
    """Return live data as a dictionary.
    Should include timestamp and sensor data."""
    # pseudocode:
    # import the script for the current project, from the hypomain.py file at pm.get_scripts_dir()
    raise NotImplementedError("You must implement retrieve_snapshot_data()")

ChatGPT 4:

That's actually a smart and flexible design β€” using a dynamic import from projects/<project>/scripts/hypomain.py makes the daemon reusable across multiple projects without hardcoding logic.

Here’s how you can cleanly and safely do it using importlib, so the daemon dynamically loads the correct hook at runtime:


βœ… Modify the Daemon Startup

Replace the hardcoded retrieve_snapshot_data function with a dynamic loader:

import importlib.util
import os
from typing import Callable

def load_snapshot_hook(pm: ProjectManager) -> Callable[[], dict]:
    """
    Dynamically load retrieve_snapshot_data() from the project's hypomain.py script.
    """
    script_path = pm.get_scripts_file_path("hypomain.py")

    if not os.path.exists(script_path):
        raise FileNotFoundError(f"No hypomain.py found at: {script_path}")

    spec = importlib.util.spec_from_file_location("hypomain", script_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    if not hasattr(module, "retrieve_snapshot_data"):
        raise AttributeError(f"'retrieve_snapshot_data()' not found in {script_path}")

    return getattr(module, "retrieve_snapshot_data")

βœ… Update your daemon’s __main__ block:

if __name__ == "__main__":
    project_name = "eds_to_stantec_2025"
    pm = ProjectManager(project_name)

    snapshot_hook = load_snapshot_hook(pm)

    daemon = DataCaptureDaemon(
        project_manager=pm,
        snapshot_hook=snapshot_hook,
        hourly_hook=run_hourly_ingestion  # You can keep this modular too if you want
    )

    daemon.run()

🧠 Why this works well:

The daemon never hardcodes project logic; it simply loads whatever retrieve_snapshot_data() the active project's hypomain.py provides, so hooking up a new API only requires dropping a new script into projects/<project>/scripts.

Let me know if you'd like to extend this pattern to run_hourly_ingestion() as well!
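
If you do extend the pattern to run_hourly_ingestion(), one way to generalize it is a single loader that takes the module and function names as parameters (load_project_hook is a hypothetical helper, not something already in the code above):

import importlib.util
import os
from typing import Callable

from project_manager import ProjectManager  # same class used above

def load_project_hook(pm: ProjectManager, module_name: str, func_name: str) -> Callable:
    """Load any named function from a script in the active project's scripts directory."""
    script_path = pm.get_scripts_file_path(f"{module_name}.py")
    if not os.path.exists(script_path):
        raise FileNotFoundError(f"No {module_name}.py found at: {script_path}")

    spec = importlib.util.spec_from_file_location(module_name, script_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    if not hasattr(module, func_name):
        raise AttributeError(f"'{func_name}()' not found in {script_path}")
    return getattr(module, func_name)

# e.g.:
# snapshot_hook = load_project_hook(pm, "hypomain", "retrieve_snapshot_data")
# hourly_hook   = load_project_hook(pm, "hypomain", "run_hourly_ingestion")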

Query 5:

Excellent

I want to make the main block more modular by having it pull the project name from a toml file, called default_project.toml, which is found in the "projects" directory and can point to any of the project directories found there.

This is the TOML structure I have chosen:

[default_project]
project = "eds_to_rjn"

I need to add functionality to the ProjectManager class:

def identify_default_project(self):
	# look here for the toml file, default_project.toml
	self.get_projects_dir()

def get_projects_dir(self):
	return os.path.join(self.base_dir, 'projects')

def get_project_dir(self):
	#
	pass

ChatGPT 5:

Perfect approach β€” using a default_project.toml file to decouple hardcoded project names is clean and extensible. Let’s flesh this out by:

  1. Adding TOML parsing to load the default project.

  2. Adding get_projects_dir() and get_project_dir() methods to ProjectManager.

  3. Updating your __main__ logic to be driven by that TOML file.


βœ… Full ProjectManager Updates (Additions Only)

import os
import toml  # third-party `toml` package; alternatives: built-in tomllib on Python 3.11+, or tomli on older versions

class ProjectManager:
    # ... existing methods ...

    def get_projects_dir(self):
        return os.path.join(self.base_dir, 'projects')

    def get_project_dir(self):
        return os.path.join(self.get_projects_dir(), self.project_name)

    @classmethod
    def identify_default_project(cls):
        """
        Class method that reads default_project.toml to identify the default project.
        """
        base_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
        projects_dir = os.path.join(base_dir, 'projects')
        default_toml_path = os.path.join(projects_dir, 'default_project.toml')

        if not os.path.exists(default_toml_path):
            raise FileNotFoundError(f"Missing default_project.toml in {projects_dir}")

        with open(default_toml_path, 'r') as f:
            config = toml.load(f)

        try:
            return config['default_project']['project']
        except KeyError as e:
            raise KeyError(f"Missing key in default_project.toml: {e}")

βœ… Example default_project.toml

[default_project]
project = "eds_to_rjn"

βœ… Cleaner __main__ Block Using the Default Project

if __name__ == "__main__":
    # Dynamically identify the default project from TOML
    project_name = ProjectManager.identify_default_project()
    pm = ProjectManager(project_name)

    # Dynamically load the hook from the default project's script
    snapshot_hook = load_snapshot_hook(pm)

    # Pass all into the daemon
    daemon = DataCaptureDaemon(
        project_manager=pm,
        snapshot_hook=snapshot_hook,
        hourly_hook=run_hourly_ingestion
    )

    daemon.run()
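
For orientation, the directory layout this __main__ block expects looks roughly like this (only default_project.toml and hypomain.py are names fixed by the code; the rest follows the projects/<project>/scripts convention from earlier):

projects/
    default_project.toml         # [default_project] -> project = "eds_to_rjn"
    eds_to_rjn/
        scripts/
            hypomain.py           # defines retrieve_snapshot_data()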

🧠 Benefits Recap:

The default project now lives in a single TOML file instead of being hardcoded, ProjectManager resolves all project paths, and the __main__ block works unchanged as new projects are added.

Let me know if you want to enforce .toml schema validation or include project-specific logging next.