import json
import os
import pathlib
import ase.io
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import zntrack
from ase import Atom, Atoms
from ase.build import molecule, surface
from ase.constraints import FixAtoms, FixedLine
from ase.data import atomic_numbers, covalent_radii
from ase.geometry import get_distances
from ase.io import read, write
from ase.optimize import BFGS
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
# from copy import deepcopy
from tqdm import tqdm
from mlipx.abc import ComparisonResults, NodeWithCalculator
def gas_phase(mol, vacuum=10):
return molecule(mol, vacuum=vacuum)
def gen_surf(atoms, miller=(1, 1, 1), layers=8, vacuum=10, supercell=[2, 2, 1]):
surf = surface(atoms, miller, layers, vacuum=vacuum, periodic=True)
surf.center(vacuum=vacuum, axis=2)
return surf.repeat(supercell)
def drop_atom(current_pos, atoms, R):
while True:
distance_matrix = get_distances(
current_pos, atoms.positions, cell=atoms.cell, pbc=True
)[1]
drop = distance_matrix.min() / (4 * R.max())
n_top_covalent_bonds = np.sum(distance_matrix < R)
n_valley_covalent_bonds = np.sum(distance_matrix < R * 1.2)
if n_top_covalent_bonds > 0 or n_valley_covalent_bonds > 1:
break
current_pos -= np.array([0, 0, drop])
return current_pos
def grid_placement(atoms, atom="C", grid_step=0.4):
_atoms = atoms.copy()
cell = _atoms.cell
x = np.arange(0, cell[0, 0], grid_step)
y = np.arange(0, cell[1, 1], grid_step)
height = cell[2, 2]
R_atom = covalent_radii[atomic_numbers[atom]]
R = np.array(
[covalent_radii[atomic_numbers[symbol]] + R_atom for symbol in _atoms.symbols]
)
pos = []
for i in x:
for j in y:
current_pos = np.array(
[
i + j * (cell[1, 0] / cell[1, 1]),
j + i * (cell[0, 1] / cell[0, 0]),
height,
]
)
current_pos = drop_atom(current_pos, _atoms, R)
pos.append(current_pos)
_atoms += Atoms(symbols=["X"] * len(pos), positions=pos)
return pos, _atoms
def relocate_atom(atoms, idx):
atoms = atoms.copy()
_atoms = atoms.copy()
_atoms.set_constraint()
del _atoms[idx]
atom = atoms[idx]
current_pos = atom.position
R_atom = covalent_radii[atomic_numbers[atom.symbol]]
R = np.array(
[covalent_radii[atomic_numbers[symbol]] + R_atom for symbol in _atoms.symbols]
)
current_pos = drop_atom(current_pos, _atoms, R)
atoms[idx].position = current_pos
return atoms
def gen_images(atoms, pos, mol="C"):
traj = []
for xyz in pos:
_atoms = atoms.copy()
if mol == "CO":
_atoms += Atoms(
symbols=["C", "O"], positions=[xyz, xyz + np.array([0, 0, 1.2])]
)
elif mol == "COH":
_atoms += Atoms(
symbols=["C", "O", "H"],
positions=[
xyz,
xyz + np.array([0, 0, 1.2]),
xyz + np.array([0, 0.8, 2.0]),
],
)
elif mol == "OH":
_atoms += Atoms(
symbols=["O", "H"], positions=[xyz, xyz + np.array([0, 0.8, 0.8])]
)
else:
_atoms.append(Atom(mol, position=xyz))
traj.append(_atoms)
return traj
def combine_constraints(atoms, index_from_last=0, nats_mol=1, freeze_ratio=0.5):
idx = np.argsort(atoms.positions[:, 2])[
: int((len(atoms) - nats_mol) * freeze_ratio)
]
fa = FixAtoms(indices=idx)
fl = FixedLine(indices=len(atoms) - index_from_last - 1, direction=[0, 0, 1])
atoms.set_constraint([fa, fl])
return atoms
def merge_traj(atoms1, atoms2):
atoms = atoms1.copy()
to_attach = atoms2.copy()
to_attach.positions += 2 * to_attach.cell[1]
atoms += to_attach
return atoms
def calculate(
traj, generic_calculator, node_path, index_from_last=0, nats_mol=1, freeze_ratio=0.5
):
results = []
for image in tqdm(traj, desc="calculation progress"):
_image = image.copy()
_image = combine_constraints(
_image,
index_from_last=index_from_last,
nats_mol=nats_mol,
freeze_ratio=freeze_ratio,
)
calc = generic_calculator.get_calculator()
_image.calc = calc
dyn = BFGS(
_image,
trajectory=f"{node_path}/current_relax.traj",
logfile=f"{node_path}/current_relax.log",
)
dyn.run(fmax=0.01, steps=200)
_image.get_potential_energy()
_image.get_forces()
results.append(_image)
return results
def pre_optimize(atoms, calc, path, relax_path, nats_mol=0, freeze_ratio=0.5):
filename = f"{path}.traj"
relax_name = f"{relax_path}_relaxation"
if os.path.exists(filename):
_atoms = read(filename)
else:
_atoms = atoms.copy()
if freeze_ratio:
idx = np.argsort(atoms.positions[:, 2])[
: int((len(atoms) - nats_mol) * freeze_ratio)
]
fa = FixAtoms(indices=idx)
_atoms.set_constraint([fa])
else:
_atoms.set_constraint()
_atoms = property_implemented(_atoms, calc)
dyn = BFGS(_atoms, trajectory=f"{relax_name}.traj", logfile=f"{relax_name}.log")
dyn.run(fmax=0.01, steps=200)
_atoms.write(filename)
return _atoms
def property_implemented(atoms, generic_calculator):
calc = atoms.calc
if (
calc
and not calc.calculation_required(atoms, ["energy"])
and not calc.calculation_required(atoms, ["forces"])
):
atoms.get_potential_energy()
atoms.get_forces()
print("calculator already implemented")
else:
try:
calc = generic_calculator.get_calculator()
print("got calculator from get_calculator() method")
except Exception:
calc = generic_calculator
print("got calculator from generic_calculator")
atoms.calc = calc
atoms.get_potential_energy()
atoms.get_forces()
return atoms
def individual_run(
traj,
model,
generic_calculator,
node_path,
basename,
mol,
freeze_ratio=0.5,
index_from_last=0,
nats_mol=1,
el="",
):
print(f"running for {model} {el}")
print(f"......{mol}")
if os.path.exists(f"{node_path}/{mol}_{basename}_{model}_{el}.traj"):
results = read(f"{node_path}/{mol}_{basename}_{model}_{el}.traj", index=":")
for idx, atoms in enumerate(results):
atoms = property_implemented(atoms, generic_calculator)
results[idx] = atoms
else:
results = calculate(
traj,
generic_calculator,
node_path,
index_from_last=index_from_last,
nats_mol=nats_mol,
freeze_ratio=freeze_ratio,
)
write(f"{node_path}/{mol}_{basename}_{model}_{el}.traj", results)
return results
def heatmap(model, Z, x1, x2, el, xrange, yrange):
fig, ax = plt.subplots(figsize=(8, 6))
heatmap = ax.imshow(
Z,
extent=(xrange[0], xrange[1], yrange[0], yrange[1]),
origin="lower",
cmap="viridis",
alpha=0.7,
)
# sns.heatmap(Z, cmap='viridis', alpha=0.7, ax=ax)
ax.scatter(x1, x2, s=100, edgecolor="black")
texts = []
for i, label in enumerate(el):
sign = 0
y_offset = -15
if label in ["Ni", "Co"]:
sign = 1
if label in ["Rh", "Ir"]:
sign = -1
if label in ["Rh", "Ni"]:
y_offset = 5
if label == "Pt":
y_offset = 0
sign = 3
text = ax.annotate(
label,
(x1[i], x2[i]),
textcoords="offset points",
xytext=(2.5 * sign, y_offset),
ha="center",
fontsize=10,
color="black",
)
texts.append(text)
ax.set_title(f"CO splitting on 211-surf - {model}")
ax.set_xlabel("C* (eV)")
ax.set_ylabel("O* (eV)")
ax.set_xlim(xrange)
ax.set_ylim(yrange)
ax.grid(False)
plt.colorbar(heatmap, ax=ax, label="barrier CO* + H* -> C*+OH* (eV)")
plt.savefig(f"{model}_heatmap.png", dpi=300)
[docs]
class COSplitting(zntrack.Node):
"""
CO Splitting Node
This node performs CO splitting analysis on the provided bulk structures.
Parameters
----------
data : list[ase.Atoms]
List of images to perform CO splitting analysis on.
model : NodeWithCalculator
Model node with calculator to perform CO splitting analysis.
miller : list[int], default=[2,1,1]
Miller index of the surface to be generated.
supercell : list[int], default=[2,2,1]
Supercell in a list format.
layers : int, default=8
Number of layers.
vacuum : float, default=10.0
Vacuum size in Angst.
grid_step : float, default=0.3
Step size of the grid for the placement of intermediates.
freeze_ratio: float, default=0.5
Ratio of bottom layers to be frozen for relaxations.
frames_path : pathlib.Path
Path to save frames.
node_path : pathlib.Path
Path to save additional information such as cache and heatmaps.
Attributes
----------
results : pd.DataFrame
Results of CO splitting analysis.
"""
data: list[ase.Atoms] = zntrack.deps()
model: NodeWithCalculator = zntrack.deps()
# adding more parameters
miller: list[int] = zntrack.params(None)
supercell: list[int] = zntrack.params(None)
layers: int = zntrack.params(8)
vacuum: float = zntrack.params(10.0)
grid_step: float = zntrack.params(3.0)
freeze_ratio: float = zntrack.params(0.5)
node_path: pathlib.Path = zntrack.outs_path(zntrack.nwd / "outputs")
frames_path: str = zntrack.outs_path(zntrack.nwd / "frames.xyz")
results: pd.DataFrame = zntrack.plots(y="dE", x="Formula")
def run(self): # noqa C901
model = "model"
if not self.miller:
self.miller = [2, 1, 1]
if not self.supercell:
self.supercell = [2, 2, 1]
# RPBE (C, O) referenced to CH4, H2O --> doi.org/10.1007/s11244-013-0169-0
reference_data = {
"Rh": [1.41, 0.21],
"Cu": [3.53, 1.02],
"Co": [1.70, -0.11],
"Fe": [1.28, -0.8],
"Ni": [1.53, 0.17],
"Pt": [2.12, 1.30],
"Ru": [1.26, -0.08],
"Ir": [1.60, -0.08],
"Pd": [1.53, 1.52],
"Au": [4.75, 2.63],
"Ag": [5.05, 1.90],
}
x1 = [reference_data[el][0] for el in reference_data]
x2 = [reference_data[el][1] for el in reference_data]
el = list(reference_data)
# LSR obtained in doi.org/10.1007/s11244-013-0169-0
def reaction(c, o):
return 0.34 * c + 0.552 * o - 0.40
def barrier_bep(reaction_step):
return 1.12 * reaction_step + 1.20
xrange = [-1.1, 6.1]
yrange = [-1.1, 6.1]
xgrid = np.linspace(xrange[0], xrange[1], 100)
ygrid = np.linspace(yrange[0], yrange[1], 100)
Xgrid, Ygrid = np.meshgrid(xgrid, ygrid)
Zref = barrier_bep(reaction(Xgrid, Ygrid))
heatmap_path = self.node_path / "heatmaps"
cache_relaxations_path = self.node_path / "cached_relaxations"
heatmap_path.mkdir(parents=True, exist_ok=True)
cache_relaxations_path.mkdir(parents=True, exist_ok=True)
heatmap(heatmap_path / "RPBE", Zref, x1, x2, el, xrange, yrange)
mols = {}
surfs = {}
model_results = {}
traj_dict = {}
surfs[model] = {}
mols[model] = {}
mols[model]["CH4"] = pre_optimize(
gas_phase("CH4", vacuum=self.vacuum),
self.model.get_calculator(),
self.node_path / f"{model}_CH4_gas",
cache_relaxations_path / f"{model}_CH4_gas",
freeze_ratio=0,
).get_potential_energy()
mols[model]["H2O"] = pre_optimize(
gas_phase("H2O", vacuum=self.vacuum),
self.model.get_calculator(),
self.node_path / f"{model}_H2O_gas",
cache_relaxations_path / f"{model}_H2O_gas",
freeze_ratio=0,
).get_potential_energy()
mols[model]["H2"] = pre_optimize(
gas_phase("H2", vacuum=self.vacuum),
self.model.get_calculator(),
self.node_path / f"{model}_H2_gas",
cache_relaxations_path / f"{model}_H2_gas",
freeze_ratio=0,
).get_potential_energy()
for current_frame, bulk in tqdm(enumerate(self.data)):
formula = bulk.get_chemical_formula(empirical=True)
surf = gen_surf(
bulk,
miller=self.miller,
layers=self.layers,
vacuum=self.vacuum,
supercell=self.supercell,
)
calc = self.model.get_calculator()
surf = pre_optimize(
surf,
calc,
self.node_path / f"{model}_{formula}_clean_surface",
cache_relaxations_path / f"{model}_{formula}_clean_surface",
freeze_ratio=self.freeze_ratio,
)
surfs[model][formula] = surf
model_results[model] = {}
traj_dict[model] = {}
x1, x2, _formula, y = [], [], [], []
for formula, surf in surfs[model].items():
model_results[model][formula] = {}
pos, _atoms = grid_placement(surf, atom="C", grid_step=self.grid_step)
co_traj = gen_images(surf, pos, mol="CO")
results = individual_run(
co_traj,
model,
self.model,
self.node_path,
"",
freeze_ratio=self.freeze_ratio,
mol="CO",
index_from_last=1,
nats_mol=2,
el=formula,
)
co_energies = [_image.get_potential_energy() for _image in results]
np.min(co_energies)
_co_bound_surf = results[np.argmin(co_energies)]
_c_bound_surf = _co_bound_surf[:-1]
c_bound_surf = relocate_atom(_c_bound_surf, idx=len(_c_bound_surf) - 1)
_co_bound_surf = c_bound_surf + Atom(
symbol="O", position=c_bound_surf[-1].position + (0, 0, 1.2)
)
co_bound_surf = pre_optimize(
_co_bound_surf,
self.model.get_calculator(),
self.node_path / f"{model}_{formula}_CO_final",
cache_relaxations_path / f"{model}_{formula}_CO_final",
nats_mol=2,
freeze_ratio=self.freeze_ratio,
)
co_bound_surf.get_potential_energy()
_c_temp = co_bound_surf[:-1]
_c_temp_2 = relocate_atom(_c_temp, idx=len(_c_temp) - 1)
co_bound_surf = _c_temp_2 + Atom(
symbol="O", position=_c_temp_2[-1].position + (0, 0, 1.2)
)
pos, _atoms = grid_placement(
co_bound_surf, atom="H", grid_step=self.grid_step
)
h_traj = gen_images(co_bound_surf, pos, mol="H")
results = individual_run(
h_traj,
model,
self.model,
self.node_path,
"",
freeze_ratio=self.freeze_ratio,
mol="H",
index_from_last=0,
nats_mol=3,
el=formula,
)
co_h_energies = [_image.get_potential_energy() for _image in results]
_co_h_bound_surf = results[np.argmin(co_h_energies)]
co_h_bound_surf = pre_optimize(
_co_h_bound_surf,
self.model.get_calculator(),
self.node_path / f"{model}_{formula}_CO_H_final",
cache_relaxations_path / f"{model}_{formula}_CO_H_final",
nats_mol=3,
freeze_ratio=self.freeze_ratio,
)
co_h_en = co_h_bound_surf.get_potential_energy()
c_bound_surf = co_h_bound_surf[:-2]
c_bound_surf = relocate_atom(c_bound_surf, idx=len(c_bound_surf) - 1)
pos, _atoms = grid_placement(
c_bound_surf, atom="O", grid_step=self.grid_step
)
oh_traj = gen_images(c_bound_surf, pos, mol="OH")
results = individual_run(
oh_traj,
model,
self.model,
self.node_path,
"",
freeze_ratio=self.freeze_ratio,
mol="OH",
index_from_last=1,
nats_mol=3,
el=formula,
)
c_oh_energies = []
for _image in results:
distance = get_distances(
_image[-3].position, _image[-2].position, cell=_image.cell, pbc=True
)[1][0][0]
if distance > 2:
c_oh_energies.append(_image.get_potential_energy())
else:
c_oh_energies.append(np.inf)
_c_oh_bound_surf = results[np.argmin(c_oh_energies)]
_c_o_temp = _c_oh_bound_surf[:-1]
_c_o_temp_2 = relocate_atom(_c_o_temp, idx=len(_c_o_temp) - 1)
_c_oh_bound_surf = _c_o_temp_2 + Atom(
symbol="H", position=_c_o_temp_2[-1].position + (0, 0.8, 0.8)
)
c_oh_bound_surf = pre_optimize(
_c_oh_bound_surf,
self.model.get_calculator(),
self.node_path / f"{model}_{formula}_C_OH_final",
cache_relaxations_path / f"{model}_{formula}_C_OH_final",
nats_mol=3,
freeze_ratio=self.freeze_ratio,
)
to_append = merge_traj(co_h_bound_surf, c_oh_bound_surf)
traj_dict[model][formula] = to_append
c_oh_en = c_oh_bound_surf.get_potential_energy()
distance = get_distances(
c_oh_bound_surf[-3].position,
c_oh_bound_surf[-2].position,
cell=c_oh_bound_surf.cell,
pbc=True,
)[1][0][0]
if distance > 2:
bond_broken = True
else:
bond_broken = False
pos, _atoms = grid_placement(surf, atom="C", grid_step=self.grid_step)
c_traj = gen_images(surf, pos, mol="C")
results = individual_run(
c_traj,
model,
self.model,
self.node_path,
"",
freeze_ratio=self.freeze_ratio,
mol="C",
index_from_last=0,
nats_mol=1,
el=formula,
)
_c_bound_surf = results[
np.argmin([_image.get_potential_energy() for _image in results])
]
_c_bound_surf = relocate_atom(_c_bound_surf, idx=len(_c_bound_surf) - 1)
c_bound_surf = pre_optimize(
_c_bound_surf,
self.model.get_calculator(),
self.node_path / f"{model}_{formula}_C_final",
cache_relaxations_path / f"{model}_{formula}_C_final",
nats_mol=1,
freeze_ratio=self.freeze_ratio,
)
c_en = c_bound_surf.get_potential_energy()
pos, _atoms = grid_placement(surf, atom="O", grid_step=self.grid_step)
o_traj = gen_images(surf, pos, mol="O")
results = individual_run(
o_traj,
model,
self.model,
self.node_path,
"",
freeze_ratio=self.freeze_ratio,
mol="O",
index_from_last=0,
nats_mol=1,
el=formula,
)
_o_bound_surf = results[
np.argmin([_image.get_potential_energy() for _image in results])
]
_o_bound_surf = relocate_atom(_o_bound_surf, idx=len(_o_bound_surf) - 1)
o_bound_surf = pre_optimize(
_o_bound_surf,
self.model.get_calculator(),
self.node_path / f"{model}_{formula}_O_final",
cache_relaxations_path / f"{model}_{formula}_O_final",
nats_mol=1,
freeze_ratio=self.freeze_ratio,
)
o_en = o_bound_surf.get_potential_energy()
model_results[model][formula]["CO*+H*->C*+OH*"] = float(c_oh_en - co_h_en)
model_results[model][formula]["barrier(from bep rpbe)"] = float(
barrier_bep(c_oh_en - co_h_en)
)
model_results[model][formula]["CH4->C*+2H2"] = float(
c_en
- (
surf.get_potential_energy()
+ mols[model]["CH4"]
- 2 * mols[model]["H2"]
)
)
model_results[model][formula]["H2O->O*+H2"] = float(
o_en
- (surf.get_potential_energy() + mols[model]["H2O"] - mols[model]["H2"])
)
model_results[model][formula]["bond_broken"] = bond_broken
x1.append(
c_en
- (
surf.get_potential_energy()
+ mols[model]["CH4"]
- 2 * mols[model]["H2"]
)
)
x2.append(
o_en
- (surf.get_potential_energy() + mols[model]["H2O"] - mols[model]["H2"])
)
_formula.append(formula)
y.append(c_oh_en - co_h_en)
data = {"x1": x1, "x2": x2, "el": _formula, "y": y}
df = pd.DataFrame(data)
X = df[["x1", "x2"]]
y = df["y"]
LR = LinearRegression()
LR.fit(X, y)
y_pred = LR.predict(X)
r_squared = float(r2_score(y, y_pred))
rmse = float(np.sqrt(mean_squared_error(y, y_pred)))
m1 = float(LR.coef_[0])
m2 = float(LR.coef_[1])
b = float(LR.intercept_)
model_results[model]["interpolation"] = {
"formula": f"co_h->c_oh = {m1:.2f} * C + {m2:.2f} * O + {b:.2f}",
"m1": m1,
"m2": m2,
"b": b,
"r2": r_squared,
"rmse": rmse,
}
def f(c, o):
return m1 * c + m2 * o + b
Z = barrier_bep(f(Xgrid, Ygrid))
heatmap(heatmap_path / model, Z, x1, x2, _formula, xrange, yrange)
with open(self.node_path / "full_data.json", "w") as json_file:
json.dump(model_results, json_file, indent=4)
results = []
line = model_results[model]
for el in line:
if el != "interpolation":
results.append(
{"Formula": el, "dE": model_results[model][el]["CO*+H*->C*+OH*"]}
)
ase.io.write(self.frames_path, traj_dict[model][el], append=True)
self.results = pd.DataFrame(results)
@property
def frames(self) -> list[ase.Atoms]:
with self.state.fs.open(self.frames_path, "r") as f:
return list(ase.io.iread(f, format="extxyz"))
@property
def figures(self) -> dict[str, go.Figure]:
fig = px.line(self.results, x="Formula", y="dE", markers=True)
fig.update_layout(
title="Reaction Energy for H-assisted CO Splitting",
xaxis_title="Formula",
yaxis_title="dE / eV",
)
fig.update_traces(customdata=np.stack([np.arange(len(self.results))], axis=-1))
return {"ReactionEnergy": fig}
@staticmethod
def compare(*nodes: "COSplitting") -> ComparisonResults:
frames = sum([node.frames for node in nodes], [])
offset = 0
fig = go.Figure() # px.scatter()
for i, node in enumerate(nodes):
trajectory = node.data.copy()
fig.add_trace(
go.Bar(
x=node.results["Formula"],
y=node.results["dE"],
name=node.name,
)
)
fig.add_trace(
go.Scatter(
x=node.results["Formula"],
y=node.results["dE"],
mode="markers",
name=node.name,
marker={"color": "red"},
showlegend=False,
customdata=np.stack(
[np.arange(len(node.results["dE"])) + offset], axis=1
),
)
)
offset += len(node.results["dE"])
results = []
for atoms in trajectory:
formula = atoms.get_chemical_formula(empirical=True)
if "dE" in atoms.info:
results.append({"Formula": formula, "dE": atoms.info["dE"]})
if results:
results = pd.DataFrame(results)
fig.add_trace(
go.Bar(
x=results["Formula"],
y=results["dE"],
name="reference",
)
)
fig.update_layout(
title="Comparison of Reaction Energies for H-assisted CO Splitting",
xaxis_title="Formula",
yaxis_title="dE / eV",
barmode="group",
scattermode="group",
plot_bgcolor="rgba(0, 0, 0, 0)",
paper_bgcolor="rgba(0, 0, 0, 0)",
)
fig.update_xaxes(
showgrid=True,
gridwidth=1,
gridcolor="rgba(120, 120, 120, 0.3)",
zeroline=False,
)
fig.update_yaxes(
showgrid=True,
gridwidth=1,
gridcolor="rgba(120, 120, 120, 0.3)",
zeroline=False,
)
return ComparisonResults(frames=frames, figures={"Reaction-Comparison": fig})