267 lines
8.1 KiB
Python
267 lines
8.1 KiB
Python
"""
|
|
Pipeline: parse ifccommit.yaml and execute IFC operations by calling
|
|
ifccommit.cmd_* functions directly.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from typing import Callable
|
|
|
|
import yaml
|
|
|
|
# ifccommit.py lives at the repo root — ensure it is importable
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
import ifccommit
|
|
|
|
from webapp.schema import IfcCommitYaml, Operation
|
|
|
|
|
|
def parse_yaml(raw: str) -> IfcCommitYaml:
|
|
"""Parse raw YAML text into a validated IfcCommitYaml model."""
|
|
data = yaml.safe_load(raw)
|
|
return IfcCommitYaml.model_validate(data)
|
|
|
|
|
|
def has_runnable_ops(config: IfcCommitYaml) -> bool:
|
|
"""Return True if the pipeline has operations that modify IFC files.
|
|
Pure history-read pipelines (ReadHistory only) are display-only and don't run.
|
|
"""
|
|
return any(
|
|
op.command not in ("history", "list", "info") or op.write_psets
|
|
for op in config.operations
|
|
)
|
|
|
|
|
|
def run_pipeline(
|
|
config: IfcCommitYaml,
|
|
workdir: str,
|
|
log: Callable[[str], None],
|
|
overrides: dict[str, str] | None = None,
|
|
id_overrides: dict[str, str] | None = None,
|
|
element_overrides: dict[str, str] | None = None,
|
|
yaml_path: str | None = None,
|
|
) -> bool:
|
|
"""
|
|
Execute all operations in config against files in workdir.
|
|
Calls log(line) for each status/error line.
|
|
overrides maps op name → ifc_type (applied to info operations).
|
|
Returns True if all operations succeeded, False if any failed.
|
|
"""
|
|
overrides = overrides or {}
|
|
id_overrides = id_overrides or {}
|
|
element_overrides = element_overrides or {}
|
|
all_ok = True
|
|
|
|
for op in config.operations:
|
|
if op.command == "info" and op.name in overrides:
|
|
op = op.model_copy(update={"ifc_type": overrides[op.name]})
|
|
if op.command == "extract" and op.id and op.name in id_overrides:
|
|
op = op.model_copy(update={"id": id_overrides[op.name]})
|
|
if (
|
|
op.command == "modify"
|
|
and op.name in element_overrides
|
|
and element_overrides[op.name]
|
|
):
|
|
op = op.model_copy(update={"element": element_overrides[op.name]})
|
|
log(f"--- [{op.name}] command={op.command} ---")
|
|
try:
|
|
ns = _build_namespace(op, workdir, yaml_path=yaml_path)
|
|
_dispatch(op, ns)
|
|
log(f" OK")
|
|
except Exception as exc:
|
|
log(f" ERROR: {exc}")
|
|
all_ok = False
|
|
|
|
return all_ok
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _abs(workdir: str, path: str | None) -> str | None:
|
|
"""Resolve a repo-relative path to an absolute path inside workdir."""
|
|
if path is None:
|
|
return None
|
|
if os.path.isabs(path):
|
|
return path
|
|
return os.path.join(workdir, path)
|
|
|
|
|
|
def _build_namespace(
|
|
op: Operation, workdir: str, yaml_path: str | None = None
|
|
) -> argparse.Namespace:
|
|
"""
|
|
Translate an Operation into the argparse.Namespace that the matching
|
|
cmd_* function expects.
|
|
"""
|
|
cmd = op.command
|
|
|
|
if cmd == "list":
|
|
return argparse.Namespace(input=_abs(workdir, op.input))
|
|
|
|
if cmd == "info":
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
ifc_type=op.ifc_type,
|
|
)
|
|
|
|
if cmd == "extract":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
if op.id:
|
|
# Specific space extraction: extract type=IfcSpace id=A102
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
output=_abs(workdir, op.output),
|
|
name=op.id,
|
|
by="name",
|
|
)
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
output=_abs(workdir, op.output),
|
|
ifc_types=[op.query] if op.query else op.types,
|
|
)
|
|
|
|
if cmd == "insert":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
return argparse.Namespace(
|
|
base=_abs(workdir, op.base),
|
|
part=_abs(workdir, op.part),
|
|
output=_abs(workdir, op.output),
|
|
)
|
|
|
|
if cmd == "replace":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
return argparse.Namespace(
|
|
base=_abs(workdir, op.base),
|
|
space=op.space,
|
|
part=_abs(workdir, op.part),
|
|
output=_abs(workdir, op.output),
|
|
)
|
|
|
|
if cmd == "split":
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
outdir=_abs(workdir, op.outdir),
|
|
ifc_types=op.types or [],
|
|
)
|
|
|
|
if cmd == "space":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
output=_abs(workdir, op.output),
|
|
name=op.space,
|
|
by=op.by,
|
|
)
|
|
|
|
if cmd == "move":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
output=_abs(workdir, op.output),
|
|
name=op.element,
|
|
entity_id=op.entity_id,
|
|
x=op.x,
|
|
y=op.y,
|
|
z=op.z,
|
|
)
|
|
|
|
if cmd == "copy":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
output=_abs(workdir, op.output),
|
|
entity_ids=op.entity_ids,
|
|
tags=op.tags,
|
|
x=op.x,
|
|
y=op.y,
|
|
z=op.z,
|
|
)
|
|
|
|
if cmd == "diff":
|
|
target_ifc = op.target if op.target else op.output
|
|
return argparse.Namespace(
|
|
source=_abs(workdir, op.input),
|
|
target=_abs(workdir, target_ifc),
|
|
verbose=op.verbose if hasattr(op, "verbose") else False,
|
|
output=_abs(workdir, op.output),
|
|
)
|
|
|
|
if cmd == "modify":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
output=_abs(workdir, op.output),
|
|
name=op.element,
|
|
entity_id=op.entity_id,
|
|
x=op.x,
|
|
y=op.y,
|
|
z=op.z,
|
|
)
|
|
|
|
if cmd == "merge":
|
|
outdir = os.path.dirname(_abs(workdir, op.output))
|
|
os.makedirs(outdir, exist_ok=True)
|
|
return argparse.Namespace(
|
|
base=_abs(workdir, op.base),
|
|
space=op.space,
|
|
part=_abs(workdir, op.part),
|
|
output=_abs(workdir, op.output),
|
|
)
|
|
|
|
if cmd == "remove":
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input),
|
|
ifc_type=op.type,
|
|
output=_abs(workdir, op.output) if op.output else None,
|
|
)
|
|
|
|
if cmd == "history":
|
|
resolved_yaml = yaml_path or op.yaml_src or "yaml/duplex.yaml"
|
|
return argparse.Namespace(
|
|
input=_abs(workdir, op.input) if op.input else None,
|
|
output=_abs(workdir, op.output) if op.output else None,
|
|
write_psets=op.write_psets,
|
|
workdir=workdir,
|
|
yaml=resolved_yaml,
|
|
)
|
|
|
|
raise ValueError(f"Unknown command: {cmd}")
|
|
|
|
|
|
_DISPATCH = {
|
|
"list": ifccommit.cmd_list,
|
|
"info": ifccommit.cmd_info,
|
|
"extract": ifccommit.cmd_extract,
|
|
"insert": ifccommit.cmd_insert,
|
|
"replace": ifccommit.cmd_replace,
|
|
"split": ifccommit.cmd_split,
|
|
"space": ifccommit.cmd_space,
|
|
"move": ifccommit.cmd_move,
|
|
"copy": ifccommit.cmd_copy,
|
|
"diff": ifccommit.cmd_diff,
|
|
"history": ifccommit.cmd_history,
|
|
"remove": ifccommit.cmd_remove,
|
|
}
|
|
|
|
|
|
def _dispatch(op: Operation, ns: argparse.Namespace) -> None:
|
|
cmd = op.command
|
|
if cmd == "extract" and op.id:
|
|
ifccommit.cmd_space(ns)
|
|
elif cmd == "modify":
|
|
ifccommit.cmd_move(ns)
|
|
elif cmd == "merge":
|
|
ifccommit.cmd_replace(ns)
|
|
else:
|
|
_DISPATCH[cmd](ns)
|