]> git.dolda2000.com Git - fulbank.git/commitdiff
fsb: Update general API and login handling.
authorFredrik Tolf <fredrik@dolda2000.com>
Sun, 26 Oct 2025 02:27:13 +0000 (03:27 +0100)
committerFredrik Tolf <fredrik@dolda2000.com>
Sun, 26 Oct 2025 02:27:13 +0000 (03:27 +0100)
fulbank/fsb.py

index 7a4a4a89dfe2126a536a3a2d553930e9799476f8..ba8f4f4b2419a17aa598fbeb02cc7143b67b3564 100644 (file)
@@ -5,9 +5,22 @@ from bs4 import BeautifulSoup as soup
 from . import currency, auth, data
 soupify = lambda cont: soup(cont, "html.parser")
 
-apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
-loginurl = "https://online.swedbank.se/app/privat/login"
-serviceid = "B7dZHQcY78VRVz9l"
+apicall = {
+    "base": "https://app.swedbank.se/api/",
+    "headers": [
+        ("X-Api-Key", "x7X8h9nePgYEUHs7"),
+        ("X-Client", "fdp-internet-bank/227.1.0"),
+    ],
+}
+tdecall = {
+    "base": "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/",
+    "headers": [
+        ("X-Api-Key", "x7X8h9nePgYEUHs7"),
+        ("X-Client", "fdp-internet-bank/227.1.0"),
+    ],
+    "serviceid": "B7dZHQcY78VRVz9l",
+}
+loginurl = "https://online.swedbank.se/app/ib/logga-in"
 
 class fmterror(Exception):
     pass
@@ -29,7 +42,12 @@ class jsonerror(Exception):
         data = json.loads(err.read().decode(cs))
         return cls(err.code, data, err.headers)
 
+def base64(data):
+    return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=")
+
 def resolve(d, keys, default=fmterror):
+    if isinstance(keys, str):
+        keys = [keys]
     def err(key):
         if default is fmterror:
             raise fmterror(key)
@@ -49,23 +67,25 @@ def resolve(d, keys, default=fmterror):
             return err(keys[0])
     return rec(d, keys)
 
-def linkurl(ln):
-    if ln[0] != '/':
-        raise fmterror("unexpected link url: " + ln)
-    return parse.urljoin(apibase, ln[1:])
-
-def getdsid():
-    with request.urlopen(loginurl) as resp:
-        if resp.code != 200:
-            raise fmterror("Unexpected HTTP status code: " + str(resp.code))
-        doc = soupify(resp.read())
-    dsel = doc.find("div", id="cust-sess-id")
-    if not dsel or not dsel.has_attr("value"):
-        raise fmterror("DSID DIV not on login page")
-    return dsel["value"]
-
-def base64(data):
-    return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=")
+def cktyp(d, typ, name, default=None):
+    if d is None:
+        d = default
+    if not isinstance(d, typ):
+        raise fmterror("unexpected type for " + name + ": " + repr(d))
+    return d
+
+def tget(d, keys, typ, name, default=None):
+    return cktyp(resolve(d, keys), typ, name, default)
+
+def linkurl(ln, method):
+    if method is not None:
+        mth = tget(ln, "method", str, "link method", "")
+        if mth != method:
+            raise fmterror("unexpected method for link: " + mth)
+    uri = tget(ln, "uri", str, "link target")
+    if uri[0] != '/':
+        raise fmterror("unexpected link url: " + uri)
+    return parse.urljoin(tdecall["base"], uri[1:])
 
 class transaction(data.transaction):
     def __init__(self, account, data):
@@ -93,7 +113,7 @@ class txnaccount(data.txnaccount):
     @property
     def data(self):
         if self._data is None:
-            self._data = self.sess._jreq("v5/engagement/account/" + self.id)
+            self._data = self.sess._jreq(tdecall, "v5/engagement/account/" + self.id)
         return self._data
 
     @property
@@ -111,7 +131,7 @@ class txnaccount(data.txnaccount):
         pagesz = 50
         page = 1
         while True:
