Commit | Line | Data |
---|---|---|
8e60b2da FT |
1 | import json, http.cookiejar, binascii, time |
2 | from urllib import request, parse | |
3 | from bs4 import BeautifulSoup as soup | |
def soupify(cont):
    """Parse *cont* with BeautifulSoup, using the "html.parser" parser."""
    return soup(cont, "html.parser")


# Base URL onto which relative API paths are joined.
apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
# Login page that getdsid() fetches to find the "cust-sess-id" element.
loginurl = "https://online.swedbank.se/app/privat/login"
# Identifier combined with a millisecond timestamp to build the
# Authorization header value in session.__init__.
serviceid = "B7dZHQcY78VRVz9l"
9 | ||
class fmterror(Exception):
    """Raised when the remote side answers in a form this module does not expect.

    Used in this file for missing keys in decoded responses, unexpected link
    URLs, unexpected status values and non-200 HTTP status codes.
    """
12 | ||
class autherror(Exception):
    """Raised when an authentication attempt does not succeed.

    Used in this file when the reported status says the authentication was
    cancelled or that the authentication client was not started.
    """
15 | ||
def resolve(d, keys, default=fmterror):
    """Follow *keys* one by one through nested dicts starting at *d*.

    Returns the value reached once every key has been consumed (for an empty
    *keys* that is *d* itself).  If a key is missing, or a non-dict value is
    met while keys remain, *default* is returned instead -- unless *default*
    was left as the ``fmterror`` sentinel, in which case ``fmterror`` is
    raised.
    """
    node = d
    for key in keys:
        # Descend only while we are still inside a dict that has the key.
        if not isinstance(node, dict) or key not in node:
            if default is fmterror:
                raise fmterror()
            return default
        node = node[key]
    return node
31 | ||
def linkurl(ln):
    """Turn a link path from an API response into an absolute URL.

    *ln* must be a string starting with "/"; the leading slash is dropped and
    the remainder is joined onto ``apibase``.

    Raises ``fmterror`` if *ln* is not a string or does not start with "/".
    This includes the empty string, which would otherwise fail with an
    ``IndexError`` on ``ln[0]``.
    """
    if not isinstance(ln, str) or not ln.startswith("/"):
        raise fmterror("unexpected link url: " + str(ln))
    # Strip the leading slash so urljoin resolves the path relative to apibase
    # instead of replacing apibase's own path.
    return parse.urljoin(apibase, ln[1:])
36 | ||
def getdsid():
    """Fetch the login page and return the "value" of its "cust-sess-id" div.

    Raises ``fmterror`` if the page is not served with HTTP status 200 or if
    the div (or its "value" attribute) is absent.
    """
    with request.urlopen(loginurl) as page:
        status = page.code
        if status != 200:
            raise fmterror("Unexpected HTTP status code: " + str(status))
        tree = soupify(page.read())
    holder = tree.find("div", id="cust-sess-id")
    if holder and holder.has_attr("value"):
        return holder["value"]
    raise fmterror("DSID DIV not on login page")
46 | ||
def base64(data):
    """Return *data* (bytes) base64-encoded as text without "=" padding."""
    encoded = binascii.b2a_base64(data)
    text = encoded.decode("ascii")
    # strip() drops the newline b2a_base64 appends; rstrip removes the padding.
    return text.strip().rstrip("=")
