global _directory
if _directory is None:
with req(service) as resp:
- _directory = json.loads(resp.read().decode("utf-8"))
+ _directory = json.load(resp)
return _directory
def base64url(dat):
req.add_header("Content-Type", ctype)
return urllib.request.urlopen(req)
+class problem(msgerror):
+ def __init__(self, code, data, *args, url=None, **kw):
+ super().__init__(*args, **kw)
+ self.code = code
+ self.data = data
+ self.url = url
+ if not isinstance(data, dict):
+ raise ValueError("unexpected problem object type: %r" % (data,))
+
+ @property
+ def type(self):
+ return self.data.get("type", "about:blank")
+ @property
+ def title(self):
+ return self.data.get("title")
+ @property
+ def detail(self):
+ return self.data.get("detail")
+
+ def report(self, out):
+ extra = None
+ if self.title is None:
+ msg = self.detail
+ if "\n" in msg:
+ extra, msg = msg, None
+ else:
+ msg = self.title
+ extra = self.detail
+ if msg is None:
+ msg = self.data.get("type")
+ if msg is not None:
+ out.write("acemcert: %s: %s\n" % (
+ ("remote service error" if self.url is None else self.url),
+ ("unspecified error" if msg is None else msg)))
+ if extra is not None:
+ out.write("%s\n" % (extra,))
+
+ @classmethod
+ def read(cls, err, **kw):
+ self = cls(err.code, json.load(err), **kw)
+ return self
+
def jreq(url, data, auth):
authdata = {"alg": "RS256", "url": url, "nonce": getnonce()}
authdata.update(auth.authdata())
data = base64url(json.dumps(data).encode("us-ascii"))
seal = base64url(auth.sign(("%s.%s" % (authdata, data)).encode("us-ascii")))
enc = {"protected": authdata, "payload": data, "signature": seal}
- with req(url, data=enc) as resp:
- return json.loads(resp.read().decode("utf-8")), resp.headers
+ try:
+ with req(url, data=enc) as resp:
+ return json.load(resp), resp.headers
+ except urllib.error.HTTPError as exc:
+ if exc.headers["Content-Type"] == "application/problem+json":
+ raise problem.read(exc, url=url)
+ raise
class certificate(object):
@property