# Tools to manage metadata of NR simulation from various catalogs
# Loads metadata files and store a large list of dictionaries, one per simulation
#
# SB 2023.05.24 (started)
import sys, os, re, datetime
import glob, itertools
import json, csv
import logging
import numpy as np
import math
import subprocess
# ---------------------
# Basic unit conversion
# ---------------------
[docs]
ufact = {
"Msun_sec": 4.925794970773135e-06,
"Msun_meter": 1.476625061404649406193430731479084713e3,
}
[docs]
class geom:
"""
Geometric units + Msun = 1
"""
[docs]
class cgs:
"""
CGS units
"""
[docs]
grav_constant = 6.673e-8
[docs]
light_speed = 29979245800.0
[docs]
solar_mass = 1.988409902147041637325262574352366540e33
[docs]
class metric:
"""
Standard SI units
"""
[docs]
grav_constant = 6.673e-11
[docs]
light_speed = 299792458.0
[docs]
solar_mass = 1.988409902147041637325262574352366540e30
# --------------
# bash interface
# --------------
[docs]
def runcmd(cmd, workdir, out=None):
"""
Execute cmd in workdir
"""
base = os.getcwd()
os.makedirs(workdir, exist_ok=True)
os.chdir(workdir)
os.system(cmd)
os.chdir(base)
return
# ------------------
# ADM to EOB trafo
# ------------------
# TODO: test this
[docs]
def adm_to_eob(q_vec, p_vec, nu):
"""
Convert q_ADM, p_ADM to EOB, 2PN trafo.
(E18) and (E19) of https://arxiv.org/pdf/1210.2834.pdf
q_vec: ADM puncture relative distance, x_p - x_m, cartesian
p_vec: ADM puncture relative linear momentum, p_p, cartesian, nu-normalized
nu : symmetric mass ratio
"""
# shorthands
q_vec = np.array(q_vec)
p_vec = np.array(p_vec)
q2 = np.dot(q_vec, q_vec) # x, y
q = np.sqrt(q2)
q3 = q * q2
p2 = np.dot(p_vec, p_vec)
p = np.sqrt(p2)
p3 = p * p2
p4 = p * p3
qdotp = np.dot(q_vec, p_vec)
qdotp2 = qdotp * qdotp
nu2 = nu * nu
# coefficients for coordinates
cQ_1PN_q = -nu / 2 * p2 + 1 / q * (1 + nu / 2)
cQ_1PN_p = -qdotp * nu
cQ_2PN_q = (
nu / 8 * (1 - nu) * p4
+ nu / 4 * (5 - nu / 2) * p2 / q
+ nu * (1 + nu / 8) * qdotp2 / q3
+ 1 / 4 * (1 - 7 * nu + nu2) / q2
)
cQ_2PN_p = qdotp * (nu / 2 * (1 + nu) * p2 + 3 / 2 * nu * (1 - nu / 2) / q)
# coefficients for momenta
cP_1PN_q = qdotp / q3 * (1 + nu / 2)
cP_1PN_p = nu / 2 * p2 - 1 / q * (1 + nu / 2)
cP_2PN_q = (
qdotp
/ q3
* (
nu / 8 * (10 - nu) * p2
+ 3 / 8 * nu * (8 + 3 * nu) * qdotp2 / q2
+ 1 / 4 * (-2 - 18 * nu + nu2) / q
)
)
cP_2PN_p = (
nu / 8 * (-1 + 3 * nu) * p4
- 3 / 4 * nu * (3 + nu / 2) * p2 / q
- nu / 8 * (16 + 5 * nu) * qdotp2 / q3
+ 1 / 4 * (3 + 11 * nu) / q2
)
# Put all together
Q_vec = (
q_vec
+ cQ_1PN_q * q_vec
+ cQ_1PN_p * p_vec
+ cQ_2PN_q * q_vec
+ cQ_2PN_p * p_vec
)
P_vec = (
p_vec
+ cP_1PN_q * q_vec
+ cP_1PN_p * p_vec
+ cP_2PN_q * q_vec
+ cP_2PN_p * p_vec
)
return Q_vec, P_vec
[docs]
def polar_to_cartesian(q_p, p_p):
"""
q_p : [r , phi ]
p_p : [pr, pphi]
"""
# unpack
r, phi = q_p[0], q_p[1]
pr, pphi = p_p[0], p_p[1]
# coordinates
x = r * np.cos(phi)
y = r * np.sin(phi)
q_c = np.array([x, y])
# momenta
px = -y / r**2 * pphi + x / r * pr
py = x / r**2 * pphi + y / r * pr
p_c = np.array([px, py])
return q_c, p_c
[docs]
def cartesian_to_polar(q_c, p_c):
"""
q_c : [x , y ]
p_c : [px, py]
"""
# unpack
x, y = q_c[0], q_c[1]
px, py = p_c[0], p_c[1]
# coordinates
r = np.sqrt(x**2 + y**2)
phi = math.atan2(y, x) if x >= 0 else math.atan2(y, x) + math.pi
q_p = np.array([r, phi])
# momenta
pphi = x * py - y * px
p = np.sqrt(py**2 + px**2)
arg = p**2 - (pphi / r) ** 2
if (abs(arg)) < 1e-15:
pr = 0
else:
pr = -np.sqrt(p**2 - (pphi / r) ** 2)
p_p = np.array([pr, pphi])
return q_p, p_p
# ---------------------
# Simulation class
# ---------------------
[docs]
KEYMAP = {
"m1": ["m1", "M1_i", "m_plus", "initial-mass1", "initial_mass1"],
"m2": ["m2", "M2_i", "m_minus", "initial-mass2", "initial_mass2"],
"q": ["q", "mass-ratio", "initial_mass_ratio"],
"chi1z": ["chi1z", "s1z", "initial-bh-chi1z", "initial_dimensionless_spin1"],
"chi2z": ["chi2z", "s2z", "initial-bh-chi2z", "initial_dimensionless_spin2"],
"e": ["e", "eccentricity"],
"Mf": ["Mf", "M_f", "final-mass", "remnant_mass"],
"af": ["af", "a_f", "final-chi", "remnant_dimensionless_spin"],
"Jf": ["Jf", "J_f"], # = af/Mf**2
"E0": ["E0", "E_adm", "initial-ADM-energy", "initial_ADM_energy"],
"J0": [
"J0",
"J_adm",
"initial-ADM-angular-momentum-z",
"initial_ADM_angular_momentum",
],
"r0": ["r0", "initial-separation", "initial_separation"],
"f0": ["f0", "initial_orbital_frequency"],
"fgw0": ["fgw0", "freq-start-22"],
}
[docs]
class Simulations:
"""
Class representing simulation data
path : path to the metadata
search is recursive from this level
metadata_ext : extension(s) of metadata files
comments_ignore : list of characters to ignore
keymap : dictionary to map key names
"""
def __init__(
self,
path="./",
metadata_ext=METADATA_EXTENSION,
comments_ignore=COMMENTS_IGNORE,
keymap=KEYMAP,
sxs_unpack=True,
rit_ids_unpack=False,
):
[docs]
self.files = list(
itertools.chain.from_iterable(
[glob.glob(path + "/**/*" + x, recursive=True) for x in metadata_ext]
)
)
if rit_ids_unpack:
# read the big ID file only once
d_ics = self.rit_tex_read_punctures_id(path + "/rit_id_table.tex")
for f in self.files:
basename, ext = os.path.splitext(f)
catalog = basename.split("/")[1]
if ext == ".txt":
d = self.read_txt(f)
elif ext == ".json":
d = self.read_json(f)
else:
logging.warning(
f"Skip file {f}, do not know how to read data from {ext}"
)
continue
if not d:
continue # dict is empty, do not store
if sxs_unpack and catalog == "SXS":
# We read the large json files:
# extract each simulation and append!
self.sxs_json_extract_append(d, catalog, f)
else:
d["CATALOG"] = catalog
d["METADATA-FILE"] = f
self.data.append(d)
if rit_ids_unpack and catalog == "RIT":
self.rit_extract_punctures_id(d, d_ics)
logging.info(f"Read file {f} from catalog {catalog}")
self.parse_metadata(keymap=keymap)
[docs]
def info(self):
catalog = [d["CATALOG"] for d in self.data]
logging.info("CATALOG : DATASETS")
for c in set(catalog):
logging.info(f"{c} : {catalog.count(c)}")
[docs]
def read_txt(self, fname):
"""
Read metadata from a txt file
"""
data = {}
if os.path.isfile(fname):
with open(fname, "r") as f:
lines = f.readlines()
for line in lines:
line = line.replace(" ", "")
if line[0] in self.comments_ignore:
continue
line = line.rstrip("\n")
kv = line.split("=")
if len(kv) > 1:
data[kv[0]] = kv[1]
return data
[docs]
def read_json(self, fname):
"""
Read metadata from a json file
"""
with open(fname) as f:
return json.load(f)
[docs]
def rit_tex_read_punctures_id(self, fname):
"""
Special method to read punctures ID in ADM coordinates
from `rit_id_table.txt`
Headers:
Run, x_1/m, x_2/m, P_r/m, P_t/m, m^p_1/m, m^p_2/m, |S_1/m^2|, |S_2/m^2|, m^H_1/m, m^H_2/m, M_{\rm ADM}/m,|a_1/m_1^H|,|a_2/m_2^H|
or
Run, x_1/m, x_2/m, P_r/m, P_t/m, m^p_1/m, m^p_2/m, m^H_1/m, m^H_2/m, M_{\rm ADM}/m, e, N_{orb}
"""
# read the ID file
d = {}
if os.path.isfile(fname):
with open(fname, "r") as f:
lines = f.readlines()
for line in lines:
if line[:3] != "RIT":
continue
line = line.rstrip("\n").rstrip("\\")
ids = line.split("&")
if (len(ids) != 14) and (len(ids) != 12):
continue
tag = ids[0].rstrip()
ics = self.cast_to_float(ids[1:], ReturnLast=False)
d[tag] = ics
logging.info(f"Read RIT punctures ID from {fname}")
return d
# ...
#
[docs]
def cast_to_float(self, x, ReturnLast=True):
if type(x) is list:
try:
x = [float(e) for e in x]
if ReturnLast:
return x[-1]
else:
return x
except:
return []
else:
try:
return float(x)
except:
return []
[docs]
def remove_empty_vals(self):
return [d for d in self.data if d]
[docs]
def search(self, key, val, bound=0.0):
"""
Return list of simulations matching
a particular key-value pair
"""
s = []
if bound > 0.0:
s = [
self.data[i]
for i, item in enumerate(self.data)
if abs(item[key] - val) < bound
]
else:
s = [self.data[i] for i, item in enumerate(self.data) if item[key] == val]
return s
[docs]
def q_to_nu(self, q):
return q / (1 + q) ** 2
# TODO EOB-ADM trafo
[docs]
def ADM_to_EOB(self, nu, q_adm, p_adm, polar=False):
"""
Transform the initial conditions q_adm, p_adm to EOB coordinates.
"""
if polar:
q_adm_c, p_adm_c = polar_to_cartesian(q_adm, p_adm)
else:
q_adm_c, p_adm_c = q_adm, p_adm
q_eob_c, p_eob_c = adm_to_eob(q_adm_c, p_adm_c, nu)
q_eob_p, p_eob_p = cartesian_to_polar(q_eob_c, p_eob_c)
return q_eob_p, p_eob_p
[docs]
def www_get_RIT(self, simulation):
"""
Return command to get RIT data from www
RIT catalog offers 3 type of files,
names are based on a few metadata entries
Example:
catalog-tag = RIT:BBH:0001
resolution-tag = n100
id-tag = id3
'Metadata/RIT:BBH:0001-n100-id3_Metadata.txt'
'Data/ExtrapPsi4_RIT-BBH-0001-n100-id3.tar.gz
'Data/ExtrapStrain_RIT-BBH-0001-n100.h5'
Note: Currently relies on the system call 'wget'.
There python packages (see below), but would need installation.
What is available within anaconda?
"""
WWW = "https://ccrgpages.rit.edu/~RITCatalog/"
GET = "wget --no-check-certificate " # w/ white space
ctag = simulation["catalog-tag"].strip()
rtag = simulation["resolution-tag"].strip()
itag = simulation["id-tag"].strip()
ctag_ = ctag.replace(":", "-")
DAT = [
"Metadata/{}-{}-{}_Metadata.txt".format(ctag, rtag, itag),
"Data/ExtrapPsi4_{}-{}-{}.tar.gz".format(ctag_, rtag, itag),
"Data/ExtrapStrain_{}-{}.h5".format(ctag_, rtag),
]
return [GET + WWW + d for d in DAT]
# return [WWW+d for d in DAT] # urls only
[docs]
def www_get_SXS(self, simulation):
"""
Return command to get SXS data from www
TODO
SXS metadata contains a 'url' key to Zenodo
can use wget, curl or the various pyton-based packages:
https://pypi.org/project/requests/
https://pypi.org/project/zenodo-get/
"""
[docs]
def www_get(self, data=None, outdir="Data/", dryrun=False):
"""
Get data corresponding to the input metadata
Websites and www get are set based on the CATALOG key
Outdir is pre-pended by path and CATALOG
"""
# import wget # other option
cmd = {
"RIT": self.www_get_RIT,
#'SXS': self.www_get_SXS,
}
if data is None:
data = self.data
for s in data:
if s["CATALOG"] not in cmd.keys():
logging.warning("Skip {} : do not know how to get the data".format())
continue
for c in cmd[s["CATALOG"]](s):
if dryrun:
logging.info(c)
else:
path_outdir = "{}/{}/{}/".format(self.path, s["CATALOG"], outdir)
runcmd(c, path_outdir)
# wget.download( <url> ,out=path_output)
if __name__ == "__main__":
# s = Simulations()
# s.info()
# s = Simulations('./SXS',sxs_unpack=True)
# s.info()
# s = Simulations('./RIT')
# s.info()
# for k,v in s.data[0].items():
# print(k,v)
# example ADM to EOB for RIT
[docs]
s = Simulations("./RIT", rit_ids_unpack=True)
nu = s.data[0]["nu"]
xp = s.data[0]["x_p"]
pp = s.data[0]["P_p"] # already nu-normalized
q_eob, p_eob = s.ADM_to_EOB(nu, -2 * np.array(xp), np.array(pp), polar=False)
logging.info(f"{q_eob}, {p_eob}")
# for k,v in s.data[0].items():
# print(k,v)
# s1 = s.search('m1',0.5)
# s2 = s.search('m1',0.5,0.1)
# s3 = s.search('m1',77)
# print(s1)
# print(s2)
# print(s3)
# s.www_get(data=[s.data[0]])