-            data = self.sess._jreq("v5/engagement/transactions/" + self.id, transactionsPerPage=pagesz, page=page)
+            data = self.sess._jreq(tdecall, "v5/engagement/transactions/" + self.id, query={"transactionsPerPage": pagesz, "page": page})
             txlist = resolve(data, ("transactions",))
             if len(txlist) < 1:
                 break
@@ -147,7 +167,7 @@ class cardaccount(data.cardaccount):
     @property
     def data(self):
         if self._data is None:
-            self._data = self.sess._jreq("v5/engagement/cardaccount/" + self.id)
+            self._data = self.sess._jreq(tdecall, "v5/engagement/cardaccount/" + self.id)
         return self._data
 
     @property
@@ -163,7 +183,7 @@ class cardaccount(data.cardaccount):
         pagesz = 50
         page = 1
         while True:
-            data = self.sess._jreq("v5/engagement/cardaccount/" + self.id, transactionsPerPage=pagesz, page=page)
+            data = self.sess._jreq(tdecall, "v5/engagement/cardaccount/" + self.id, query={"transactionsPerPage": pagesz, "page": page})
             txlist = resolve(data, ("transactions",))
             if len(txlist) < 1:
                 break
@@ -172,33 +192,36 @@ class cardaccount(data.cardaccount):
             page += 1
 
 class session(data.session):
-    def __init__(self, dsid):
-        self.dsid = dsid
-        self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
+    def __init__(self):
+        self.authtime = time.time()
         self.jar = request.HTTPCookieProcessor()
-        self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
-            version=0, name="dsid", value=dsid, path="/", path_specified=True,
-            domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
-            port=None, port_specified=False, secure=False, expires=None,
-            discard=True, comment=None, comment_url=None,
-            rest={}, rfc2109=False))
+        self.usertoken = None
+        self.engagement = None
         self.userid = None
         self._accounts = None
 
-    def _req(self, url, data=None, ctype=None, headers={}, method=None, **kws):
-        if "dsid" not in kws:
-            kws["dsid"] = self.dsid
-        kws = {k: v for (k, v) in kws.items() if v is not None}
-        url = parse.urljoin(apibase, url + "?" + parse.urlencode(kws))
+    def _req(self, reqtype, url, data=None, ctype=None, headers={}, method=None, query=None):
+        if query is None:
+            query = {}
+        query = {k: v for (k, v) in query.items() if v is not None}
+        url = parse.urljoin(reqtype["base"], url + "?" + parse.urlencode(query))
         if isinstance(data, dict):
             data = json.dumps(data).encode("utf-8")
-            ctype = "application/json;charset=UTF-8"
+            if ctype is None:
+                ctype = "application/json"
         req = request.Request(url, data=data, method=method)
+        for hnam, hval in reqtype.get("headers", []):
+            req.add_header(hnam, hval)
         for hnam, hval in headers.items():
             req.add_header(hnam, hval)
         if ctype is not None:
             req.add_header("Content-Type", ctype)
-        req.add_header("Authorization", self.auth)
+        if "serviceid" in reqtype:
+            req.add_header("Authorization", base64((reqtype["serviceid"] + ":" + str(int(self.authtime * 1000))).encode("ascii")))
+        if self.usertoken is not None:
+            req.add_header("X-User", self.usertoken)
+        if self.engagement is not None:
+            req.add_header("X-Engagement", self.engagement)
         self.jar.https_request(req)
         with request.urlopen(req) as resp:
             if resp.code != 200 and resp.code != 201:
@@ -212,108 +235,111 @@ class session(data.session):
         try:
             ret = self._req(*args, headers=headers, **kwargs)
         except urllib.error.HTTPError as e:
-            if e.headers.get_content_type() == "application/json":
+            if e.headers.get_content_type() in {"application/json", "application/problem+json"}:
                 raise jsonerror.fromerr(e)
         return json.loads(ret.decode("utf-8"))
 
     def _postlogin(self):
