From b56a005f198f6504c4782153d31d38d582f63034 Mon Sep 17 00:00:00 2001 From: Fredrik Tolf Date: Sun, 26 Oct 2025 03:27:13 +0100 Subject: [PATCH] fsb: Update general API and login handling. --- fulbank/fsb.py | 253 ++++++++++++++++++++++++++++--------------------- 1 file changed, 144 insertions(+), 109 deletions(-) diff --git a/fulbank/fsb.py b/fulbank/fsb.py index 7a4a4a8..ba8f4f4 100644 --- a/fulbank/fsb.py +++ b/fulbank/fsb.py @@ -5,9 +5,22 @@ from bs4 import BeautifulSoup as soup from . import currency, auth, data soupify = lambda cont: soup(cont, "html.parser") -apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/" -loginurl = "https://online.swedbank.se/app/privat/login" -serviceid = "B7dZHQcY78VRVz9l" +apicall = { + "base": "https://app.swedbank.se/api/", + "headers": [ + ("X-Api-Key", "x7X8h9nePgYEUHs7"), + ("X-Client", "fdp-internet-bank/227.1.0"), + ], +} +tdecall = { + "base": "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/", + "headers": [ + ("X-Api-Key", "x7X8h9nePgYEUHs7"), + ("X-Client", "fdp-internet-bank/227.1.0"), + ], + "serviceid": "B7dZHQcY78VRVz9l", +} +loginurl = "https://online.swedbank.se/app/ib/logga-in" class fmterror(Exception): pass @@ -29,7 +42,12 @@ class jsonerror(Exception): data = json.loads(err.read().decode(cs)) return cls(err.code, data, err.headers) +def base64(data): + return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=") + def resolve(d, keys, default=fmterror): + if isinstance(keys, str): + keys = [keys] def err(key): if default is fmterror: raise fmterror(key) @@ -49,23 +67,25 @@ def resolve(d, keys, default=fmterror): return err(keys[0]) return rec(d, keys) -def linkurl(ln): - if ln[0] != '/': - raise fmterror("unexpected link url: " + ln) - return parse.urljoin(apibase, ln[1:]) - -def getdsid(): - with request.urlopen(loginurl) as resp: - if resp.code != 200: - raise fmterror("Unexpected HTTP status code: " + str(resp.code)) - doc = soupify(resp.read()) - dsel = doc.find("div", id="cust-sess-id") - if not dsel or not dsel.has_attr("value"): - raise fmterror("DSID DIV not on login page") - return dsel["value"] - -def base64(data): - return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=") +def cktyp(d, typ, name, default=None): + if d is None: + d = default + if not isinstance(d, typ): + raise fmterror("unexpected type for " + name + ": " + repr(d)) + return d + +def tget(d, keys, typ, name, default=None): + return cktyp(resolve(d, keys), typ, name, default) + +def linkurl(ln, method): + if method is not None: + mth = tget(ln, "method", str, "link method", "") + if mth != method: + raise fmterror("unexpected method for link: " + mth) + uri = tget(ln, "uri", str, "link target") + if uri[0] != '/': + raise fmterror("unexpected link url: " + uri) + return parse.urljoin(tdecall["base"], uri[1:]) class transaction(data.transaction): def __init__(self, account, data): @@ -93,7 +113,7 @@ class txnaccount(data.txnaccount): @property def data(self): if self._data is None: - self._data = self.sess._jreq("v5/engagement/account/" + self.id) + self._data = self.sess._jreq(tdecall, "v5/engagement/account/" + self.id) return self._data @property @@ -111,7 +131,7 @@ class txnaccount(data.txnaccount): pagesz = 50 page = 1 while True: - data = self.sess._jreq("v5/engagement/transactions/" + self.id, transactionsPerPage=pagesz, page=page) + data = self.sess._jreq(tdecall, "v5/engagement/transactions/" + self.id, query={"transactionsPerPage": pagesz, "page": page}) txlist = resolve(data, ("transactions",)) if len(txlist) < 1: break @@ -147,7 +167,7 @@ class cardaccount(data.cardaccount): @property def data(self): if self._data is None: - self._data = self.sess._jreq("v5/engagement/cardaccount/" + self.id) + self._data = self.sess._jreq(tdecall, "v5/engagement/cardaccount/" + self.id) return self._data @property @@ -163,7 +183,7 @@ class cardaccount(data.cardaccount): pagesz = 50 page = 1 while True: - data = self.sess._jreq("v5/engagement/cardaccount/" + self.id, transactionsPerPage=pagesz, page=page) + data = self.sess._jreq(tdecall, "v5/engagement/cardaccount/" + self.id, query={"transactionsPerPage": pagesz, "page": page}) txlist = resolve(data, ("transactions",)) if len(txlist) < 1: break @@ -172,33 +192,36 @@ class cardaccount(data.cardaccount): page += 1 class session(data.session): - def __init__(self, dsid): - self.dsid = dsid - self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii")) + def __init__(self): + self.authtime = time.time() self.jar = request.HTTPCookieProcessor() - self.jar.cookiejar.set_cookie(http.cookiejar.Cookie( - version=0, name="dsid", value=dsid, path="/", path_specified=True, - domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True, - port=None, port_specified=False, secure=False, expires=None, - discard=True, comment=None, comment_url=None, - rest={}, rfc2109=False)) + self.usertoken = None + self.engagement = None self.userid = None self._accounts = None - def _req(self, url, data=None, ctype=None, headers={}, method=None, **kws): - if "dsid" not in kws: - kws["dsid"] = self.dsid - kws = {k: v for (k, v) in kws.items() if v is not None} - url = parse.urljoin(apibase, url + "?" + parse.urlencode(kws)) + def _req(self, reqtype, url, data=None, ctype=None, headers={}, method=None, query=None): + if query is None: + query = {} + query = {k: v for (k, v) in query.items() if v is not None} + url = parse.urljoin(reqtype["base"], url + "?" + parse.urlencode(query)) if isinstance(data, dict): data = json.dumps(data).encode("utf-8") - ctype = "application/json;charset=UTF-8" + if ctype is None: + ctype = "application/json" req = request.Request(url, data=data, method=method) + for hnam, hval in reqtype.get("headers", []): + req.add_header(hnam, hval) for hnam, hval in headers.items(): req.add_header(hnam, hval) if ctype is not None: req.add_header("Content-Type", ctype) - req.add_header("Authorization", self.auth) + if "serviceid" in reqtype: + req.add_header("Authorization", base64((reqtype["serviceid"] + ":" + str(int(self.authtime * 1000))).encode("ascii"))) + if self.usertoken is not None: + req.add_header("X-User", self.usertoken) + if self.engagement is not None: + req.add_header("X-Engagement", self.engagement) self.jar.https_request(req) with request.urlopen(req) as resp: if resp.code != 200 and resp.code != 201: @@ -212,108 +235,111 @@ class session(data.session): try: ret = self._req(*args, headers=headers, **kwargs) except urllib.error.HTTPError as e: - if e.headers.get_content_type() == "application/json": + if e.headers.get_content_type() in {"application/json", "application/problem+json"}: raise jsonerror.fromerr(e) return json.loads(ret.decode("utf-8")) def _postlogin(self): - auth = self._jreq("v5/user/authenticationinfo") + aprof = self._jreq(tdecall, "v5/profile/activeprofile") + + prof = self._jreq(tdecall, "v5/profile/") + if len(prof["banks"]) != 1: + raise fmterror("do not know the meaning of multiple banks") + profc = self._jreq(tdecall, + linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next")), "POST"), + method="POST") + self.engagement = tget(profc, ("selectedProfile", "andromedaProfileToken"), str, "engagement token") + + auth = self._jreq(tdecall, "v5/user/authenticationinfo") uid = auth.get("identifiedUser", "") if uid == "": raise fmterror("no identified user even after successful authentication") self.userid = uid - prof = self._jreq("v5/profile/") - if len(prof["banks"]) != 1: - raise fmterror("do not know the meaning of multiple banks") - rolesw = linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next", "uri"))) - self._jreq(rolesw, method="POST") def auth_token(self, user, conv=None): if conv is None: conv = auth.default() try: - data = self._jreq("v5/identification/securitytoken/challenge", data = { + data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/securitytoken/authentication", data={ "userId": user, - "useEasyLogin": "false", - "generateEasyLoginId": "false"}) + "corporateToken": False, + "generateEasyLogin": False, + "mockedAuthentication": False, + }) except jsonerror as e: if e.code == 400: - flds = resolve(e.data, ("errorMessages", "fields"), False) - if isinstance(flds, list): - for fld in flds: - if resolve(fld, ("field",), None) == "userId": - raise autherror(fld["message"]) - raise - if data.get("useOneTimePassword"): - raise fmterror("unexpectedly found useOneTimePassword") - if data.get("challenge") != "": - raise fmterror("unexpected challenge: " + str(data.get("challenge"))) - if not isinstance(data.get("imageChallenge"), dict) or resolve(data, ("imageChallenge", "method")) != "GET": - raise fmterror("invalid image challenge: " + str(data.get("imageChallenge"))) - iurl = linkurl(resolve(data, ("imageChallenge", "uri"))) - vfy = linkurl(resolve(data, ("links", "next", "uri"))) - img = Image.open(io.BytesIO(self._req(iurl))) - conv.image(img) - response = conv.prompt("Token response: ", True) - try: - data = self._jreq(vfy, data={"response": response}) - except jsonerror as e: - msgs = resolve(e.data, ("errorMessages", "general"), False) - if isinstance(msgs, list): - for msg in msgs: - if msg.get("message"): - raise autherror(msg.get("message")) + for fld in cktyp(resolve(e.data, ("errorMessages", "fields"), None), list, "field messages", []): + if resolve(fld, ("field",), None) == "userId": + raise autherror(fld["message"]) raise - if not data.get("authenticationRole", ""): - raise fmterror("authentication appears to have succeded, but there is no authenticationRole: " + str(data)) + if data.get("type") == "PHOTOTAN": + aid = tget(data, "authenticationId", str, "authentication id") + idata = tget(data, "imageData", str, "challenge image-data") + img = Image.open(io.BytesIO(self._req(apicall, "cross-channel/customer-security/authentication/v1/challenge-image", query={"data": idata, "size": "M"}))) + conv.image(img) + response = conv.prompt("Token response: ", True) + try: + data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/securitytoken/authentication/" + aid + "/response", data={ + "verificationCode": response, + }, method="PUT") + except jsonerror as e: + if e.code in {400, 401}: + for msg in tget(e.data, ("errorMessages", "general"), list, "error messages", []): + if "message" in msg: + raise autherror(tget(msg, "message", str, "error message")) + raise + self.usertoken = tget(data, "userToken", str, "user token") + else: + raise fmterror("unexpected token challenge: type: " + data.get("type")) self._postlogin() def auth_bankid(self, user, conv=None): if conv is None: conv = auth.default() try: - data = self._jreq("v5/identification/bankid/mobile", data = { + data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/bankid/authentication", data={ "userId": user, - "useEasyLogin": False, - "generateEasyLoginId": False}) + "idMethod": "MOBILE_BANKID", + "generateEasyLogin": False, + "mockedAuthencation": False, + "sameDevice": False, + }) except jsonerror as e: if e.code == 400: - flds = resolve(e.data, ("errorMessages", "fields"), False) - if isinstance(flds, list): - for fld in flds: - if resolve(fld, ("field",), None) == "userId": - raise autherror(fld["message"]) + for fld in cktyp(resolve(e.data, ("errorMessages", "fields"), None), list, "field messages", []): + if resolve(fld, ("field",), None) == "userId": + raise autherror(fld["message"]) raise - st = data.get("status") - vfy = linkurl(resolve(data, ("links", "next", "uri"))) + aid = tget(data, "authenticationId", str, "authentication id") + idata = tget(data, "imageData", str, "challenge image data") + img = Image.open(io.BytesIO(self._req(apicall, "cross-channel/customer-security/authentication/v1/challenge-image", query={"data": idata, "size": "M"}))) + conv.image(img) fst = None while True: - if st in {"USER_SIGN", "CLIENT_NOT_STARTED"}: - if st != fst: - conv.message("Status: %s" % (st,), auth.conv.msg_info) - fst = st - elif st == "COMPLETE": - self._postlogin() - return - elif st == "CANCELLED": - raise autherror("authentication cancelled") - elif st == "OUTSTANDING_TRANSACTION": - raise autherror("another bankid transaction already in progress") - else: - raise fmterror("unexpected bankid status: " + str(st)) - time.sleep(3) - vdat = self._jreq(vfy) - st = vdat.get("status") + data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/bankid/authentication/" + aid) + match tget(data, "status", str, "authentication status"): + case "SUCCESSFUL": + self.usertoken = tget(data, "userToken", str, "user token") + self._postlogin() + return + case "IN_PROGRESS": + st = tget(data, "bankIdStatus", str, "authenticator status") + if st != fst: + conv.message("Status: %s" % (st,), auth.conv.msg_info) + fst = st + case st: + raise fmterror("unexpected authentication status: " + st) + time.sleep(1) def keepalive(self): - data = self._jreq("v5/framework/clientsession") + data = self._jreq(tdecall, "v5/framework/clientsession") return data["timeoutInMillis"] / 1000 @property def accounts(self): if self._accounts is None: - txndata = self._jreq("v5/engagement/overview") - crddata = self._jreq("v5/card/creditcard") + txndata = self._jreq(tdecall, "v5/engagement/overview") + crddata = self._jreq(tdecall, "v5/card/creditcard") accounts = [] for acct in resolve(txndata, ("transactionAccounts",)): accounts.append(txnaccount(self, resolve(acct, ("id",)), acct)) @@ -324,12 +350,14 @@ class session(data.session): def logout(self): if self.userid is not None: - self._jreq("v5/identification/logout", method="PUT") + self._jreq(tdecall, "v5/identification/logout", method="PUT") + self.usertoken = None + self.engagement = None self.userid = None def close(self): self.logout() - self._req("v5/framework/clientsession", method="DELETE") + self._req(tdecall, "v5/framework/clientsession", method="DELETE") def __getstate__(self): state = dict(self.__dict__) @@ -357,4 +385,11 @@ class session(data.session): @classmethod def create(cls): - return cls(getdsid()) + self = cls() + self._req(tdecall, loginurl) + cookies = {cookie.name for cookie in self.jar.cookiejar} + if "SWBTC" not in cookies: + raise fmterror("did not get SWBTC cookie from initial request") + if "dsid" not in cookies: + raise fmterror("did not get SWBTC cookie from initial request") + return self -- 2.39.5