1 import json, http.cookiejar, binascii, time, datetime, pickle
2 from urllib import request, parse
3 from bs4 import BeautifulSoup as soup
4 from . import currency, auth, data
def soupify(cont):
    """Parse *cont* as HTML using BeautifulSoup's ``html.parser`` backend."""
    return soup(cont, "html.parser")


# Base URL that relative REST API paths and link URIs are joined onto.
apibase = "https://online.swedbank.se/TDE_DAP_Portal_REST_WEB/api/"
# Login page that is fetched and searched for the "cust-sess-id" element.
loginurl = "https://online.swedbank.se/app/privat/login"
# Combined with a millisecond timestamp to form the Authorization value.
serviceid = "B7dZHQcY78VRVz9l"
11 class fmterror(Exception):
14 class autherror(Exception):
17 def resolve(d, keys, default=fmterror):
19 if default is fmterror:
25 if isinstance(d, dict):
28 return rec(d[keys[0]], keys[1:])
29 elif isinstance(d, list):
30 if not 0 <= keys[0] < len(d):
32 return rec(d[keys[0]], keys[1:])
39 raise fmterror("unexpected link url: " + ln)
40 return parse.urljoin(apibase, ln[1:])
43 with request.urlopen(loginurl) as resp:
45 raise fmterror("Unexpected HTTP status code: " + str(resp.code))
46 doc = soupify(resp.read())
47 dsel = doc.find("div", id="cust-sess-id")
48 if not dsel or not dsel.has_attr("value"):
49 raise fmterror("DSID DIV not on login page")
53 return binascii.b2a_base64(data).decode("ascii").strip().rstrip("=")
55 class transaction(data.transaction):
56 def __init__(self, account, data):
57 self.account = account
def value(self):
    """Return the ``amount`` entry parsed by the currency named in ``currency``."""
    # Look the parser up first, then resolve the amount, then call it.
    parse_amount = currency.currency.get(resolve(self._data, ("currency",))).parse
    return parse_amount(resolve(self._data, ("amount",)))
def message(self):
    """Return the ``description`` entry of the transaction data."""
    key_path = ("description",)
    return resolve(self._data, key_path)
68 p = time.strptime(resolve(self._data, ("accountingDate",)), self._datefmt)
69 return datetime.date(p.tm_year, p.tm_mon, p.tm_mday)
71 class txnaccount(data.txnaccount):
72 def __init__(self, sess, id, idata):
80 if self._data is None:
81 self._data = self.sess._jreq("v5/engagement/account/" + self.id)
def number(self):
    """Return the ``accountNumber`` entry of the account data."""
    key_path = ("accountNumber",)
    return resolve(self.data, key_path)
def clearing(self):
    """Return the ``clearingNumber`` entry of the account data."""
    key_path = ("clearingNumber",)
    return resolve(self.data, key_path)
def fullnumber(self):
    """Return the ``fullyFormattedNumber`` entry of the account data."""
    key_path = ("fullyFormattedNumber",)
    return resolve(self.data, key_path)
def balance(self):
    """Return ``balance.amount`` parsed by the currency in ``balance.currencyCode``."""
    # ``self.data`` is read twice, in the same order as before: once for the
    # currency code and once for the amount.
    parse_amount = currency.currency.get(
        resolve(self.data, ("balance", "currencyCode"))
    ).parse
    return parse_amount(resolve(self.data, ("balance", "amount")))
def name(self):
    """Return the ``name`` entry of the data this account was constructed with."""
    key_path = ("name",)
    return resolve(self._idata, key_path)
95 def transactions(self):
99 data = self.sess._jreq("v5/engagement/transactions/" + self.id, transactionsPerPage=pagesz, page=page)
100 txlist = resolve(data, ("transactions",))
104 yield transaction(self, tx)
107 class cardtransaction(data.transaction):
108 def __init__(self, account, data):
109 self.account = account
112 _datefmt = "%Y-%m-%d"
116 am = resolve(self._data, ("localAmount",))
117 return currency.currency.get(resolve(am, ("currencyCode",))).parse(resolve(am, ("amount",)))
def message(self):
    """Return the ``description`` entry of the card transaction data."""
    key_path = ("description",)
    return resolve(self._data, key_path)
