Tools exposed by the OpenHEXA MCP server
OpenHEXA v0.0.1 · Protocol 2025-03-26 · 27 tools
List connections (external data sources) configured in a workspace. Returns connection name, slug, type (S3, GCS, POSTGRESQL, DHIS2, IASO, CUSTOM), and their fields. Field values are hidden so that they cannot be used directly. Connections are used as parameters when running pipelines: use the connection slug as the parameter value.
List datasets in a workspace. Returns dataset summaries. Use get_dataset with the dataset slug to get full details including versions and files.
Get full details of a dataset: metadata, permissions, all versions with their files, and the latest version's file list. Use a file 'id' from the response with preview_dataset_file to see sample data. Use the dataset 'id' with create_dataset_version to add a new version.
Preview the content of a dataset file by its ID (from get_dataset's file list). Returns a sample of the data for tabular files (CSV, Parquet, etc.), file properties, and metadata. The sample status can be PROCESSING (still generating), FINISHED (sample ready), or FAILED.
Create a new dataset in a workspace with an initial version (v1) containing the provided files. The files_json parameter is a JSON array of {uri, contentType, content} objects, e.g. '[{"uri": "data.csv", "contentType": "text/csv", "content": "a,b\n1,2"}]'. Use create_dataset_version to add more versions later.
Create a new version of a dataset with optional inline files. Requires the dataset ID (from get_dataset or create_dataset) and a version name (e.g. 'v1', '2024-01'). Optionally provide a changelog describing what changed. To include files, provide files_json as a JSON array of {uri, contentType, content} objects, e.g. '[{"uri": "data.csv", "contentType": "text/csv", "content": "a,b\n1,2"}]'.
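Because files_json is passed as a JSON string, newlines and quotes inside file contents have to be escaped. The following is a minimal sketch that leaves the escaping to `json.dumps`; the helper name `build_dataset_files_json` is hypothetical and not part of OpenHEXA, and only the {uri, contentType, content} object shape comes from the descriptions above.

```python
import json


def build_dataset_files_json(files):
    """Serialize (uri, content_type, content) tuples into a files_json string.

    Hypothetical helper for illustration; only the object keys are taken
    from the tool description.
    """
    payload = [
        {"uri": uri, "contentType": content_type, "content": content}
        for uri, content_type, content in files
    ]
    return json.dumps(payload)


# The CSV content holds a real newline; json.dumps escapes it as \n.
files_json = build_dataset_files_json([("data.csv", "text/csv", "a,b\n1,2")])
print(files_json)
```

The resulting string can be passed as files_json to either dataset tool; parsing it back with `json.loads` recovers the original content unchanged.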
List files and directories in a workspace bucket. Use prefix to browse subdirectories (e.g. 'data/'). Returns file name, path, type (file/directory), size, and last update. Use read_file with the file path to read text content.
Read the content of a text file from a workspace bucket. Only works for UTF-8 text files up to 1 MB (includes .py, .csv, .json, .ipynb, .sql, .txt, etc.). Use list_files first to check file size and path. For Jupyter notebooks (.ipynb), the content is JSON that you can parse to read/modify cells. Use start_line and end_line (1-indexed, inclusive) to read a specific range of lines instead of the entire file.
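The 1-indexed, inclusive convention for start_line and end_line differs from Python's 0-indexed, end-exclusive slices. The sketch below only illustrates that convention on a local string; `slice_lines` is a hypothetical function, and how the server actually implements the range is not stated in the description.

```python
def slice_lines(text, start_line=None, end_line=None):
    """Return lines start_line..end_line of text (1-indexed, inclusive).

    Omitting a bound means "from the first line" or "to the last line".
    """
    lines = text.splitlines(keepends=True)
    # Line 1 lives at index 0, so shift the start down by one.
    start = 0 if start_line is None else max(start_line - 1, 0)
    # The inclusive end line N maps to the exclusive slice bound N.
    end = len(lines) if end_line is None else end_line
    return "".join(lines[start:end])


sample = "alpha\nbeta\ngamma\ndelta\n"
selected = slice_lines(sample, start_line=2, end_line=3)
print(selected)
```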
Write text content to a new file in a workspace bucket. Fails if the file already exists. Maximum 1 MB. For Jupyter notebooks, provide valid .ipynb JSON content. Requires createObject permission on the workspace.
Call this tool when you are stuck, unsure what to do next, or need guidance on OpenHEXA.
- Leave topic empty for an overview (orientation, common workflows, tips). Pass a reason describing what you are stuck on (e.g. 'unsure which tool to use', 'pipeline failed', 'cannot find dataset').
- Pass a topic name to fetch that doc page in full. Reason is optional here.
Available topics:
- cli: OpenHEXA comes with a CLI you can install globally on your system. This CLI allows you to interact with the OpenHEXA API and perform various tasks such as creating and managing pipelines, running local jobs and more.
- notebooks-advanced: OpenHEXA notebooks are a customized [Jupyter](https://jupyter.org/) environment preloaded with the OpenHEXA SDK and toolboxes. This guide covers the workspace filesystem, the workspace Postgres from Python and R, and S3/GCS access.
- sdk: The `openhexa.sdk` Python library lets you write pipelines and notebook code against OpenHEXA: a `workspace` helper for files, database, connections, datasets and webapps, plus a typed `OpenHexaClient` for programmatic and GraphQL access.
- static-webapps: Static webapps host an HTML/CSS/JavaScript bundle inside an OpenHEXA workspace, served on its own subdomain, and can query OpenHEXA on behalf of viewing users via a pre-authenticated GraphQL endpoint. They are well suited to dashboards and custom forms.
- toolbox-dhis2: The `openhexa.toolbox.dhis2` library connects to a DHIS2 instance with metadata caching, queries org units, data elements and indicators, fetches dataValueSets and analytics, and handles ISO periods and pyramid enrichment.
- toolbox-hexa: The OpenHEXA SDK ships a typed `OpenHexaClient` to interact programmatically with the OpenHEXA platform (workspaces, pipelines, runs, datasets and webapps) with full type hints. It replaces the legacy `openhexa.toolbox.hexa` client.
- toolbox-iaso: The `openhexa.toolbox.iaso` `IASO` class authenticates against an IASO instance (staging or production) and fetches projects, organisation units, forms, and form submissions, with optional pandas DataFrame output and filters.
- writing-pipelines: End-to-end guide to authoring OpenHEXA pipelines: `@pipeline`/`@task` DAGs, parallelism and timeouts, typed parameters (datasets, files, secrets, connections, dynamic choices), `workspace.yaml`, GitHub Actions deploys, and Docker runs.
List pipelines in a workspace. Returns pipeline summaries (id, code, name, description). Use get_pipeline with the pipeline code to get full details including source code and run history.
Get full details of a pipeline: metadata, schedule, permissions, current version source code with all files, parameters, and recent runs. Use the returned 'id' field when calling run_pipeline or update_pipeline. Use a run 'id' from the runs list with get_pipeline_run to inspect outputs and logs.
Get detailed information about a specific pipeline run. Returns status, configuration used, messages (warnings/errors), outputs (files, database tables), and execution logs. Use this after run_pipeline to check results, or to inspect any run from get_pipeline's runs list.
Run a pipeline. Requires the pipeline UUID (from get_pipeline's 'id' field) and a JSON config string mapping parameter codes to values. Check the pipeline's parameters with get_pipeline first to see required parameters and their types. Example config: '{"param1": "value1", "param2": 42}'. Returns the created run's ID — use get_pipeline_run to monitor progress and get results.
Update a pipeline's properties. Provide the pipeline UUID (from get_pipeline's 'id' field) and any fields to change. For schedule, use a CRON expression (minute hour day-of-month month day-of-week), e.g. '0 6 * * 1' for Mondays at 6AM, '0 */2 * * *' for every 2 hours. Pass schedule='none' to disable scheduling. Only provided non-empty fields are updated.
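The five-field CRON layout can be checked before a schedule is sent. This is a small sketch under the assumption that a plain whitespace split is enough for a sanity check; `split_cron` is a hypothetical helper, it does not validate field ranges, and it says nothing about how OpenHEXA itself parses the expression.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression):
    """Map a five-field CRON expression onto named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"expected {len(CRON_FIELDS)} fields, got {len(parts)}: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


# The two examples from the description above.
monday_6am = split_cron("0 6 * * 1")
every_2h = split_cron("0 */2 * * *")
print(monday_6am)
print(every_2h)
```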
Create a new pipeline in the current workspace and upload its files as the first version (v1). Always provide a meaningful description summarizing what the pipeline does. If the pipeline has no clear purpose or is blank, use "" as the description. Provide files_json as a JSON array of {path, content, encoding?} objects, e.g. '[{"path": "pipeline.py", "content": "from openhexa.sdk import pipeline\n..."}, {"path": "helpers.py", "content": "def clean(df): ..."}, {"path": "requirements.txt", "content": "pandas\nrequests"}]'. A file named 'pipeline.py' is required at the root and must follow the OpenHEXA SDK structure (use @pipeline and @task decorators). Other files (helper modules, requirements.txt, READMEs, config) can be added alongside it. `encoding` defaults to TEXT (UTF-8 string). Use BASE64 for binary assets bundled with the pipeline (rare; workspace files are usually the right place for data).

OpenHEXA pipeline authoring cheat-sheet
---------------------------------------

Imports come from `openhexa.sdk`:

    from openhexa.sdk import current_run, pipeline, parameter, workspace

Structure:
- The pipeline function uses `@pipeline("code")` and ONLY orchestrates tasks (no data work in it).
- Tasks use `@<pipeline_name>.task` and contain the actual work.
- Task return values can be passed as args to other tasks. That defines the DAG and enables parallel execution of independent tasks.
- Task return values must be picklable. Pass each value as its own argument (no list/dict wrapping of return values).
- You CANNOT use task return values inside the @pipeline-decorated function: they are proxies resolved only when consumed by another task.

Logging (surfaces in the run UI):

    current_run.log_debug / log_info / log_warning / log_error / log_critical

Workspace files (read/write under `workspace.files_path`):

    with open(f"{workspace.files_path}/data.csv") as f:
        ...
    current_run.add_file_output(output_path)  # register file output

Workspace database:

    from sqlalchemy import create_engine
    engine = create_engine(workspace.database_url)
    df.to_sql("transformed", con=engine, if_exists="replace")
    current_run.add_database_output("transformed")  # register table output

Pipeline parameters (declared with `@parameter` decorators stacked above `@pipeline`):
- First positional arg `code` becomes the function argument name.
- `type` is one of: `int`, `float`, `str`, `bool`, a connection class (`DHIS2Connection`, `PostgreSQLConnection`, `IASOConnection`, ...), `Dataset`, `File`, or `Secret` (for tokens/passwords).
- Optional kwargs: `name`, `help`, `choices`, `default`, `required` (default True), `multiple` (default False), `widget`, `connection`.

Connections: declare via `@parameter("conn", type=DHIS2Connection)`; the connection instance is passed to the pipeline function automatically.

Timeouts: pass `timeout=<seconds>` to `@pipeline(...)` if the default 4h is not enough (max is instance-configured, typically 12h).

Minimal valid skeleton:

    from openhexa.sdk import current_run, pipeline

    @pipeline("simple-etl")
    def simple_etl():
        count = task_1()
        task_2(count)

    @simple_etl.task
    def task_1():
        current_run.log_info("In task 1...")
        return 42

    @simple_etl.task
    def task_2(count):
        current_run.log_info(f"In task 2... count is {count}")

    if __name__ == "__main__":
        simple_etl()

Advanced example (parameters, connection, parallel tasks, file + database outputs):

    import pandas as pd
    from sqlalchemy import create_engine
    from openhexa.sdk import (
        current_run, pipeline, parameter, workspace, DHIS2Connection,
    )

    @parameter("dhis2", type=DHIS2Connection, name="DHIS2 instance")
    @parameter("org_unit", type=str, name="Org unit ID", required=True)
    @parameter("year", type=int, default=2024)
    @pipeline("dhis2-monthly-report", timeout=3600)
    def dhis2_monthly_report(dhis2, org_unit, year):
        raw = extract(dhis2, org_unit, year)
        cleaned = transform(raw)
        # Independent tasks: run in parallel because they only depend on `cleaned`.
        save_file(cleaned)
        save_table(cleaned)

    @dhis2_monthly_report.task
    def extract(dhis2, org_unit, year):
        current_run.log_info(f"Fetching data for {org_unit} ({year})")
        # dhis2 is a ready-to-use openhexa.toolbox.dhis2 client
        return dhis2.analytics.get(org_unit=org_unit, period=str(year))

    @dhis2_monthly_report.task
    def transform(raw):
        df = pd.DataFrame(raw)
        return df.dropna()

    @dhis2_monthly_report.task
    def save_file(df):
        path = f"{workspace.files_path}/reports/monthly.csv"
        df.to_csv(path, index=False)
        current_run.add_file_output(path)

    @dhis2_monthly_report.task
    def save_table(df):
        engine = create_engine(workspace.database_url)
        df.to_sql("monthly_report", con=engine, if_exists="replace", index=False)
        current_run.add_database_output("monthly_report")

    if __name__ == "__main__":
        dhis2_monthly_report()

For the full reference (scheduling, widgets, secrets, datasets, debugging), call get_help_or_doc(topic="writing-pipelines"). For SDK details, use topic="sdk".
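The {path, content, encoding?} shape and the TEXT/BASE64 distinction described above can be assembled programmatically. The sketch below assumes a hypothetical helper `build_pipeline_files_json` (not part of the OpenHEXA SDK) that treats `str` values as TEXT, base64-encodes `bytes` values, and checks for the required root-level pipeline.py before serializing.

```python
import base64
import json


def build_pipeline_files_json(files):
    """Serialize a {path: str | bytes} mapping into a files_json string.

    Hypothetical helper for illustration. Text files omit `encoding`
    (TEXT is the stated default); binary files are sent as BASE64.
    """
    if "pipeline.py" not in files:
        raise ValueError("a root-level pipeline.py is required")
    payload = []
    for path, content in files.items():
        if isinstance(content, bytes):
            encoded = base64.b64encode(content).decode("ascii")
            payload.append({"path": path, "content": encoded, "encoding": "BASE64"})
        else:
            payload.append({"path": path, "content": content})
    return json.dumps(payload)


pipeline_files_json = build_pipeline_files_json(
    {
        "pipeline.py": "from openhexa.sdk import pipeline\n",
        "requirements.txt": "pandas\nrequests",
        "assets/logo.bin": b"\x89PNG",  # placeholder bytes, not a real image
    }
)
print(pipeline_files_json)
```

The same string shape applies when uploading a new version of an existing pipeline, since that tool takes the same files_json format.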
Upload a new version of an existing pipeline. Requires the workspace slug, the pipeline code (from get_pipeline), and the files that make up the new version. Optionally provide a version name and description. The version number is auto-incremented. Provide files_json as a JSON array of {path, content, encoding?} objects, e.g. '[{"path": "pipeline.py", "content": "from openhexa.sdk import pipeline\n..."}, {"path": "helpers.py", "content": "def clean(df): ..."}, {"path": "requirements.txt", "content": "pandas\nrequests"}]'. A file named 'pipeline.py' is required at the root and must follow the OpenHEXA SDK structure (use @pipeline and @task decorators). Other files (helper modules, requirements.txt, READMEs, config) can be added alongside it. `encoding` defaults to TEXT (UTF-8 string). Use BASE64 for binary assets bundled with the pipeline (rare; workspace files are usually the right place for data). Use get_pipeline first to read the current files, then modify them and pass the full set here: each upload replaces the previous version's files entirely. Returns the created version details including id, version number, and parsed parameters.

OpenHEXA pipeline authoring cheat-sheet
---------------------------------------

Imports come from `openhexa.sdk`:

    from openhexa.sdk import current_run, pipeline, parameter, workspace

Structure:
- The pipeline function uses `@pipeline("code")` and ONLY orchestrates tasks (no data work in it).
- Tasks use `@<pipeline_name>.task` and contain the actual work.
- Task return values can be passed as args to other tasks. That defines the DAG and enables parallel execution of independent tasks.
- Task return values must be picklable. Pass each value as its own argument (no list/dict wrapping of return values).
- You CANNOT use task return values inside the @pipeline-decorated function: they are proxies resolved only when consumed by another task.

Logging (surfaces in the run UI):

    current_run.log_debug / log_info / log_warning / log_error / log_critical

Workspace files (read/write under `workspace.files_path`):

    with open(f"{workspace.files_path}/data.csv") as f:
        ...
    current_run.add_file_output(output_path)  # register file output

Workspace database:

    from sqlalchemy import create_engine
    engine = create_engine(workspace.database_url)
    df.to_sql("transformed", con=engine, if_exists="replace")
    current_run.add_database_output("transformed")  # register table output

Pipeline parameters (declared with `@parameter` decorators stacked above `@pipeline`):
- First positional arg `code` becomes the function argument name.
- `type` is one of: `int`, `float`, `str`, `bool`, a connection class (`DHIS2Connection`, `PostgreSQLConnection`, `IASOConnection`, ...), `Dataset`, `File`, or `Secret` (for tokens/passwords).
- Optional kwargs: `name`, `help`, `choices`, `default`, `required` (default True), `multiple` (default False), `widget`, `connection`.

Connections: declare via `@parameter("conn", type=DHIS2Connection)`; the connection instance is passed to the pipeline function automatically.

Timeouts: pass `timeout=<seconds>` to `@pipeline(...)` if the default 4h is not enough (max is instance-configured, typically 12h).

Minimal valid skeleton:

    from openhexa.sdk import current_run, pipeline

    @pipeline("simple-etl")
    def simple_etl():
        count = task_1()
        task_2(count)

    @simple_etl.task
    def task_1():
        current_run.log_info("In task 1...")
        return 42

    @simple_etl.task
    def task_2(count):
        current_run.log_info(f"In task 2... count is {count}")

    if __name__ == "__main__":
        simple_etl()

Advanced example (parameters, connection, parallel tasks, file + database outputs):

    import pandas as pd
    from sqlalchemy import create_engine
    from openhexa.sdk import (
        current_run, pipeline, parameter, workspace, DHIS2Connection,
    )

    @parameter("dhis2", type=DHIS2Connection, name="DHIS2 instance")
    @parameter("org_unit", type=str, name="Org unit ID", required=True)
    @parameter("year", type=int, default=2024)
    @pipeline("dhis2-monthly-report", timeout=3600)
    def dhis2_monthly_report(dhis2, org_unit, year):
        raw = extract(dhis2, org_unit, year)
        cleaned = transform(raw)
        # Independent tasks: run in parallel because they only depend on `cleaned`.
        save_file(cleaned)
        save_table(cleaned)

    @dhis2_monthly_report.task
    def extract(dhis2, org_unit, year):
        current_run.log_info(f"Fetching data for {org_unit} ({year})")
        # dhis2 is a ready-to-use openhexa.toolbox.dhis2 client
        return dhis2.analytics.get(org_unit=org_unit, period=str(year))

    @dhis2_monthly_report.task
    def transform(raw):
        df = pd.DataFrame(raw)
        return df.dropna()

    @dhis2_monthly_report.task
    def save_file(df):
        path = f"{workspace.files_path}/reports/monthly.csv"
        df.to_csv(path, index=False)
        current_run.add_file_output(path)

    @dhis2_monthly_report.task
    def save_table(df):
        engine = create_engine(workspace.database_url)
        df.to_sql("monthly_report", con=engine, if_exists="replace", index=False)
        current_run.add_database_output("monthly_report")

    if __name__ == "__main__":
        dhis2_monthly_report()

For the full reference (scheduling, widgets, secrets, datasets, debugging), call get_help_or_doc(topic="writing-pipelines"). For SDK details, use topic="sdk".
List available pipeline templates. Optionally filter by search query. Templates are reusable pipeline blueprints. Workflow: list_pipeline_templates -> get_pipeline_template (to review code) -> create_pipeline_from_template (to instantiate in a workspace).
Get full details of a pipeline template including its description, config, version history, and the current version's source code and parameters. Use the currentVersion.id as the template_version_id when calling create_pipeline_from_template.
Create a new pipeline in a workspace from a template version. Use get_pipeline_template first to find the template_version_id (the currentVersion.id). The new pipeline will have the template's code, parameters, and configuration pre-configured.
List static web apps in a workspace. The returned URL can be used to access each webapp in a browser. Use get_static_webapp with a webapp slug to inspect its files and configuration.
Get full details of a static web app: metadata, allowed API operations, and the current files with their contents. Use the slug from list_static_webapps (not the UUID). File contents are returned with an `encoding` field — TEXT for UTF-8 strings, BASE64 for binary files. Returns the webapp's `id` which can be passed to update_static_webapp.
Create a static web app in a workspace. Provide files_json as a JSON array of {path, content} objects, e.g. '[{"path": "index.html", "content": "<html>...</html>"}, {"path": "style.css", "content": "body { ... }"}]'. An index.html file is required at minimum. Returns the webapp URL to access it in a browser. Private static webapps can also call OpenHEXA's GraphQL API directly from their JS via a same-origin proxy at POST /graphql/ (auth handled by the webapp session, no token needed). Pass allowed_operations as a comma-separated list of scopes to grant API access at creation time — valid values: PIPELINES_READ, PIPELINES_RUN, FILES_READ, FILES_WRITE, DATASETS_READ, DATASETS_WRITE, USER_READ. Leave empty to create with no API access (you can grant it later via update_static_webapp). When generating the webapp's HTML/JS, you can include fetch('/graphql/', {method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({query, variables})}) calls. For the full reference (auth model, all top-level fields per scope, sample queries and mutations), call get_help_or_doc(topic="static-webapps").
Update an existing static web app. Provide the webapp UUID (from list_static_webapps) and any fields to change. Only provided non-empty fields are updated. Pass files_json as a JSON array of {path, content} objects to replace all files (e.g. '[{"path": "index.html", "content": "<html>...</html>"}]'). Pass name to change the human-readable name, and description to change the description. Pass allowed_operations as a comma-separated list of API scopes the webapp's JS may call via the same-origin /graphql/ proxy — valid values: PIPELINES_READ, PIPELINES_RUN, FILES_READ, FILES_WRITE, DATASETS_READ, DATASETS_WRITE, USER_READ. Leave empty to leave the current scopes untouched, or pass "NONE" to revoke all access. For the full reference (auth model, all top-level fields per scope, sample queries and mutations), call get_help_or_doc(topic="static-webapps").
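The allowed_operations value is a plain comma-separated string, so a typo in a scope name is easy to miss. A minimal sketch of a pre-flight check follows; `build_allowed_operations` is a hypothetical helper, the scope names are the ones listed above, and joining without spaces is an assumption since the description does not say whether whitespace is tolerated.

```python
VALID_SCOPES = {
    "PIPELINES_READ",
    "PIPELINES_RUN",
    "FILES_READ",
    "FILES_WRITE",
    "DATASETS_READ",
    "DATASETS_WRITE",
    "USER_READ",
}


def build_allowed_operations(scopes):
    """Validate scope names and join them into a comma-separated string."""
    unknown = sorted(set(scopes) - VALID_SCOPES)
    if unknown:
        raise ValueError(f"unknown scopes: {', '.join(unknown)}")
    # Assumption: no spaces after commas.
    return ",".join(scopes)


read_only = build_allowed_operations(["PIPELINES_READ", "FILES_READ"])
print(read_only)
```

An empty list yields an empty string, which the update tool treats as "leave the current scopes untouched"; revoking all access requires passing "NONE" explicitly.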
List workspaces accessible to the current user. Optionally filter by name using the query parameter. This is typically the first tool to call to discover available workspaces before accessing pipelines, datasets, or files.
Get details of a specific workspace by its slug. Returns workspace metadata, countries, dockerImage, and permissions (update, manageMembers). Use list_workspaces first if you don't know the slug.
Update a workspace's properties. Provide the workspace slug and any fields to change.
- name: new display name
- description: new description
- countries: JSON array of ISO alpha-2 country codes, e.g. '["US", "FR", "KE"]'
- docker_image: custom Docker image for pipeline execution, e.g. 'eu.gcr.io/my-org/my-image:latest'
Only provided non-empty fields are updated. Requires update permission on the workspace (ADMIN role or higher).