49 | ||
class session(object):
    """A client session against the API rooted at ``apibase``.

    Holds the "dsid" value, the Authorization header value and a cookie
    processor, and offers helpers for plain and JSON requests, mobile BankID
    identification, logout and session teardown.  It can be used as a context
    manager; leaving the ``with`` block calls ``close()``.
    """

    def __init__(self, dsid):
        """Set up a session for the given *dsid* (see ``create()``)."""
        self.dsid = dsid
        # Authorization header value: unpadded base64 of
        # "<serviceid>:<current time in milliseconds>".
        self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
        # The cookie processor is driven by hand in _req() rather than being
        # installed in an opener.
        self.jar = request.HTTPCookieProcessor()
        self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
            version=0, name="dsid", value=dsid, path="/", path_specified=True,
            domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
            port=None, port_specified=False, secure=False, expires=None,
            discard=True, comment=None, comment_url=None,
            rest={}, rfc2109=False))
        # Set by auth_bankid() on success, cleared again by logout().
        self.userid = None

    def _req(self, url, data=None, ctype=None, headers=None, method=None, **kws):
        """Perform one HTTP request and return the raw response body (bytes).

        url     -- path relative to ``apibase``, or an absolute URL.
        data    -- request body; a dict is serialized as JSON and forces a
                   JSON Content-Type.
        ctype   -- Content-Type header to send, if any.
        headers -- optional mapping of extra request headers (not modified).
        method  -- HTTP method passed on to ``request.Request``.
        kws     -- query parameters; "dsid" defaults to ``self.dsid`` and
                   parameters whose value is None are dropped.

        Raises ``fmterror`` if the response status code is not 200.
        """
        if "dsid" not in kws:
            kws["dsid"] = self.dsid
        kws = {k: v for (k, v) in kws.items() if v is not None}
        # Extend an existing query string instead of adding a second "?".
        sep = "&" if "?" in url else "?"
        url = parse.urljoin(apibase, url + sep + parse.urlencode(kws))
        if isinstance(data, dict):
            data = json.dumps(data).encode("utf-8")
            ctype = "application/json;charset=UTF-8"
        req = request.Request(url, data=data, method=method)
        for hnam, hval in (headers or {}).items():
            req.add_header(hnam, hval)
        if ctype is not None:
            req.add_header("Content-Type", ctype)
        req.add_header("Authorization", self.auth)
        # Let the cookie processor attach stored cookies to the request...
        self.jar.https_request(req)
        with request.urlopen(req) as resp:
            if resp.code != 200:
                raise fmterror("Unexpected HTTP status code: " + str(resp.code))
            # ...and record any cookies set by the response.
            self.jar.https_response(req, resp)
            return resp.read()

    def _jreq(self, *args, **kwargs):
        """Like ``_req()``, but ask for JSON and return the decoded response.

        Accepts the same arguments as ``_req()``.  An "Accept:
        application/json" header is added to a copy of any *headers* given,
        so the caller's mapping is left untouched.
        """
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Accept"] = "application/json"
        ret = self._req(*args, headers=headers, **kwargs)
        return json.loads(ret.decode("utf-8"))

    def auth_bankid(self, user):
        """Identify *user* through the "v5/identification/bankid/mobile" endpoint.

        Starts the identification, then polls the follow-up link every three
        seconds for as long as the status stays "USER_SIGN".  On "COMPLETE"
        the identified user is fetched and stored in ``self.userid``.

        Raises ``autherror`` on status "CANCELLED" or "CLIENT_NOT_STARTED",
        and ``fmterror`` on any other unexpected status or response shape.
        """
        data = self._jreq("v5/identification/bankid/mobile", data={
            "userId": user,
            "useEasyLogin": False,
            "generateEasyLoginId": False})
        if data.get("status") != "USER_SIGN":
            raise fmterror("unexpected bankid status: " + str(data.get("status")))
        vfy = linkurl(resolve(data, ("links", "next", "uri")))
        while True:
            time.sleep(3)
            vdat = self._jreq(vfy)
            st = vdat.get("status")
            if st == "USER_SIGN":
                # Still waiting; poll again.
                continue
            elif st == "COMPLETE":
                auth = self._jreq("v5/user/authenticationinfo")
                uid = auth.get("identifiedUser", "")
                if uid == "":
                    raise fmterror("no identified user even after successful authentication")
                self.userid = uid
                return
            elif st == "CANCELLED":
                raise autherror("authentication cancelled")
            elif st == "CLIENT_NOT_STARTED":
                raise autherror("authentication client not started")
            else:
                raise fmterror("unexpected bankid status: " + str(st))

    def logout(self):
        """Log out the identified user, if any, and clear ``self.userid``."""
        if self.userid is not None:
            self._jreq("v5/identification/logout", method="PUT")
            self.userid = None

    def close(self):
        """Log out and delete the client session.

        The DELETE on "v5/framework/clientsession" is attempted even if the
        logout request raises.
        """
        try:
            self.logout()
        finally:
            self._req("v5/framework/clientsession", method="DELETE")

    def __enter__(self):
        """Return the session itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *excinfo):
        """Close the session; exceptions from the ``with`` body propagate."""
        self.close()
        return False

    @classmethod
    def create(cls):
        """Build a session from a freshly fetched dsid (see ``getdsid()``)."""
        return cls(getdsid())