122 p = time.strptime(resolve(self._data, ("date",)), self._datefmt)
123 return datetime.date(p.tm_year, p.tm_mon, p.tm_mday)
125 class cardaccount(data.cardaccount):
126 def __init__(self, sess, id, idata):
134 if self._data is None:
135 self._data = self.sess._jreq("v5/engagement/cardaccount/" + self.id)
def number(self):
    """Return the ``cardAccount.cardNumber`` entry of the card account data."""
    key_path = ("cardAccount", "cardNumber")
    return resolve(self.data, key_path)
142 cc = resolve(self.data, ("transactions", 0, "localAmount", "currencyCode"))
143 return currency.currency.get(cc).parse(resolve(self.data, ("cardAccount", "currentBalance")))
def name(self):
    """Return the ``name`` entry of the data this card account was constructed with."""
    key_path = ("name",)
    return resolve(self._idata, key_path)
147 def transactions(self):
151 data = self.sess._jreq("v5/engagement/cardaccount/" + self.id, transactionsPerPage=pagesz, page=page)
152 txlist = resolve(data, ("transactions",))
156 yield cardtransaction(self, tx)
159 class session(object):
160 def __init__(self, dsid):
162 self.auth = base64((serviceid + ":" + str(int(time.time() * 1000))).encode("ascii"))
163 self.jar = request.HTTPCookieProcessor()
164 self.jar.cookiejar.set_cookie(http.cookiejar.Cookie(
165 version=0, name="dsid", value=dsid, path="/", path_specified=True,
166 domain=".online.swedbank.se", domain_specified=True, domain_initial_dot=True,
167 port=None, port_specified=False, secure=False, expires=None,
168 discard=True, comment=None, comment_url=None,
169 rest={}, rfc2109=False))
171 self._accounts = None
def _req(self, url, data=None, ctype=None, headers=None, method=None, **kws):
    """Send one request to the REST API and return the raw response body.

    Parameters:
        url: path joined onto ``apibase``.
        data: request body; a dict is JSON-encoded and sent as
            ``application/json;charset=UTF-8``, anything else is passed
            to ``urllib`` unchanged.
        ctype: value for the ``Content-Type`` header (overridden when
            ``data`` is a dict).
        headers: optional mapping of extra request headers.
        method: HTTP method override handed to ``request.Request``.
        **kws: query parameters; ``dsid`` defaults to ``self.dsid`` and
            entries whose value is ``None`` are left out.

    Returns:
        The response body as bytes.

    Raises:
        fmterror: if the response status is neither 200 nor 201.
    """
    if "dsid" not in kws:
        kws["dsid"] = self.dsid
    query = {k: v for (k, v) in kws.items() if v is not None}
    url = parse.urljoin(apibase, url + "?" + parse.urlencode(query))
    if isinstance(data, dict):
        data = json.dumps(data).encode("utf-8")
        ctype = "application/json;charset=UTF-8"
    req = request.Request(url, data=data, method=method)
    # ``headers`` defaults to None rather than a shared ``{}`` instance.
    for hnam, hval in (headers or {}).items():
        req.add_header(hnam, hval)
    if ctype is not None:
        req.add_header("Content-Type", ctype)
    req.add_header("Authorization", self.auth)
    # The request goes through plain ``urlopen``, so the cookie processor
    # is invoked by hand on both the request and the response.
    self.jar.https_request(req)
    with request.urlopen(req) as resp:
        if resp.code not in (200, 201):
            raise fmterror("Unexpected HTTP status code: " + str(resp.code))
        self.jar.https_response(req, resp)
        # _jreq decodes this value, so hand back the raw body bytes.
        return resp.read()
