import sys, os, getopt, binascii, json, pprint, signal, time, calendar, threading
import urllib.request
-import Crypto.PublicKey.RSA, Crypto.Random, Crypto.Hash.SHA256, Crypto.Signature.PKCS1_v1_5
### General utilities
### Crypto utilities
+_cryptobke = None
+def cryptobke():
+ global _cryptobke
+ if _cryptobke is None:
+ from cryptography.hazmat import backends
+ _cryptobke = backends.default_backend()
+ return _cryptobke
+
class dererror(Exception):
pass
self.key = key
def authdata(self):
- return {"jwk": {"kty": "RSA", "e": ebignum(self.key.e), "n": ebignum(self.key.n)}}
+ pub = self.key.public_key().public_numbers()
+ return {"jwk": {"kty": "RSA", "e": ebignum(pub.e), "n": ebignum(pub.n)}}
def sign(self, data):
- dig = Crypto.Hash.SHA256.new()
- dig.update(data)
- return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig)
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import padding
+ return self.key.sign(data, padding.PKCS1v15(), hashes.SHA256())
class account(object):
def __init__(self, uri, key):
return {"kid": self.uri}
def sign(self, data):
- dig = Crypto.Hash.SHA256.new()
- dig.update(data)
- return Crypto.Signature.PKCS1_v1_5.new(self.key).sign(dig)
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import padding
+ return self.key.sign(data, padding.PKCS1v15(), hashes.SHA256())
def getinfo(self):
data, headers = jreq(self.uri, None, self)
raise Exception("account is not valid: %s" % (data.get("status", "\"\"")))
def write(self, out):
+ from cryptography.hazmat.primitives import serialization
out.write("%s\n" % (self.uri,))
- out.write("%s\n" % (self.key.exportKey().decode("us-ascii"),))
+ out.write("%s\n" % (self.key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()
+ ).decode("us-ascii"),))
@classmethod
def read(cls, fp):
+ from cryptography.hazmat.primitives import serialization
uri = fp.readline()
if uri == "":
raise Exception("missing account URI")
uri = uri.strip()
- key = Crypto.PublicKey.RSA.importKey(fp.read())
+ key = serialization.load_pem_private_key(fp.read().encode("us-ascii"), password=None, backend=cryptobke())
return cls(uri, key)
### ACME protocol
return _directory
def register(keysize=4096):
- key = Crypto.PublicKey.RSA.generate(keysize, Crypto.Random.new().read)
+ from cryptography.hazmat.primitives.asymmetric import rsa
+ key = rsa.generate_private_key(public_exponent=65537, key_size=keysize, backend=cryptobke())
data, headers = jreq(directory()["newAccount"], {"termsOfServiceAgreed": True}, jwkauth(key))
return account(headers["Location"], key)
return data
def httptoken(acct, ch):
+ from cryptography.hazmat.primitives import hashes
jwk = {"kty": "RSA", "e": ebignum(acct.key.e), "n": ebignum(acct.key.n)}
- dig = Crypto.Hash.SHA256.new()
+ dig = hashes.Hash(hashes.SHA256())
dig.update(json.dumps(jwk, separators=(',', ':'), sort_keys=True).encode("us-ascii"))
- khash = base64url(dig.digest())
+ khash = base64url(dig.finalize())
return ch["token"], ("%s.%s" % (ch["token"], khash))
def finalize(acct, csr, orderid):