907 lines
32 KiB
Python
907 lines
32 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ifc-commit — slice, inspect, and compose IFC files.
|
|
|
|
Usage:
|
|
uv run ifccommit.py list <input.ifc>
|
|
uv run ifccommit.py info <input.ifc> <IfcType>
|
|
uv run ifccommit.py extract <input.ifc> <output.ifc> <IfcType|preset> [...]
|
|
uv run ifccommit.py insert <base.ifc> <part.ifc> <output.ifc>
|
|
uv run ifccommit.py replace <base.ifc> <space> <part.ifc> <output.ifc>
|
|
uv run ifccommit.py split <input.ifc> <outdir> [IfcType|preset ...]
|
|
uv run ifccommit.py space <input.ifc> <output.ifc> <name> [--by name|longname]
|
|
uv run ifccommit.py move <input.ifc> <output.ifc> [entity_id] [--name X] [--x N] [--y N] [--z N]
|
|
uv run ifccommit.py copy <input.ifc> <output.ifc> [--entity-ids N ...] [--tags X ...] [--x N] [--y N] [--z N]
|
|
uv run ifccommit.py diff <source.ifc> <target.ifc> [-v]
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import tempfile
|
|
import argparse
|
|
import ifcopenshell
|
|
import ifcopenshell.util.placement
|
|
import ifcopenshell.util.selector
|
|
import ifcopenshell.api
|
|
import ifcpatch
|
|
|
|
# Shorthand preset names accepted anywhere a raw IfcType token is allowed;
# each expands to a list of IFC entity types (see resolve_types).
PRESETS = {
    "walls": ["IfcWall", "IfcWallStandardCase"],
    "storey": ["IfcBuildingStorey"],
    "furnitures": ["IfcFurnishingElement"],
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def resolve_types(tokens):
    """Expand preset names to IFC type lists; pass through raw IfcType tokens."""
    resolved = []
    for token in tokens:
        # A preset expands to its type list; anything else is taken verbatim.
        resolved.extend(PRESETS.get(token, [token]))
    return resolved
|
|
|
|
|
|
def extract_elements(input_path, model, query):
    """Run the ExtractElements ifcpatch recipe and return the resulting ifcopenshell.file."""
    recipe_args = {
        "input": input_path,
        "file": model,
        "recipe": "ExtractElements",
        "arguments": [query],
    }
    return ifcpatch.execute(recipe_args)
|
|
|
|
|
|
def merge_models(input_path, base, parts):
    """Merge a list of ifcopenshell.file objects into base and return the result."""
    recipe_args = {
        "input": input_path,
        "file": base,
        "recipe": "MergeProjects",
        "arguments": [parts],
    }
    return ifcpatch.execute(recipe_args)
|
|
|
|
|
|
def write_and_report(result, path):
    """Write result to path, then reopen it and print the IfcProduct count."""
    ifcpatch.write(result, path)
    written = ifcopenshell.open(path)
    count = len(written.by_type("IfcProduct"))
    print(f"Written : {path} ({count} products)")
|
|
|
|
|
|
def extract_with_location(input_path, ifc_types, location):
    """Extract each type separately with a location filter, merge, return output path.

    The selector's ``location`` filter does not combine with multiple types in
    one query, so each type is extracted to its own temp file and the parts
    are merged afterwards.

    Returns the path of a temporary ``.ifc`` file; the caller is responsible
    for renaming it into place or deleting it.
    """
    def _new_tempfile():
        # tempfile.mktemp is deprecated and race-prone (the name may be
        # claimed between the call and the write); mkstemp creates the file
        # atomically.
        fd, path = tempfile.mkstemp(suffix=".ifc")
        os.close(fd)
        return path

    tmp_files = []
    for ifc_type in ifc_types:
        src = ifcopenshell.open(input_path)
        result = extract_elements(
            input_path, src, f'{ifc_type}, location = "{location}"'
        )
        tmp = _new_tempfile()
        ifcpatch.write(result, tmp)
        tmp_files.append(tmp)

    out_tmp = _new_tempfile()
    if len(tmp_files) == 1:
        # os.replace (not os.rename) because out_tmp already exists.
        os.replace(tmp_files[0], out_tmp)
    else:
        base = ifcopenshell.open(tmp_files[0])
        extra = [ifcopenshell.open(f) for f in tmp_files[1:]]
        result = merge_models(tmp_files[0], base, extra)
        ifcpatch.write(result, out_tmp)
        for f in tmp_files:
            os.unlink(f)

    return out_tmp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Commands
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def cmd_list(args):
    """List all IFC types present in the file and their element counts,
    most frequent first."""
    from collections import Counter  # stdlib; local import matches file style

    model = ifcopenshell.open(args.input)
    counts = Counter(element.is_a() for element in model)
    # most_common() sorts by descending count, preserving first-seen order
    # for ties — same ordering as the previous manual sort on -count.
    for ifc_type, count in counts.most_common():
        print(f" {count:>6} {ifc_type}")
|
|
|
|
|
|
def cmd_info(args):
    """Print attributes of each element matching the given IFC type."""
    model = ifcopenshell.open(args.input)
    elements = model.by_type(args.ifc_type)
    print(f"{len(elements)} element(s) of type {args.ifc_type}\n")
    for element in elements:
        print(f" #{element.id()} {element.is_a()}")
        # get_info() returns all schema attributes plus id/type bookkeeping;
        # the bookkeeping keys are shown in the header line above.
        for key, value in element.get_info().items():
            if key in ("id", "type"):
                continue
            print(f" {key}: {value}")
        print()
|
|
|
|
|
|
def cmd_extract(args):
    """Extract elements of given IFC types (or presets) into a new IFC file."""
    query = ", ".join(resolve_types(args.ifc_types))
    print(f"Opening : {args.input}")
    print(f"Extracting: {query}")
    source = ifcopenshell.open(args.input)
    write_and_report(extract_elements(args.input, source, query), args.output)
|
|
|
|
|
|
def cmd_insert(args):
    """Insert (merge) a part IFC file into a base IFC file."""
    print(f"Base : {args.base}")
    print(f"Part : {args.part}")
    merged = merge_models(
        args.base,
        ifcopenshell.open(args.base),
        [ifcopenshell.open(args.part)],
    )
    write_and_report(merged, args.output)
|
|
|
|
|
|
def cmd_replace(args):
    """Remove a space and its contents from base, then merge the part file in.

    Matches the space by its Name attribute; if several spaces share the
    name, only the first is removed (contents are matched by location name).
    """
    print(f"Base : {args.base}")
    print(f"Space : {args.space}")
    print(f"Part : {args.part}")

    model = ifcopenshell.open(args.base)

    spaces = [s for s in model.by_type("IfcSpace") if s.Name == args.space]
    if not spaces:
        print(f"No IfcSpace with Name = {args.space!r}")
        return

    contained = ifcopenshell.util.selector.filter_elements(
        model, f'IfcElement, location = "{args.space}"'
    )
    to_remove = list(contained) + [spaces[0]]
    print(f"Removing : {len(to_remove)} elements (space + contents)")
    for el in to_remove:
        ifcopenshell.api.run("root.remove_product", model, product=el)

    # tempfile.mktemp is deprecated and race-prone; mkstemp creates the file
    # atomically. try/finally ensures the temp file is removed even when the
    # merge or the write fails.
    fd, tmp = tempfile.mkstemp(suffix=".ifc")
    os.close(fd)
    try:
        model.write(tmp)
        stripped = ifcopenshell.open(tmp)
        part_model = ifcopenshell.open(args.part)
        result = merge_models(tmp, stripped, [part_model])
        write_and_report(result, args.output)
    finally:
        os.unlink(tmp)
|
|
|
|
|
|
def cmd_split(args):
    """Split an IFC file into one file per storey, optionally filtered by type."""
    print(f"Opening : {args.input}")
    model = ifcopenshell.open(args.input)
    storeys = model.by_type("IfcBuildingStorey")
    if not storeys:
        print("No IfcBuildingStorey found.")
        return

    type_filter = resolve_types(args.ifc_types) if args.ifc_types else ["IfcElement"]
    stem = os.path.splitext(os.path.basename(args.input))[0]
    os.makedirs(args.outdir, exist_ok=True)

    for storey in storeys:
        storey_name = storey.Name or storey.GlobalId
        safe_name = re.sub(r"[^\w-]", "_", storey_name)
        output = os.path.join(args.outdir, f"{stem}_{safe_name}.ifc")

        # location filter breaks with + operator — extract each type separately
        tmp = extract_with_location(args.input, type_filter, storey_name)
        os.rename(tmp, output)

        out_model = ifcopenshell.open(output)
        count = len(out_model.by_type("IfcProduct"))
        print(f" {storey_name:<20} → {output} ({count} products)")
|
|
|
|
|
|
def cmd_space(args):
    """Extract an IfcSpace and all objects contained within it."""
    print(f"Opening : {args.input}")
    model = ifcopenshell.open(args.input)

    if args.by == "longname":
        # LongName may match several spaces; extract each into its own file.
        matches = [s for s in model.by_type("IfcSpace") if s.LongName == args.name]
        if not matches:
            print(f"No IfcSpace with LongName = {args.name!r}")
            return
        location_names = [s.Name for s in matches]
    else:
        location_names = [args.name]

    stem, ext = os.path.splitext(args.output)
    multiple = len(location_names) > 1

    for loc in location_names:
        output = f"{stem}_{loc}{ext}" if multiple else args.output
        print(f'Extracting: IfcElement, location = "{loc}"')
        source = ifcopenshell.open(args.input)
        result = extract_elements(args.input, source, f'IfcElement, location = "{loc}"')
        ifcpatch.write(result, output)

        out_model = ifcopenshell.open(output)
        count = len(out_model.by_type("IfcProduct"))
        print(f"Written : {output} ({count} products)")
        for space in out_model.by_type("IfcSpace"):
            print(f" {space.Name} {space.LongName or ''}")
|
|
|
|
|
|
def cmd_history(args):
    """Write or read Pset_GitCommit on IFC elements.

    Write mode (write_psets=True): stamp Pset_GitCommit on every IfcProduct
    in each operation's output IFC, using the current HEAD commit info.

    Read mode (input present): collect Pset_GitCommit from the given IFC
    file and emit the result as JSON.
    """
    import json
    import subprocess
    import yaml as _yaml

    # getattr defaults allow this command to be driven by namespaces that
    # were not built by this module's argparse setup (e.g. a pipeline runner).
    write_psets = getattr(args, "write_psets", False)
    input_path = getattr(args, "input", None)

    # ── Read mode ─────────────────────────────────────────────────────────────
    if input_path:
        model = ifcopenshell.open(input_path)
        records = []
        for element in model.by_type("IfcProduct"):
            # IsDefinedBy may be absent or None depending on schema/element.
            for rel in getattr(element, "IsDefinedBy", None) or []:
                if not rel.is_a("IfcRelDefinesByProperties"):
                    continue
                pset = rel.RelatingPropertyDefinition
                if pset.Name != "Pset_GitCommit":
                    continue
                # Flatten the pset's single values into the record.
                props = {p.Name: p.NominalValue.wrappedValue for p in pset.HasProperties}
                records.append({
                    "id": element.id(),
                    "name": element.Name or "",
                    "type": element.is_a(),
                    **props,
                })
        output = getattr(args, "output", None)
        if output:
            dirpart = os.path.dirname(output)
            if dirpart:
                os.makedirs(dirpart, exist_ok=True)
            with open(output, "w") as f:
                json.dump(records, f, indent=2)
            print(f"Written : {output} ({len(records)} record(s))")
        else:
            print(json.dumps(records, indent=2))
        return

    # ── Write mode ────────────────────────────────────────────────────────────
    if not write_psets:
        print("history: nothing to do (no input and write_psets not set)")
        return

    workdir = getattr(args, "workdir", ".")
    yaml_rel = getattr(args, "yaml", None)

    def _git(*cmd):
        """Run a git command in workdir and return its stripped stdout.

        Best-effort: no check=True, so a failing command yields "".
        """
        return subprocess.run(
            ["git"] + list(cmd),
            capture_output=True, text=True, cwd=workdir,
        ).stdout.strip()

    commit_hash = _git("rev-parse", "HEAD")
    commit_message = _git("log", "-1", "--pretty=%s")
    commit_author = _git("log", "-1", "--pretty=%an <%ae>")
    commit_date = _git("log", "-1", "--pretty=%cI")
    commit_branch = _git("rev-parse", "--abbrev-ref", "HEAD")

    if not commit_hash:
        print("history: no git commits found — skipping Pset write")
        return

    if yaml_rel:
        yaml_file = yaml_rel if os.path.isabs(yaml_rel) else os.path.join(workdir, yaml_rel)
        with open(yaml_file) as f:
            config = _yaml.safe_load(f)
    else:
        config = {"operations": []}

    # Only stamp IFC outputs of real operations; skip the history op itself.
    ifc_ops = [
        op for op in config.get("operations", [])
        if (op.get("output") or "").endswith(".ifc") and op.get("command") != "history"
    ]

    def _resolve(p):
        """Interpret a pipeline-relative path against the git workdir."""
        return p if os.path.isabs(p) else os.path.join(workdir, p)

    def _changed_ids(before_path, after_path):
        """Return file-local IDs (in after) of elements added or moved vs before.

        Uses GlobalId for matching so that entity IDs rewritten by ifcpatch
        (e.g. after ExtractElements) don't produce false positives.
        """
        src = ifcopenshell.open(before_path)
        tgt = ifcopenshell.open(after_path)
        src_map = {e.GlobalId: e for e in src.by_type("IfcProduct") if getattr(e, "GlobalId", None)}
        tgt_map = {e.GlobalId: e for e in tgt.by_type("IfcProduct") if getattr(e, "GlobalId", None)}
        changed = set()
        for gid, tgt_el in tgt_map.items():
            if gid not in src_map:
                changed.add(tgt_el.id())  # truly new element
                continue
            try:
                # Compare only the translation column, with a 1 mm tolerance.
                sp = ifcopenshell.util.placement.get_local_placement(src_map[gid].ObjectPlacement)
                tp = ifcopenshell.util.placement.get_local_placement(tgt_el.ObjectPlacement)
                if any(abs(sp[i, 3] - tp[i, 3]) > 0.001 for i in range(3)):
                    changed.add(tgt_el.id())
            except Exception:
                # Elements without a resolvable placement count as unchanged.
                pass
        return changed

    total = 0
    for op in ifc_ops:
        out_abs = _resolve(op["output"])
        if not os.path.exists(out_abs):
            print(f" skip (not found): {op['output']}")
            continue

        # Determine comparison baseline: input or base
        before_rel = op.get("input") or op.get("base")
        before_abs = _resolve(before_rel) if before_rel else None

        if before_abs and os.path.exists(before_abs):
            changed = _changed_ids(before_abs, out_abs)
        else:
            # No baseline — skip (nothing to compare against)
            print(f" skip (no baseline): {op['output']}")
            continue

        if not changed:
            print(f" no changes: {op['output']}")
            continue

        model = ifcopenshell.open(out_abs)
        # Clear any existing Pset_GitCommit left from prior runs
        for rel in list(model.by_type("IfcRelDefinesByProperties")):
            pset = rel.RelatingPropertyDefinition
            if getattr(pset, "Name", None) == "Pset_GitCommit":
                for product in rel.RelatedObjects:
                    ifcopenshell.api.run("pset.remove_pset", model, product=product, pset=pset)
        count = 0
        for eid in changed:
            element = model.by_id(eid)
            if element is None:
                continue
            pset = ifcopenshell.api.run("pset.add_pset", model, product=element, name="Pset_GitCommit")
            ifcopenshell.api.run("pset.edit_pset", model, pset=pset, properties={
                "CommitHash": commit_hash,
                "CommitMessage": commit_message,
                "CommitAuthor": commit_author,
                "CommitDate": commit_date,
                "CommitBranch": commit_branch,
                "OperationName": op["name"],
            })
            count += 1
        # Stamping happens in place: the operation's output file is rewritten.
        model.write(out_abs)
        total += count
        print(f" Pset_GitCommit: {op['output']} ({count} changed product(s))")

    print(f"Written : Pset_GitCommit on {total} changed product(s) across {len(ifc_ops)} file(s)")
|
|
|
|
|
|
def cmd_move(args):
    """Translate an element by (x, y, z) metres and write to a new file."""
    print(f"Opening : {args.input}")
    model = ifcopenshell.open(args.input)

    # Locate the element: by name substring, or by entity id.
    if args.name:
        hits = [
            p for p in model.by_type("IfcProduct") if args.name in (p.Name or "")
        ]
        if not hits:
            print(f"No IfcProduct with name containing {args.name!r}")
            return
        if len(hits) > 1:
            print(f"Multiple matches for {args.name!r}:")
            for hit in hits:
                print(f" #{hit.id()} {hit.Name}")
            return
        target = hits[0]
    elif args.entity_id is not None:
        target = model.by_id(args.entity_id)
    else:
        print("No element specified (name or entity_id required)")
        return
    if target is None:
        print(f"No element found with id={args.entity_id}")
        return
    print(f"Element : #{target.id()} {target.Name}")

    placement = ifcopenshell.util.placement.get_local_placement(target.ObjectPlacement)
    print(
        f"Before : X={placement[0, 3]:.4f} Y={placement[1, 3]:.4f} Z={placement[2, 3]:.4f}"
    )

    # Offsets apply to the translation column of the 4x4 placement matrix.
    for axis, offset in enumerate((args.x, args.y, args.z)):
        placement[axis, 3] += offset
    ifcopenshell.api.run(
        "geometry.edit_object_placement", model, product=target, matrix=placement
    )

    moved = ifcopenshell.util.placement.get_local_placement(target.ObjectPlacement)
    print(f"After : X={moved[0, 3]:.4f} Y={moved[1, 3]:.4f} Z={moved[2, 3]:.4f}")

    model.write(args.output)
    out_model = ifcopenshell.open(args.output)
    count = len(out_model.by_type("IfcProduct"))
    print(f"Written : {args.output} ({count} products)")
|
|
|
|
|
|
def cmd_copy(args):
    """Copy elements by entity_ids or tags and translate copies by (x, y, z) metres.

    --tags takes precedence over --entity-ids when both are supplied.
    Copies are written alongside the originals into args.output.
    """
    print(f"Opening : {args.input}")
    model = ifcopenshell.open(args.input)

    elements = []
    if args.tags:
        for tag in args.tags:
            # Not every IfcProduct subtype defines a Tag attribute; getattr
            # avoids per-element try/except control flow in the scan loop.
            matches = [
                e for e in model.by_type("IfcProduct")
                if getattr(e, "Tag", None) == tag
            ]
            if not matches:
                print(f"Warning: No element found with tag={tag}")
                continue
            elements.extend(matches)
    elif args.entity_ids:
        for eid in args.entity_ids:
            el = model.by_id(eid)
            if el is None:
                print(f"Warning: No element found with id={eid}")
                continue
            elements.append(el)
    else:
        print("No elements specified (tags or entity_ids required)")
        return

    if not elements:
        print("No elements found")
        return

    copies = []
    for el in elements:
        print(f"Copying : #{el.id()} {el.Name or el.is_a()}")
        new_el = ifcopenshell.api.run("root.copy_class", model, product=el)
        copies.append(new_el)

        # Shift the copy's placement by the requested offsets.
        matrix = ifcopenshell.util.placement.get_local_placement(new_el.ObjectPlacement)
        matrix[0, 3] += args.x
        matrix[1, 3] += args.y
        matrix[2, 3] += args.z
        ifcopenshell.api.run(
            "geometry.edit_object_placement", model, product=new_el, matrix=matrix
        )

        after = ifcopenshell.util.placement.get_local_placement(new_el.ObjectPlacement)
        print(
            f" After : X={after[0, 3]:.4f} Y={after[1, 3]:.4f} Z={after[2, 3]:.4f}"
        )

    model.write(args.output)
    out_model = ifcopenshell.open(args.output)
    count = len(out_model.by_type("IfcProduct"))
    print(f"Written : {args.output} ({count} products)")
|
|
|
|
|
|
def cmd_diff(args):
    """Show differences between two IFC files and write to output file.

    Elements are matched by file-local entity id. With --verbose, common
    elements whose placement translation differs by more than 1 mm on any
    axis are reported with per-axis deltas.
    """
    print(f"Source : {args.source}")
    print(f"Target : {args.target}")

    src_model = ifcopenshell.open(args.source)
    tgt_model = ifcopenshell.open(args.target)

    src_elements = {e.id(): e for e in src_model.by_type("IfcProduct")}
    tgt_elements = {e.id(): e for e in tgt_model.by_type("IfcProduct")}

    src_ids = set(src_elements)
    tgt_ids = set(tgt_elements)

    added = tgt_ids - src_ids
    removed = src_ids - tgt_ids
    common = src_ids & tgt_ids

    lines = []
    lines.append(f"Source : {args.source}")
    lines.append(f"Target : {args.target}")
    lines.append("")
    lines.append("Summary:")
    lines.append(f" Added : {len(added)} element(s)")
    lines.append(f" Removed : {len(removed)} element(s)")
    lines.append(f" Modified: {len(common)} element(s)")

    if args.verbose and (added or removed):
        lines.append("")
        lines.append("Added elements:")
        for eid in sorted(added):
            el = tgt_elements[eid]
            lines.append(f" +{eid} {el.is_a()} {el.Name or ''}")

        lines.append("")
        lines.append("Removed elements:")
        for eid in sorted(removed):
            el = src_elements[eid]
            lines.append(f" -{eid} {el.is_a()} {el.Name or ''}")

    def _coords(element):
        """Return the (x, y, z) placement translation, or None if unresolvable."""
        try:
            m = ifcopenshell.util.placement.get_local_placement(element.ObjectPlacement)
        except Exception:
            # Was a bare `except:` — narrowed so Ctrl-C / SystemExit propagate.
            return None
        return (m[0, 3], m[1, 3], m[2, 3])

    if args.verbose:
        lines.append("")
        lines.append("Modified elements:")
        for eid in sorted(common):
            src_coords = _coords(src_elements[eid])
            tgt_coords = _coords(tgt_elements[eid])
            if src_coords is None or tgt_coords is None:
                continue
            # 1 mm tolerance per axis; skip unmoved elements.
            if all(abs(s - t) <= 0.001 for s, t in zip(src_coords, tgt_coords)):
                continue
            tgt_el = tgt_elements[eid]
            lines.append(f" #{eid} {tgt_el.is_a()} {tgt_el.Name or ''}")
            for axis, s, t in zip("XYZ", src_coords, tgt_coords):
                lines.append(
                    f" {axis}: {s:.4f} -> {t:.4f} (delta: {t - s:+.4f})"
                )

    output = "\n".join(lines)
    print(output)

    if args.output:
        with open(args.output, "w") as f:
            f.write(output + "\n")
        print(f"\nWritten : {args.output}")
|
|
|
|
|
|
def cmd_remove(args):
    """Remove all elements contained within each instance of the given IFC type."""
    print(f"Opening : {args.input}")
    model = ifcopenshell.open(args.input)

    containers = model.by_type(args.ifc_type)
    if not containers:
        print(f"No {args.ifc_type} found in {args.input}")
        return

    removed_ids = set()
    total = 0
    for container in containers:
        label = container.Name or container.GlobalId
        located = ifcopenshell.util.selector.filter_elements(
            model, f'IfcElement, location = "{label}"'
        )
        # Skip openings (removing them corrupts wall geometry) and anything
        # already removed while processing an earlier container.
        doomed = [
            element
            for element in located
            if element.id() not in removed_ids
            and not element.is_a("IfcOpeningElement")
        ]
        print(f" {args.ifc_type} {label!r}: {len(doomed)} element(s) removed")
        for element in doomed:
            removed_ids.add(element.id())
            ifcopenshell.api.run("root.remove_product", model, product=element)
        total += len(doomed)

    stem, ext = os.path.splitext(args.input)
    type_safe = re.sub(r"[^\w]", "", args.ifc_type)
    output = args.output or f"{stem}_{type_safe}_removed{ext}"

    model.write(output)
    out_model = ifcopenshell.open(output)
    count = len(out_model.by_type("IfcProduct"))
    print(f"Removed : {total} element(s) total")
    print(f"Written : {output} ({count} products)")
|
|
|
|
|
|
def cmd_run(args):
    """Fetch yaml pipelines from the demo repo, let user pick one, and execute it.

    Lists yaml/ on the demo repo via the forge API, prompts for a selection,
    fetches the yaml content, shallow-clones the repo into a temp dir, then
    runs the pipeline against that checkout. The clone is removed afterwards.
    """
    import base64
    import json
    import shutil
    import subprocess
    from urllib.request import urlopen
    from urllib.error import URLError

    DEMO_REPO = "rvba/ifc-commit"
    FORGE_API = "https://gitaec.org/api/v1"

    # 1. List yaml files in yaml/ on the demo repo
    url = f"{FORGE_API}/repos/{DEMO_REPO}/contents/yaml"
    try:
        with urlopen(url) as resp:
            entries = json.loads(resp.read())
    except URLError as exc:
        print(f"Cannot reach demo repo: {exc}")
        return

    yaml_files = [
        e
        for e in entries
        if e.get("type") == "file" and e["name"].endswith((".yaml", ".yml"))
    ]
    if not yaml_files:
        print("No yaml files found in yaml/ on demo repo.")
        return

    # 2. Let user pick
    print("Pipelines available on demo repo:")
    for i, f in enumerate(yaml_files, 1):
        print(f" {i}. {f['name']}")

    try:
        choice = input("\nSelect (number): ").strip()
        idx = int(choice) - 1
        if not (0 <= idx < len(yaml_files)):
            raise ValueError
        selected = yaml_files[idx]
    except (ValueError, EOFError):
        print("Invalid selection.")
        return

    yaml_path = f"yaml/{selected['name']}"
    print(f"Selected : {yaml_path}")

    # 3. Fetch yaml content via API (content arrives base64-encoded)
    url = f"{FORGE_API}/repos/{DEMO_REPO}/contents/{yaml_path}"
    try:
        with urlopen(url) as resp:
            data = json.loads(resp.read())
        content = base64.b64decode(data["content"]).decode()
    except (URLError, KeyError) as exc:
        print(f"Cannot fetch yaml: {exc}")
        return

    # 4. Shallow-clone demo repo into a temp dir
    clone_url = f"https://gitaec.org/{DEMO_REPO}.git"
    workdir = tempfile.mkdtemp(prefix="ifccommit_run_")
    print("Cloning demo repo...")
    try:
        subprocess.run(
            ["git", "clone", "--depth=1", clone_url, workdir],
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as exc:
        # git writes its error output to stderr, not stdout; the old code
        # printed exc.stdout, which is empty on a failed clone.
        print(f"Clone failed:\n{exc.stderr or exc.stdout}")
        shutil.rmtree(workdir, ignore_errors=True)
        return

    # 5. Parse yaml and run pipeline
    import sys

    # Make webapp.pipeline importable regardless of the caller's cwd.
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    from webapp.pipeline import parse_yaml, run_pipeline

    try:
        config = parse_yaml(content)
    except Exception as exc:
        print(f"Invalid yaml: {exc}")
        shutil.rmtree(workdir, ignore_errors=True)
        return

    print(f"Running : {selected['name']} ({len(config.operations)} operation(s))\n")
    run_pipeline(config, workdir, print, yaml_path=yaml_path)
    shutil.rmtree(workdir, ignore_errors=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def main():
    """Build the argparse CLI, parse argv, and dispatch to the cmd_* handler."""
    parser = argparse.ArgumentParser(
        prog="ifccommit",
        description="ifc-commit — slice, inspect, and compose IFC files",
    )
    # required=True means argparse rejects an empty command line, so the
    # dispatch lookup at the bottom can never miss.
    sub = parser.add_subparsers(dest="command", required=True)

    p_list = sub.add_parser("list", help="List IFC types in file")
    p_list.add_argument("input", help="Input IFC file")

    p_info = sub.add_parser("info", help="Show element attributes for a type")
    p_info.add_argument("input", help="Input IFC file")
    p_info.add_argument("ifc_type", metavar="IfcType", help="e.g. IfcWall")

    p_ext = sub.add_parser("extract", help="Extract elements to a new IFC file")
    p_ext.add_argument("input", help="Input IFC file")
    p_ext.add_argument("output", help="Output IFC file")
    p_ext.add_argument(
        "ifc_types",
        metavar="IfcType|preset",
        nargs="+",
        help=f"IFC types or presets: {', '.join(PRESETS)} (e.g. walls, IfcSlab)",
    )

    p_ins = sub.add_parser("insert", help="Insert a part IFC file into a base IFC file")
    p_ins.add_argument("base", help="Base IFC file")
    p_ins.add_argument("part", help="Part IFC file to insert")
    p_ins.add_argument("output", help="Output IFC file")

    p_rep = sub.add_parser(
        "replace", help="Remove a space from base and merge a part in its place"
    )
    p_rep.add_argument("base", help="Base IFC file")
    p_rep.add_argument("space", help="Space Name to remove (e.g. A102)")
    p_rep.add_argument("part", help="Part IFC file to merge in")
    p_rep.add_argument("output", help="Output IFC file")

    p_spl = sub.add_parser("split", help="Split into one IFC file per storey")
    p_spl.add_argument("input", help="Input IFC file")
    p_spl.add_argument("outdir", help="Output directory")
    p_spl.add_argument(
        "ifc_types",
        metavar="IfcType|preset",
        nargs="*",
        help=f"Optional type filter: presets {', '.join(PRESETS)} or raw IFC types",
    )

    p_spc = sub.add_parser("space", help="Extract a space and all its contents")
    p_spc.add_argument("input", help="Input IFC file")
    p_spc.add_argument("output", help="Output IFC file")
    p_spc.add_argument("name", help="Space name to match")
    p_spc.add_argument(
        "--by",
        choices=["name", "longname"],
        default="name",
        help="Match against Name (default) or LongName",
    )

    # history has two modes (see cmd_history): --input selects read mode,
    # --write-psets selects write mode.
    p_his = sub.add_parser(
        "history", help="Write or read Pset_GitCommit on IFC elements"
    )
    p_his.add_argument(
        "--input", help="IFC file to read Pset_GitCommit from (read mode)"
    )
    p_his.add_argument(
        "--output", help="Output JSON file (read mode) or unused in write mode"
    )
    p_his.add_argument(
        "--write-psets", dest="write_psets", action="store_true",
        help="Stamp Pset_GitCommit on all output IFC elements using HEAD (write mode)",
    )
    p_his.add_argument(
        "--workdir", default=".", help="Git repository root (default: cwd)"
    )
    p_his.add_argument(
        "--yaml",
        default="yaml/duplex.yaml",
        help="Repo-relative path to the pipeline yaml (write mode)",
    )

    p_rem = sub.add_parser(
        "remove",
        help="Remove all elements contained within each instance of an IFC type",
    )
    p_rem.add_argument("input", help="Input IFC file")
    p_rem.add_argument(
        "ifc_type", metavar="IfcType", help="Container type (e.g. IfcSpace)"
    )
    p_rem.add_argument(
        "--output", help="Output IFC file (default: <input>_<IfcType>_removed.ifc)"
    )

    # "run" takes no CLI arguments; the pipeline is selected interactively.
    sub.add_parser("run", help="Browse and run a pipeline yaml from the demo repo")

    p_mov = sub.add_parser("move", help="Translate an element by (x, y, z) metres")
    p_mov.add_argument("input", help="Input IFC file")
    p_mov.add_argument("output", help="Output IFC file")
    p_mov.add_argument("--name", help="Find element by name substring")
    p_mov.add_argument(
        "entity_id", type=int, nargs="?", help="IFC entity id (e.g. 17902)"
    )
    p_mov.add_argument("--x", type=float, default=0.0, help="X offset in metres")
    p_mov.add_argument("--y", type=float, default=0.0, help="Y offset in metres")
    p_mov.add_argument("--z", type=float, default=0.0, help="Z offset in metres")

    p_cpy = sub.add_parser(
        "copy",
        help="Copy elements by ids or tags and translate copies by (x, y, z) metres",
    )
    p_cpy.add_argument("input", help="Input IFC file")
    p_cpy.add_argument("output", help="Output IFC file")
    p_cpy.add_argument(
        "--entity-ids",
        type=int,
        nargs="+",
        help="IFC entity ids to copy (e.g. 17902 17903)",
    )
    p_cpy.add_argument(
        "--tags",
        type=str,
        nargs="+",
        help="IFC element tags to copy (e.g. ABC123 DEF456)",
    )
    p_cpy.add_argument("--x", type=float, default=0.0, help="X offset in metres")
    p_cpy.add_argument("--y", type=float, default=0.0, help="Y offset in metres")
    p_cpy.add_argument("--z", type=float, default=0.0, help="Z offset in metres")

    p_diff = sub.add_parser("diff", help="Show differences between two IFC files")
    p_diff.add_argument("source", help="Source IFC file")
    p_diff.add_argument("target", help="Target IFC file to compare against")
    p_diff.add_argument(
        "--verbose", "-v", action="store_true", help="Show detailed changes"
    )
    p_diff.add_argument("--output", "-o", help="Output file to write diff results")

    args = parser.parse_args()

    # Sub-command name → handler function.
    dispatch = {
        "list": cmd_list,
        "info": cmd_info,
        "extract": cmd_extract,
        "insert": cmd_insert,
        "replace": cmd_replace,
        "split": cmd_split,
        "space": cmd_space,
        "move": cmd_move,
        "copy": cmd_copy,
        "diff": cmd_diff,
        "history": cmd_history,
        "remove": cmd_remove,
        "run": cmd_run,
    }
    dispatch[args.command](args)
|
|
|
|
|
|
# Script entry point: only parse argv when executed directly, not on import.
if __name__ == "__main__":
    main()
|