def _jreq(self, *args, **kwargs):
    """Call ``_req`` asking for JSON and return the decoded document.

    All positional and keyword arguments are forwarded to ``_req``.  An
    ``Accept: application/json`` header is added to any ``headers``
    mapping supplied by the caller (replacing an existing ``Accept``
    entry).  The response body is decoded as UTF-8 and parsed as JSON.
    """
    # Copy so the caller's dict is never modified; also tolerates
    # ``headers=None``.
    headers = dict(kwargs.pop("headers", None) or {})
    headers["Accept"] = "application/json"
    body = self._req(*args, headers=headers, **kwargs)
    return json.loads(body.decode("utf-8"))
200 def _postlogin(self):
201 auth = self._jreq("v5/user/authenticationinfo")
202 uid = auth.get("identifiedUser", "")
204 raise fmterror("no identified user even after successful authentication")
206 prof = self._jreq("v5/profile/")
207 if len(prof["banks"]) != 1:
208 raise fmterror("do not know the meaning of multiple banks")
209 rolesw = linkurl(resolve(prof["banks"][0], ("privateProfile", "links", "next", "uri")))
210 self._jreq(rolesw, method="POST")
212 def auth_bankid(self, user, conv=None):
214 conv = auth.default()
215 data = self._jreq("v5/identification/bankid/mobile", data = {
217 "useEasyLogin": False,
218 "generateEasyLoginId": False})
219 if data.get("status") != "USER_SIGN":
220 raise fmterror("unexpected bankid status: " + str(data.get("status")))
221 vfy = linkurl(resolve(data, ("links", "next", "uri")))
225 vdat = self._jreq(vfy)
226 st = vdat.get("status")
227 if st in {"USER_SIGN", "CLIENT_NOT_STARTED"}:
229 conv.message("Status: %s" % (st,), auth.conv.msg_info)
232 elif st == "COMPLETE":
235 elif st == "CANCELLED":
236 raise autherror("authentication cancelled")
238 raise fmterror("unexpected bankid status: " + str(st))
241 data = self._jreq("v5/framework/clientsession")
242 return data["timeoutInMillis"] / 1000
246 if self._accounts is None:
247 data = self._jreq("v5/engagement/overview")
249 for acct in resolve(data, ("transactionAccounts",)):
250 accounts.append(txnaccount(self, resolve(acct, ("id",)), acct))
251 for acct in resolve(data, ("cardAccounts",)):
252 accounts.append(cardaccount(self, resolve(acct, ("id",)), acct))
253 self._accounts = accounts
254 return self._accounts
257 if self.userid is not None:
258 self._jreq("v5/identification/logout", method="PUT")
263 self._req("v5/framework/clientsession", method="DELETE")
268 def __exit__(self, *excinfo):
273 if self.userid is not None:
274 return "#<fsb.session %s>" % self.userid
275 return "#<fsb.session>"
279 return cls(getdsid())
def save(self, filename):
    """Pickle this object into *filename*.

    The object is serialized in memory before the file is opened for
    writing, so a pickling failure leaves an existing file untouched
    instead of truncated.

    Raises:
        Whatever ``pickle.dumps`` raises for an unpicklable object, or
        ``OSError`` if the file cannot be written.
    """
    payload = pickle.dumps(self)
    with open(filename, "wb") as fp:
        fp.write(payload)
def load(cls, filename):
    """Restore an object previously written by ``save`` from *filename*.

    SECURITY: unpickling can execute arbitrary code embedded in the file.
    Only load files that this program wrote itself and that nobody else
    could have modified.

    Returns:
        The unpickled object, which must be an instance of *cls*.

    Raises:
        TypeError: if the file holds something other than an instance of
            *cls*.
    """
    with open(filename, "rb") as fp:
        # NOTE(security): pickle.load trusts the file contents completely.
        obj = pickle.load(fp)
    if not isinstance(obj, cls):
        raise TypeError("%r does not contain a pickled %s" % (filename, cls.__name__))
    return obj