-        auth = self._jreq("v5/user/authenticationinfo")
+        aprof = self._jreq(tdecall, "v5/profile/activeprofile")
+
+        prof = self._jreq(tdecall, "v5/profile/")
+        if len(prof["banks"]) != 1:
+            raise fmterror("do not know the meaning of multiple banks")
+        profc = self._jreq(tdecall,
+                           linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next")), "POST"),
+                           method="POST")
+        self.engagement = tget(profc, ("selectedProfile", "andromedaProfileToken"), str, "engagement token")
+
+        auth = self._jreq(tdecall, "v5/user/authenticationinfo")
         uid = auth.get("identifiedUser", "")
         if uid == "":
             raise fmterror("no identified user even after successful authentication")
         self.userid = uid
-        prof = self._jreq("v5/profile/")
-        if len(prof["banks"]) != 1:
-            raise fmterror("do not know the meaning of multiple banks")
-        rolesw = linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next", "uri")))
-        self._jreq(rolesw, method="POST")
 
     def auth_token(self, user, conv=None):
         if conv is None:
             conv = auth.default()
         try:
-            data = self._jreq("v5/identification/securitytoken/challenge", data = {
+            data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/securitytoken/authentication", data={
                 "userId": user,
-                "useEasyLogin": "false",
-                "generateEasyLoginId": "false"})
+                "corporateToken": False,
+                "generateEasyLogin": False,
+                "mockedAuthentication": False,
+            })
         except jsonerror as e:
             if e.code == 400:
-                flds = resolve(e.data, ("errorMessages", "fields"), False)
-                if isinstance(flds, list):
-                    for fld in flds:
-                        if resolve(fld, ("field",), None) == "userId":
-                            raise autherror(fld["message"])
-            raise
-        if data.get("useOneTimePassword"):
-            raise fmterror("unexpectedly found useOneTimePassword")
-        if data.get("challenge") != "":
-            raise fmterror("unexpected challenge: " + str(data.get("challenge")))
-        if not isinstance(data.get("imageChallenge"), dict) or resolve(data, ("imageChallenge", "method")) != "GET":
-            raise fmterror("invalid image challenge: " + str(data.get("imageChallenge")))
-        iurl = linkurl(resolve(data, ("imageChallenge", "uri")))
-        vfy = linkurl(resolve(data, ("links", "next", "uri")))
-        img = Image.open(io.BytesIO(self._req(iurl)))
-        conv.image(img)
-        response = conv.prompt("Token response: ", True)
-        try:
-            data = self._jreq(vfy, data={"response": response})
-        except jsonerror as e:
-            msgs = resolve(e.data, ("errorMessages", "general"), False)
-            if isinstance(msgs, list):
-                for msg in msgs:
-                    if msg.get("message"):
-                        raise autherror(msg.get("message"))
+                for fld in cktyp(resolve(e.data, ("errorMessages", "fields"), None), list, "field messages", []):
+                    if resolve(fld, ("field",), None) == "userId":
+                        raise autherror(fld["message"])
             raise
-        if not data.get("authenticationRole", ""):
-            raise fmterror("authentication appears to have succeded, but there is no authenticationRole: " + str(data))
+        if data.get("type") == "PHOTOTAN":
+            aid = tget(data, "authenticationId", str, "authentication id")
+            idata = tget(data, "imageData", str, "challenge image-data")
+            img = Image.open(io.BytesIO(self._req(apicall, "cross-channel/customer-security/authentication/v1/challenge-image", query={"data": idata, "size": "M"})))
+            conv.image(img)
+            response = conv.prompt("Token response: ", True)
+            try:
+                data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/securitytoken/authentication/" + aid + "/response", data={
+                    "verificationCode": response,
+                }, method="PUT")
+            except jsonerror as e:
+                if e.code in {400, 401}:
+                    for msg in tget(e.data, ("errorMessages", "general"), list, "error messages", []):
+                        if "message" in msg:
+                            raise autherror(tget(msg, "message", str, "error message"))
+                raise
+            self.usertoken = tget(data, "userToken", str, "user token")
+        else:
+            raise fmterror("unexpected token challenge: type: " + data.get("type"))
         self._postlogin()
 
     def auth_bankid(self, user, conv=None):
         if conv is None:
             conv = auth.default()
         try:
-            data = self._jreq("v5/identification/bankid/mobile", data = {
+            data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/bankid/authentication", data={
                 "userId": user,
-                "useEasyLogin": False,
-                "generateEasyLoginId": False})
+                "idMethod": "MOBILE_BANKID",
+                "generateEasyLogin": False,
+                "mockedAuthencation": False,
+                "sameDevice": False,
+            })
         except jsonerror as e:
             if e.code == 400:
