cleanup: remove legacy api tokens, moved to OIDC
This commit is contained in:
parent
65f1f0a1d5
commit
5902db030c
|
@ -0,0 +1,20 @@
|
|||
# Generated by Django 3.2.13 on 2023-09-25 10:35
|
||||
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('authn', '0008_auto_20230306_1623'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveField(
|
||||
model_name='session',
|
||||
name='app',
|
||||
),
|
||||
migrations.DeleteModel(
|
||||
name='Apps',
|
||||
),
|
||||
]
|
|
@ -2,7 +2,6 @@ from datetime import datetime, timedelta
|
|||
from uuid import uuid4
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.postgres.fields import ArrayField
|
||||
from django.db import models
|
||||
|
||||
from club.exceptions import RateLimitException, InvalidCode
|
||||
|
@ -10,26 +9,10 @@ from users.models.user import User
|
|||
from utils.strings import random_string, random_number
|
||||
|
||||
|
||||
class Apps(models.Model):
|
||||
id = models.CharField(max_length=16, primary_key=True)
|
||||
name = models.CharField(max_length=64, unique=True)
|
||||
owner = models.ForeignKey(User, related_name="apps", null=True, on_delete=models.CASCADE)
|
||||
jwt_secret = models.TextField(null=True)
|
||||
jwt_algorithm = models.CharField(max_length=16, default="")
|
||||
jwt_expire_hours = models.IntegerField(default=240)
|
||||
redirect_urls = ArrayField(models.CharField(max_length=256), default=list, null=False)
|
||||
service_token = models.CharField(max_length=128, unique=True, db_index=True, null=True)
|
||||
|
||||
class Meta:
|
||||
db_table = "apps"
|
||||
|
||||
|
||||
class Session(models.Model):
|
||||
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
|
||||
|
||||
user = models.ForeignKey(User, related_name="sessions", db_index=True, on_delete=models.CASCADE)
|
||||
app = models.ForeignKey(Apps, related_name="sessions", null=True, on_delete=models.CASCADE)
|
||||
|
||||
token = models.CharField(max_length=128, unique=True, db_index=True)
|
||||
|
||||
created_at = models.DateTimeField(auto_now_add=True)
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
from datetime import datetime, timedelta
|
||||
from urllib.parse import quote, urlparse, parse_qsl, urlencode
|
||||
|
||||
import jwt
|
||||
from django.shortcuts import render, redirect
|
||||
from django.urls import reverse
|
||||
|
||||
from authn.helpers import authorized_user
|
||||
from authn.models.session import Apps
|
||||
|
||||
|
||||
def external_login(request):
|
||||
goto = request.GET.get("redirect")
|
||||
if not goto:
|
||||
return render(request, "error.html", {"message": "Нужен параметр ?redirect"}, status=400)
|
||||
|
||||
# check if user is logged in or redirect to login screen
|
||||
me = authorized_user(request)
|
||||
if not me:
|
||||
redirect_here_again = quote(reverse("external_login") + f"?redirect={goto}", safe="")
|
||||
return redirect(reverse("login") + f"?goto={redirect_here_again}")
|
||||
|
||||
# we only authorize applications we know with the keys we set for them
|
||||
app_id = request.GET.get("app_id")
|
||||
app = Apps.objects.filter(id=app_id).first()
|
||||
if not app:
|
||||
return render(
|
||||
request, "error.html",
|
||||
{"message": "Неизвестное приложение, проверьте параметр ?app_id"},
|
||||
status=400
|
||||
)
|
||||
|
||||
# check if redirect_url is in the list of allowed urls
|
||||
goto_parsed = urlparse(goto)
|
||||
goto_path_without_params = f"{goto_parsed.scheme}://{goto_parsed.netloc}{goto_parsed.path}"
|
||||
if goto_path_without_params not in app.redirect_urls:
|
||||
return render(
|
||||
request, "error.html",
|
||||
{"message": f"'{goto}' не находится в списке разрешеных redirect_urls для этого приложения"},
|
||||
status=400
|
||||
)
|
||||
|
||||
# TODO: show "authorize" window and ask for user's consent here
|
||||
|
||||
# success! issue a new signed JWT
|
||||
payload = {
|
||||
"user_slug": me.slug,
|
||||
"user_name": me.full_name,
|
||||
"user_email": me.email,
|
||||
"exp": datetime.utcnow() + timedelta(hours=app.jwt_expire_hours),
|
||||
}
|
||||
jwt_token = jwt.encode(payload, app.jwt_secret, algorithm=app.jwt_algorithm)
|
||||
|
||||
# add ?jwt= to redirect_url and activate the redirect
|
||||
goto_params = parse_qsl(goto_parsed.query)
|
||||
goto_params += [("jwt", jwt_token)]
|
||||
return redirect(f"{goto_path_without_params}?{urlencode(goto_params)}")
|
|
@ -1,7 +1,6 @@
|
|||
import unittest
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import django
|
||||
from django.test import TestCase
|
||||
|
@ -9,13 +8,12 @@ from django.urls import reverse
|
|||
from django.http.response import HttpResponseNotAllowed, HttpResponseBadRequest
|
||||
from django_q import brokers
|
||||
from django_q.signing import SignedPackage
|
||||
import jwt
|
||||
from unittest import skip
|
||||
from unittest.mock import patch
|
||||
|
||||
django.setup() # todo: how to run tests from PyCharm without this workaround?
|
||||
|
||||
from authn.models.session import Apps, Code
|
||||
from authn.models.session import Code
|
||||
from authn.providers.common import Membership, Platform
|
||||
from authn.exceptions import PatreonException
|
||||
from club import features
|
||||
|
@ -255,96 +253,6 @@ class ViewEmailLoginCodeTests(TestCase):
|
|||
self.assertFalse(self.client.is_authorised())
|
||||
self.assertFalse(User.objects.get(id=self.new_user.id).is_email_verified)
|
||||
|
||||
|
||||
class ViewExternalLoginTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Set up data for the whole TestCase
|
||||
cls.new_user: User = User.objects.create(
|
||||
email="testemail@xx.com",
|
||||
membership_started_at=datetime.now() - timedelta(days=5),
|
||||
membership_expires_at=datetime.now() + timedelta(days=5),
|
||||
slug="ujlbu4"
|
||||
)
|
||||
|
||||
cls.app: Apps = Apps.objects.create(
|
||||
id="test",
|
||||
name="test",
|
||||
jwt_secret=JWT_STUB_VALUES.JWT_PRIVATE_KEY,
|
||||
jwt_algorithm="RS256",
|
||||
jwt_expire_hours=1,
|
||||
redirect_urls=["https://some-page"],
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
self.client = HelperClient()
|
||||
|
||||
def test_successful_flat_redirect(self):
|
||||
# given
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
self.client.authorise()
|
||||
|
||||
# when
|
||||
response = self.client.get(
|
||||
reverse("external_login"),
|
||||
data={
|
||||
"redirect": "https://some-page",
|
||||
"app_id": "test"
|
||||
}
|
||||
)
|
||||
|
||||
# then
|
||||
self.assertRegex(text=urljoin(response.request["PATH_INFO"], response.url),
|
||||
expected_regex="https://some-page\?jwt=.*")
|
||||
|
||||
# check jwt
|
||||
url_params = response.url.split("?")[1]
|
||||
jwt_str = url_params.split("=")[1]
|
||||
payload = jwt.decode(jwt_str, algorithms=["RS256"], options={"verify_signature": False})
|
||||
self.assertIsNotNone(payload)
|
||||
self.assertEqual(payload["user_slug"], self.new_user.slug)
|
||||
self.assertEqual(payload["user_name"], self.new_user.full_name)
|
||||
self.assertIsNotNone(payload["exp"])
|
||||
|
||||
def test_successful_redirect_with_query_params(self):
|
||||
# given
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
self.client.authorise()
|
||||
|
||||
# when
|
||||
response = self.client.get(
|
||||
reverse("external_login"),
|
||||
data={
|
||||
"redirect": "https://some-page?param1=value1",
|
||||
"app_id": "test"
|
||||
}
|
||||
)
|
||||
|
||||
# then
|
||||
self.assertRegex(text=urljoin(response.request["PATH_INFO"], response.url),
|
||||
expected_regex="https://some-page\?param1=value1&jwt=.*")
|
||||
|
||||
def test_param_wrong_app_id(self):
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
self.client.authorise()
|
||||
response = self.client.get(reverse("external_login"), data={"app_id": "UNKNOWN", "redirect": "https://some-page"})
|
||||
self.assertContains(response=response, text="Неизвестное приложение, проверьте параметр ?app_id", status_code=400)
|
||||
|
||||
def test_param_redirect_absent(self):
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
self.client.authorise()
|
||||
response = self.client.get(reverse("external_login"), data={"app_id": "test"})
|
||||
self.assertContains(response=response, text="Нужен параметр ?redirect", status_code=400)
|
||||
|
||||
def test_user_is_unauthorised(self):
|
||||
response = self.client.get(reverse("external_login"), data={"redirect": "some-page", "app_id": "test"})
|
||||
self.assertRedirects(response=response,
|
||||
expected_url="/auth/login/?goto=%2Fauth%2Fexternal%2F%3Fredirect%3Dsome-page",
|
||||
fetch_redirect_response=False)
|
||||
|
||||
self.assertFalse(self.client.is_authorised())
|
||||
|
||||
|
||||
@unittest.skipIf(not features.PATREON_AUTH_ENABLED, reason="Patreon auth was disabled")
|
||||
class ViewPatreonLoginTests(TestCase):
|
||||
@classmethod
|
||||
|
|
|
@ -9,7 +9,6 @@ from authn.views.apps import list_apps, create_app, edit_app, delete_app
|
|||
from authn.views.auth import login, logout, join
|
||||
from authn.views.debug import debug_dev_login, debug_random_login, debug_login
|
||||
from authn.views.email import email_login, email_login_code
|
||||
from authn.views.external import external_login
|
||||
from authn.views.openid import openid_authorize, openid_issue_token, openid_revoke_token, \
|
||||
openid_well_known_configuration, openid_well_known_jwks
|
||||
from authn.views.patreon import patreon_sync, patreon_sync_callback
|
||||
|
@ -78,7 +77,6 @@ urlpatterns = [
|
|||
path("auth/patreon_callback/", patreon_sync_callback, name="patreon_sync_callback"),
|
||||
path("auth/email/", email_login, name="email_login"),
|
||||
path("auth/email/code/", email_login_code, name="email_login_code"),
|
||||
path("auth/external/", external_login, name="external_login"),
|
||||
|
||||
path("auth/openid/authorize", openid_authorize, name="openid_authorize"),
|
||||
path("auth/openid/token", openid_issue_token, name="openid_issue_token"),
|
||||
|
|
Loading…
Reference in New Issue