from . import currency, auth, data
soupify = lambda cont: soup(cont, "html.parser")
-apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
-loginurl = "https://online.swedbank.se/app/privat/login"
-serviceid = "B7dZHQcY78VRVz9l"
+apicall = {
+ "base": "https://app.swedbank.se/api/",
+ "headers": [
+ ("X-Api-Key", "x7X8h9nePgYEUHs7"),
+ ("X-Client", "fdp-internet-bank/227.1.0"),
+ ],
+}
+tdecall = {
+ "base": "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/",
+ "headers": [
+ ("X-Api-Key", "x7X8h9nePgYEUHs7"),
+ ("X-Client", "fdp-internet-bank/227.1.0"),
+ ],
+ "serviceid": "B7dZHQcY78VRVz9l",
+}
+loginurl = "https://online.swedbank.se/app/ib/logga-in"
class fmterror(Exception):
pass
data = json.loads(err.read().decode(cs))
return cls(err.code, data, err.headers)
+def base64(data):
+ return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=")
+
def resolve(d, keys, default=fmterror):
+ if isinstance(keys, str):
+ keys = [keys]
def err(key):
if default is fmterror:
raise fmterror(key)
return err(keys[0])
return rec(d, keys)
-def linkurl(ln):
- if ln[0] != '/':
- raise fmterror("unexpected link url: " + ln)
- return parse.urljoin(apibase, ln[1:])
-
-def getdsid():
- with request.urlopen(loginurl) as resp:
- if resp.code != 200:
- raise fmterror("Unexpected HTTP status code: " + str(resp.code))
- doc = soupify(resp.read())
- dsel = doc.find("div", id="cust-sess-id")
- if not dsel or not dsel.has_attr("value"):
- raise fmterror("DSID DIV not on login page")
- return dsel["value"]
-
-def base64(data):
- return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=")
+def cktyp(d, typ, name, default=None):
+ if d is None:
+ d = default
+ if not isinstance(d, typ):
+ raise fmterror("unexpected type for " + name + ": " + repr(d))
+ return d
+
+def tget(d, keys, typ, name, default=None):
+ return cktyp(resolve(d, keys), typ, name, default)
+
+def linkurl(ln, method):
+ if method is not None:
+ mth = tget(ln, "method", str, "link method", "")
+ if mth != method:
+ raise fmterror("unexpected method for link: " + mth)
+ uri = tget(ln, "uri", str, "link target")
+ if uri[0] != '/':
+ raise fmterror("unexpected link url: " + uri)
+ return parse.urljoin(tdecall["base"], uri[1:])
class transaction(data.transaction):
def __init__(self, account, data):
@property
def data(self):
if self._data is None:
- self._data = self.sess._jreq("v5/engagement/account/" + self.id)
+ self._data = self.sess._jreq(tdecall, "v5/engagement/account/" + self.id)
return self._data
@property
pagesz = 50
page = 1
while True:
- data = self.sess._jreq("v5/engagement/transactions/" + self.id, transactionsPerPage=pagesz, page=page)
+ data = self.sess._jreq(tdecall, "v5/engagement/transactions/" + self.id, query={"transactionsPerPage": pagesz, "page": page})
txlist = resolve(data, ("transactions",))
if len(txlist) < 1:
break
@property
def data(self):
if self._data is None:
- self._data = self.sess._jreq("v5/engagement/cardaccount/" + self.id)
+ self._data = self.sess._jreq(tdecall, "v5/engagement/cardaccount/" + self.id)
return self._data
@property
pagesz = 50
page = 1
while True:
- data = self.sess._jreq("v5/engagement/cardaccount/" + self.id, transactionsPerPage=pagesz, page=page)
+ data = self.sess._jreq(tdecall, "v5/engagement/cardaccount/" + self.id, query={"transactionsPerPage": pagesz, "page": page})
txlist = resolve(data, ("transactions",))
if len(txlist) < 1:
break
page += 1
class session(data.session):
- def __init__(self, dsid):
- self.dsid = dsid
- self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
+ def __init__(self):
+ self.authtime = time.time()
self.jar = request.HTTPCookieProcessor()
- self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
- version=0, name="dsid", value=dsid, path="/", path_specified=True,
- domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
- port=None, port_specified=False, secure=False, expires=None,
- discard=True, comment=None, comment_url=None,
- rest={}, rfc2109=False))
+ self.usertoken = None
+ self.engagement = None
self.userid = None
self._accounts = None
- def _req(self, url, data=None, ctype=None, headers={}, method=None, **kws):
- if "dsid" not in kws:
- kws["dsid"] = self.dsid
- kws = {k: v for (k, v) in kws.items() if v is not None}
- url = parse.urljoin(apibase, url + "?" + parse.urlencode(kws))
+ def _req(self, reqtype, url, data=None, ctype=None, headers={}, method=None, query=None):
+ if query is None:
+ query = {}
+ query = {k: v for (k, v) in query.items() if v is not None}
+ url = parse.urljoin(reqtype["base"], url + "?" + parse.urlencode(query))
if isinstance(data, dict):
data = json.dumps(data).encode("utf-8")
- ctype = "application/json;charset=UTF-8"
+ if ctype is None:
+ ctype = "application/json"
req = request.Request(url, data=data, method=method)
+ for hnam, hval in reqtype.get("headers", []):
+ req.add_header(hnam, hval)
for hnam, hval in headers.items():
req.add_header(hnam, hval)
if ctype is not None:
req.add_header("Content-Type", ctype)
- req.add_header("Authorization", self.auth)
+ if "serviceid" in reqtype:
+ req.add_header("Authorization", base64((reqtype["serviceid"] + ":" + str(int(self.authtime * 1000))).encode("ascii")))
+ if self.usertoken is not None:
+ req.add_header("X-User", self.usertoken)
+ if self.engagement is not None:
+ req.add_header("X-Engagement", self.engagement)
self.jar.https_request(req)
with request.urlopen(req) as resp:
if resp.code != 200 and resp.code != 201:
try:
ret = self._req(*args, headers=headers, **kwargs)
except urllib.error.HTTPError as e:
- if e.headers.get_content_type() == "application/json":
+ if e.headers.get_content_type() in {"application/json", "application/problem+json"}:
raise jsonerror.fromerr(e)
return json.loads(ret.decode("utf-8"))
def _postlogin(self):
- auth = self._jreq("v5/user/authenticationinfo")
+ aprof = self._jreq(tdecall, "v5/profile/activeprofile")
+
+ prof = self._jreq(tdecall, "v5/profile/")
+ if len(prof["banks"]) != 1:
+ raise fmterror("do not know the meaning of multiple banks")
+ profc = self._jreq(tdecall,
+ linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next")), "POST"),
+ method="POST")
+ self.engagement = tget(profc, ("selectedProfile", "andromedaProfileToken"), str, "engagement token")
+
+ auth = self._jreq(tdecall, "v5/user/authenticationinfo")
uid = auth.get("identifiedUser", "")
if uid == "":
raise fmterror("no identified user even after successful authentication")
self.userid = uid
- prof = self._jreq("v5/profile/")
- if len(prof["banks"]) != 1:
- raise fmterror("do not know the meaning of multiple banks")
- rolesw = linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next", "uri")))
- self._jreq(rolesw, method="POST")
def auth_token(self, user, conv=None):
if conv is None:
conv = auth.default()
try:
- data = self._jreq("v5/identification/securitytoken/challenge", data = {
+ data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/securitytoken/authentication", data={
"userId": user,
- "useEasyLogin": "false",
- "generateEasyLoginId": "false"})
+ "corporateToken": False,
+ "generateEasyLogin": False,
+ "mockedAuthentication": False,
+ })
except jsonerror as e:
if e.code == 400:
- flds = resolve(e.data, ("errorMessages", "fields"), False)
- if isinstance(flds, list):
- for fld in flds:
- if resolve(fld, ("field",), None) == "userId":
- raise autherror(fld["message"])
- raise
- if data.get("useOneTimePassword"):
- raise fmterror("unexpectedly found useOneTimePassword")
- if data.get("challenge") != "":
- raise fmterror("unexpected challenge: " + str(data.get("challenge")))
- if not isinstance(data.get("imageChallenge"), dict) or resolve(data, ("imageChallenge", "method")) != "GET":
- raise fmterror("invalid image challenge: " + str(data.get("imageChallenge")))
- iurl = linkurl(resolve(data, ("imageChallenge", "uri")))
- vfy = linkurl(resolve(data, ("links", "next", "uri")))
- img = Image.open(io.BytesIO(self._req(iurl)))
- conv.image(img)
- response = conv.prompt("Token response: ", True)
- try:
- data = self._jreq(vfy, data={"response": response})
- except jsonerror as e:
- msgs = resolve(e.data, ("errorMessages", "general"), False)
- if isinstance(msgs, list):
- for msg in msgs:
- if msg.get("message"):
- raise autherror(msg.get("message"))
+ for fld in cktyp(resolve(e.data, ("errorMessages", "fields"), None), list, "field messages", []):
+ if resolve(fld, ("field",), None) == "userId":
+ raise autherror(fld["message"])
raise
- if not data.get("authenticationRole", ""):
- raise fmterror("authentication appears to have succeded, but there is no authenticationRole: " + str(data))
+ if data.get("type") == "PHOTOTAN":
+ aid = tget(data, "authenticationId", str, "authentication id")
+ idata = tget(data, "imageData", str, "challenge image-data")
+ img = Image.open(io.BytesIO(self._req(apicall, "cross-channel/customer-security/authentication/v1/challenge-image", query={"data": idata, "size": "M"})))
+ conv.image(img)
+ response = conv.prompt("Token response: ", True)
+ try:
+ data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/securitytoken/authentication/" + aid + "/response", data={
+ "verificationCode": response,
+ }, method="PUT")
+ except jsonerror as e:
+ if e.code in {400, 401}:
+ for msg in tget(e.data, ("errorMessages", "general"), list, "error messages", []):
+ if "message" in msg:
+ raise autherror(tget(msg, "message", str, "error message"))
+ raise
+ self.usertoken = tget(data, "userToken", str, "user token")
+ else:
+ raise fmterror("unexpected token challenge: type: " + data.get("type"))
self._postlogin()
def auth_bankid(self, user, conv=None):
if conv is None:
conv = auth.default()
try:
- data = self._jreq("v5/identification/bankid/mobile", data = {
+ data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/bankid/authentication", data={
"userId": user,
- "useEasyLogin": False,
- "generateEasyLoginId": False})
+ "idMethod": "MOBILE_BANKID",
+ "generateEasyLogin": False,
+ "mockedAuthencation": False,
+ "sameDevice": False,
+ })
except jsonerror as e:
if e.code == 400:
- flds = resolve(e.data, ("errorMessages", "fields"), False)
- if isinstance(flds, list):
- for fld in flds:
- if resolve(fld, ("field",), None) == "userId":
- raise autherror(fld["message"])
+ for fld in cktyp(resolve(e.data, ("errorMessages", "fields"), None), list, "field messages", []):
+ if resolve(fld, ("field",), None) == "userId":
+ raise autherror(fld["message"])
raise
- st = data.get("status")
- vfy = linkurl(resolve(data, ("links", "next", "uri")))
+ aid = tget(data, "authenticationId", str, "authentication id")
+ idata = tget(data, "imageData", str, "challenge image data")
+ img = Image.open(io.BytesIO(self._req(apicall, "cross-channel/customer-security/authentication/v1/challenge-image", query={"data": idata, "size": "M"})))
+ conv.image(img)
fst = None
while True:
- if st in {"USER_SIGN", "CLIENT_NOT_STARTED"}:
- if st != fst:
- conv.message("Status: %s" % (st,), auth.conv.msg_info)
- fst = st
- elif st == "COMPLETE":
- self._postlogin()
- return
- elif st == "CANCELLED":
- raise autherror("authentication cancelled")
- elif st == "OUTSTANDING_TRANSACTION":
- raise autherror("another bankid transaction already in progress")
- else:
- raise fmterror("unexpected bankid status: " + str(st))
- time.sleep(3)
- vdat = self._jreq(vfy)
- st = vdat.get("status")
+ data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/bankid/authentication/" + aid)
+ match tget(data, "status", str, "authentication status"):
+ case "SUCCESSFUL":
+ self.usertoken = tget(data, "userToken", str, "user token")
+ self._postlogin()
+ return
+ case "IN_PROGRESS":
+ st = tget(data, "bankIdStatus", str, "authenticator status")
+ if st != fst:
+ conv.message("Status: %s" % (st,), auth.conv.msg_info)
+ fst = st
+ case st:
+ raise fmterror("unexpected authentication status: " + st)
+ time.sleep(1)
def keepalive(self):
- data = self._jreq("v5/framework/clientsession")
+ data = self._jreq(tdecall, "v5/framework/clientsession")
return data["timeoutInMillis"] / 1000
@property
def accounts(self):
if self._accounts is None:
- txndata = self._jreq("v5/engagement/overview")
- crddata = self._jreq("v5/card/creditcard")
+ txndata = self._jreq(tdecall, "v5/engagement/overview")
+ crddata = self._jreq(tdecall, "v5/card/creditcard")
accounts = []
for acct in resolve(txndata, ("transactionAccounts",)):
accounts.append(txnaccount(self, resolve(acct, ("id",)), acct))
def logout(self):
if self.userid is not None:
- self._jreq("v5/identification/logout", method="PUT")
+ self._jreq(tdecall, "v5/identification/logout", method="PUT")
+ self.usertoken = None
+ self.engagement = None
self.userid = None
def close(self):
self.logout()
- self._req("v5/framework/clientsession", method="DELETE")
+ self._req(tdecall, "v5/framework/clientsession", method="DELETE")
def __getstate__(self):
state = dict(self.__dict__)
@classmethod
def create(cls):
- return cls(getdsid())
+ self = cls()
+ self._req(tdecall, loginurl)
+ cookies = {cookie.name for cookie in self.jar.cookiejar}
+ if "SWBTC" not in cookies:
+ raise fmterror("did not get SWBTC cookie from initial request")
+ if "dsid" not in cookies:
+ raise fmterror("did not get SWBTC cookie from initial request")
+ return self