-                flds = resolve(e.data, ("errorMessages", "fields"), False)
-                if isinstance(flds, list):
-                    for fld in flds:
-                        if resolve(fld, ("field",), None) == "userId":
-                            raise autherror(fld["message"])
+                for fld in cktyp(resolve(e.data, ("errorMessages", "fields"), None), list, "field messages", []):
+                    if resolve(fld, ("field",), None) == "userId":
+                        raise autherror(fld["message"])
             raise
-        st = data.get("status")
-        vfy = linkurl(resolve(data, ("links", "next", "uri")))
+        aid = tget(data, "authenticationId", str, "authentication id")
+        idata = tget(data, "imageData", str, "challenge image data")
+        img = Image.open(io.BytesIO(self._req(apicall, "cross-channel/customer-security/authentication/v1/challenge-image", query={"data": idata, "size": "M"})))
+        conv.image(img)
         fst = None
         while True:
-            if st in {"USER_SIGN", "CLIENT_NOT_STARTED"}:
-                if st != fst:
-                    conv.message("Status: %s" % (st,), auth.conv.msg_info)
-                    fst = st
-            elif st == "COMPLETE":
-                self._postlogin()
-                return
-            elif st == "CANCELLED":
-                raise autherror("authentication cancelled")
-            elif st == "OUTSTANDING_TRANSACTION":
-                raise autherror("another bankid transaction already in progress")
-            else:
-                raise fmterror("unexpected bankid status: " + str(st))
-            time.sleep(3)
-            vdat = self._jreq(vfy)
-            st = vdat.get("status")
+            data = self._jreq(apicall, "cross-channel/customer-security/authentication/v1/bankid/authentication/" + aid)
+            match tget(data, "status", str, "authentication status"):
+                case "SUCCESSFUL":
+                    self.usertoken = tget(data, "userToken", str, "user token")
+                    self._postlogin()
+                    return
+                case "IN_PROGRESS":
+                    st = tget(data, "bankIdStatus", str, "authenticator status")
+                    if st != fst:
+                        conv.message("Status: %s" % (st,), auth.conv.msg_info)
+                        fst = st
+                case st:
+                    raise fmterror("unexpected authentication status: " + st)
+            time.sleep(1)
 
     def keepalive(self):
-        data = self._jreq("v5/framework/clientsession")
+        data = self._jreq(tdecall, "v5/framework/clientsession")
         return data["timeoutInMillis"] / 1000
 
     @property
     def accounts(self):
         if self._accounts is None:
-            txndata = self._jreq("v5/engagement/overview")
-            crddata = self._jreq("v5/card/creditcard")
+            txndata = self._jreq(tdecall, "v5/engagement/overview")
+            crddata = self._jreq(tdecall, "v5/card/creditcard")
             accounts = []
             for acct in resolve(txndata, ("transactionAccounts",)):
                 accounts.append(txnaccount(self, resolve(acct, ("id",)), acct))
@@ -324,12 +350,14 @@ class session(data.session):
 
     def logout(self):
         if self.userid is not None:
-            self._jreq("v5/identification/logout", method="PUT")
+            self._jreq(tdecall, "v5/identification/logout", method="PUT")
+            self.usertoken = None
+            self.engagement = None
             self.userid = None
 
     def close(self):
         self.logout()
-        self._req("v5/framework/clientsession", method="DELETE")
+        self._req(tdecall, "v5/framework/clientsession", method="DELETE")
 
     def __getstate__(self):
         state = dict(self.__dict__)
@@ -357,4 +385,11 @@ class session(data.session):
 
     @classmethod
     def create(cls):
-        return cls(getdsid())
+        self = cls()
+        self._req(tdecall, loginurl)
+        cookies = {cookie.name for cookie in self.jar.cookiejar}
+        if "SWBTC" not in cookies:
+            raise fmterror("did not get SWBTC cookie from initial request")
+        if "dsid" not in cookies:
+            raise fmterror("did not get SWBTC cookie from initial request")
+        return self