[tests] Add auth tests (#371)
* refactor(auth code): make inclusive comparison for ratelimit check * test(auth code): add tests for issue auth codes * ci: run tests in github actions * test(auth code): add check_code tests * chore(auth code): add data types hints * test(auth.views.auth): add join tests * ci(tests): run builed frontend (views tests required it) * docs: add how to run tests section * test(auth.views.auth): add login/logout views tests with addition test utils and debug methods * test(auth.views.auth): add tests for debug login (dev and random) * ci(tests): fix ebabling debug settings for tests * ci(fix): up redis for tests * test(debug.utils): add test for test-utils * docs(tests): update postgres section * test: initiate login-email draft tests * test: draft cluser_q check tasks * test: extends email_login cases * test: add email_login_code view tests * style: remove debug code * test: add external_login tests * test: add patreon_login view test * tests(auth-patreon): implement patreon auth tests * test(auth-patreon): add view patreon auth tests * test(auth-patreon): add parse_active_membership unit tests * test(auth): divide tests by modules * docs(tests): update readme * style(test): remove unused code * docs(tests): replace TBD stubs with real refs
This commit is contained in:
parent
9ee72a7ab9
commit
f7e5250af6
|
@ -0,0 +1,11 @@
|
|||
.git/
|
||||
node_modules/
|
||||
|
||||
.editorconfig
|
||||
.gitignore
|
||||
|
||||
CHANGELOG.md
|
||||
README.md
|
||||
|
||||
*.tmp
|
||||
*.log
|
|
@ -5,22 +5,45 @@ on: [pull_request]
|
|||
jobs:
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@master
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '3.8'
|
||||
architecture: 'x64'
|
||||
- name: Install requirements
|
||||
run: |
|
||||
pip install --no-cache-dir flake8
|
||||
- name: run flake8
|
||||
run: |
|
||||
# stop the build if there are Python syntax errors or undefined names
|
||||
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
|
||||
# exit-zero treats all errors as warnings.
|
||||
flake8 . --count --exit-zero --statistics
|
||||
- uses: actions/checkout@master
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '3.8'
|
||||
architecture: 'x64'
|
||||
- name: Install requirements
|
||||
run: |
|
||||
pip install --no-cache-dir flake8
|
||||
- name: run flake8
|
||||
run: |
|
||||
# stop the build if there are Python syntax errors or undefined names
|
||||
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
|
||||
# exit-zero treats all errors as warnings.
|
||||
flake8 . --count --exit-zero --statistics
|
||||
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@master
|
||||
- name: Build image
|
||||
run: |
|
||||
docker-compose -f docker-compose.test.yml build
|
||||
- name: Run postgres
|
||||
run: |
|
||||
docker-compose -f docker-compose.test.yml up -d postgres
|
||||
- name: Run redis
|
||||
run: |
|
||||
docker-compose -f docker-compose.test.yml up -d redis
|
||||
- name: Run frontend
|
||||
run: |
|
||||
docker-compose -f docker-compose.test.yml up -d webpack
|
||||
- name: Wait postgres
|
||||
uses: jakejarvis/wait-action@master
|
||||
with:
|
||||
time: '20s'
|
||||
- name: Run tests
|
||||
run: |
|
||||
docker-compose -f docker-compose.test.yml run --rm tests
|
||||
|
||||
|
||||
dockerize:
|
||||
|
|
|
@ -10,10 +10,10 @@ jobs:
|
|||
name: Deploy
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Pull new code and restart server
|
||||
uses: appleboy/ssh-action@master
|
||||
with:
|
||||
host: ${{ secrets.PRODUCTION_SSH_HOST }}
|
||||
username: ${{ secrets.PRODUCTION_SSH_USERNAME }}
|
||||
key: ${{ secrets.PRODUCTION_SSH_KEY }}
|
||||
script: cd vas3k.club && git pull && make redeploy
|
||||
- name: Pull new code and restart server
|
||||
uses: appleboy/ssh-action@master
|
||||
with:
|
||||
host: ${{ secrets.PRODUCTION_SSH_HOST }}
|
||||
username: ${{ secrets.PRODUCTION_SSH_USERNAME }}
|
||||
key: ${{ secrets.PRODUCTION_SSH_KEY }}
|
||||
script: cd vas3k.club && git pull && make redeploy
|
||||
|
|
6
Makefile
6
Makefile
|
@ -50,7 +50,11 @@ docker-migrate:
|
|||
build-frontend: ## Runs webpack
|
||||
npm run --prefix frontend build
|
||||
|
||||
test-ci: lint ## Run tests (intended for CI usage)
|
||||
test:
|
||||
pipenv run python3 manage.py test
|
||||
|
||||
test-ci: ## Run tests (intended for CI usage)
|
||||
python3 manage.py test
|
||||
|
||||
psql:
|
||||
psql -h localhost -p 5433 -d vas3k_club -U vas3k
|
||||
|
|
73
README.md
73
README.md
|
@ -136,6 +136,79 @@ $ docker-compose -f docker-compose.yml up redis
|
|||
```
|
||||
|
||||
|
||||
#### Tests
|
||||
|
||||
##### Prerequisites
|
||||
Consider next required conditions for running tests :
|
||||
- **venv**
|
||||
|
||||
Don't forget to run it under configured venv. Look [setup venv](#setup-venv) how to configure venv
|
||||
- **postgres**
|
||||
|
||||
Due to tests make database queries the local postgres should be running.
|
||||
|
||||
Run postgres:
|
||||
```sh
|
||||
$ docker-compose -f docker-compose.yml up -d postgres
|
||||
```
|
||||
For first time run migrations (it needs only for fresh images)
|
||||
```sh
|
||||
(venv) $ ./manage.py migrate
|
||||
```
|
||||
- **redis**
|
||||
Run redis:
|
||||
```sh
|
||||
$ docker-compose -f docker-compose.yml up -d redis
|
||||
```
|
||||
- build **frontend**
|
||||
|
||||
For [views tests](https://docs.djangoproject.com/en/3.1/intro/tutorial05/#a-test-for-a-view) its essential to build our frontend upfront.
|
||||
Hot to build front look [setup-frontend](#setup-frontend) section, for now just run next commands:
|
||||
```sh
|
||||
$ cd frontend
|
||||
$ npm ci # or npm install
|
||||
$ npm run build
|
||||
```
|
||||
Above commands will create [required `webpack-stats.json`](https://github.com/vas3k/vas3k.club/blob/6f1812f36b546feba2bd729ac84011e20e237136/club/settings.py#L228) file
|
||||
- test environment variables
|
||||
```dotenv
|
||||
DJANGO_SETTINGS_MODULE=club.settings;
|
||||
PYTHONUNBUFFERED=1;
|
||||
TESTS_RUN=da
|
||||
POSTGRES_DB=vas3k_club
|
||||
POSTGRES_USER=postgres
|
||||
POSTGRES_PASSWORD=postgres
|
||||
POSTGRES_HOST=localhost
|
||||
REDIS_DB=0
|
||||
REDIS_HOST=localhost
|
||||
```
|
||||
##### Run tests
|
||||
Basically tests automatically runs in CI in opened PR, but if you want to run tests **locally** there are few ways to do it
|
||||
1. virgin shell
|
||||
```sh
|
||||
$ make test
|
||||
```
|
||||
2. venv shell
|
||||
```sh
|
||||
$ source {your-venv-folder}/bin/activate
|
||||
(venv) $ ./manage.py test
|
||||
```
|
||||
(^*don't forget inject test env variables*)
|
||||
3. pycharm *profession edition*
|
||||
Use `django tests` template out of the box
|
||||
4. pycharm *common edition*
|
||||
- Make sure you have set `Unittest` as default test runner: Settings --> Tools --> Python Integrated Tools --> Default Test Runner: Unittests
|
||||
![Default Test Runner](_docs/images/pycharm-ce.settings.default-test-runner.png)
|
||||
- In Run/Debug Configuration put environment variables from [prerequisites](#Prerequisites)
|
||||
![Test template](_docs/images/pycharm-ce.debug-run-configurations.template.png)
|
||||
- For workaround *"django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet."* add this lines to test file before importing models:
|
||||
```python
|
||||
import django
|
||||
django.setup()
|
||||
```
|
||||
|
||||
For more information about testing in django look well written [official documentation](https://docs.djangoproject.com/en/3.1/topics/testing/overview/)
|
||||
|
||||
## 🛠 Tech stack
|
||||
|
||||
👨💻 **TL;DR: Django, Postgres, Redis, Vue.js, Webpack**
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 54 KiB |
Binary file not shown.
After Width: | Height: | Size: 11 KiB |
|
@ -64,13 +64,13 @@ class Code(models.Model):
|
|||
ordering = ["-created_at"]
|
||||
|
||||
@classmethod
|
||||
def create_for_user(cls, user, recipient, length=6):
|
||||
def create_for_user(cls, user: User, recipient: str, length=6):
|
||||
recipient = recipient.lower()
|
||||
last_codes_count = Code.objects.filter(
|
||||
recipient=recipient,
|
||||
created_at__gte=datetime.utcnow() - settings.AUTH_MAX_CODE_TIMEDELTA,
|
||||
).count()
|
||||
if last_codes_count > settings.AUTH_MAX_CODE_COUNT:
|
||||
if last_codes_count >= settings.AUTH_MAX_CODE_COUNT:
|
||||
raise RateLimitException(title="Вы запросили слишком много кодов", message="Подождите немного")
|
||||
|
||||
return Code.objects.create(
|
||||
|
@ -82,7 +82,7 @@ class Code(models.Model):
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def check_code(cls, recipient, code):
|
||||
def check_code(cls, recipient: str, code: str) -> User:
|
||||
recipient = recipient.lower()
|
||||
last_code = Code.objects.filter(recipient=recipient).order_by("-created_at").first()
|
||||
if not last_code:
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
import django
|
||||
from django.test import SimpleTestCase
|
||||
|
||||
django.setup() # todo: how to run tests from PyCharm without this workaround?
|
||||
|
||||
from auth.providers.common import Membership
|
||||
from auth.providers.patreon import parse_active_membership
|
||||
|
||||
|
||||
class UnitTestsParseActiveMembership(SimpleTestCase):
|
||||
def setUp(self):
|
||||
self.stub_patreon_response_oauth_identity = {
|
||||
"data": {
|
||||
"attributes": {
|
||||
"about": "A Patreon Platform User",
|
||||
"created": "2018-04-01T00:36:26+00:00",
|
||||
"email": "user-email@email.com",
|
||||
"first_name": "Firstname",
|
||||
"full_name": "FullName With Space",
|
||||
"image_url": "https://url.example",
|
||||
"last_name": "Lastname",
|
||||
"social_connections": {
|
||||
"deviantart": None,
|
||||
"discord": None,
|
||||
"facebook": None,
|
||||
"reddit": None,
|
||||
"spotify": None,
|
||||
"twitch": None,
|
||||
"twitter": {
|
||||
"user_id": "12345"
|
||||
},
|
||||
"youtube": None
|
||||
},
|
||||
"thumb_url": "https://url.example",
|
||||
"url": "https://www.patreon.com/example",
|
||||
"vanity": "platform"
|
||||
},
|
||||
"id": "12345689",
|
||||
"type": "user"
|
||||
},
|
||||
"included": [
|
||||
{
|
||||
"attributes": {
|
||||
"full_name": "Member FullName",
|
||||
"email": "member-email@email.com",
|
||||
"is_follower": False,
|
||||
"last_charge_date": "2018-04-01T21:28:06+00:00",
|
||||
"last_charge_status": "Paid",
|
||||
"lifetime_support_cents": 400,
|
||||
"patron_status": "active_patron",
|
||||
"currently_entitled_amount_cents": 100,
|
||||
"pledge_relationship_start": "2018-04-01T16:33:27.861405+00:00",
|
||||
"will_pay_amount_cents": 100
|
||||
},
|
||||
"id": "03ca69c3-ebea-4b9a-8fac-e4a837873254",
|
||||
"type": "member"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
def test_successful_parsed(self):
|
||||
result: Membership = parse_active_membership(self.stub_patreon_response_oauth_identity)
|
||||
|
||||
self.assertIsNotNone(result)
|
||||
self.assertTrue(isinstance(result, Membership))
|
||||
|
||||
self.assertEqual(result.platform, "patreon")
|
||||
self.assertEqual(result.user_id, "12345689")
|
||||
self.assertEqual(result.full_name, "FullName With Space")
|
||||
self.assertEqual(result.email, "user-email@email.com")
|
||||
self.assertEqual(result.image, None)
|
||||
self.assertEqual(result.started_at, datetime(2018, 4, 1, 0, 0))
|
||||
self.assertEqual(result.expires_at, datetime(2018, 5, 16, 0, 0))
|
||||
self.assertEqual(result.lifetime_support_cents, 400)
|
||||
self.assertEqual(result.currently_entitled_amount_cents, 100)
|
||||
|
||||
def test_successful_god_id(self):
|
||||
with self.settings(PATREON_GOD_IDS=['12345689']):
|
||||
result: Membership = parse_active_membership(self.stub_patreon_response_oauth_identity)
|
||||
|
||||
self.assertIsNotNone(result)
|
||||
self.assertTrue(isinstance(result, Membership))
|
||||
|
||||
self.assertEqual(result.platform, "patreon")
|
||||
self.assertEqual(result.user_id, "12345689")
|
||||
self.assertEqual(result.full_name, "FullName With Space")
|
||||
self.assertEqual(result.email, "user-email@email.com")
|
||||
self.assertEqual(result.image, 'https://url.example')
|
||||
|
||||
self.assertGreaterEqual(result.started_at, datetime.utcnow() - timedelta(seconds=5))
|
||||
self.assertLessEqual(result.started_at, datetime.utcnow() + timedelta(seconds=5))
|
||||
|
||||
self.assertGreaterEqual(result.expires_at,
|
||||
datetime.utcnow() + timedelta(days=100 * 365) - timedelta(seconds=5))
|
||||
self.assertLessEqual(result.expires_at,
|
||||
datetime.utcnow() + timedelta(days=100 * 365) + timedelta(seconds=5))
|
||||
|
||||
self.assertEqual(result.lifetime_support_cents, -1)
|
||||
self.assertEqual(result.currently_entitled_amount_cents, 0)
|
||||
|
||||
def test_wrong_data(self):
|
||||
result = parse_active_membership({})
|
||||
self.assertIsNone(result)
|
||||
|
||||
result = parse_active_membership({"data": {}}) # no included
|
||||
self.assertIsNone(result)
|
||||
|
||||
result = parse_active_membership({"included": {}}) # no data
|
||||
self.assertIsNone(result)
|
|
@ -0,0 +1,125 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
import django
|
||||
from django.conf import settings
|
||||
from django.test import TestCase
|
||||
|
||||
django.setup() # todo: how to run tests from PyCharm without this workaround?
|
||||
|
||||
from auth.models import Code
|
||||
from club.exceptions import RateLimitException, InvalidCode
|
||||
from users.models.user import User
|
||||
|
||||
|
||||
class ModelCodeTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Set up data for the whole TestCase
|
||||
cls.new_user: User = User.objects.create(
|
||||
email="testemail@xx.com",
|
||||
membership_started_at=datetime.now() - timedelta(days=5),
|
||||
membership_expires_at=datetime.now() + timedelta(days=5),
|
||||
)
|
||||
|
||||
def test_create_code_positive(self):
|
||||
recipient = "success@a.com"
|
||||
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
self.assertEqual(code.recipient, recipient)
|
||||
self.assertEqual(self.new_user.id, code.user_id)
|
||||
self.assertEqual(len(code.code), settings.AUTH_CODE_LENGTH)
|
||||
self.assertAlmostEqual(code.expires_at.second, (datetime.utcnow() + timedelta(minutes=15)).second, delta=5)
|
||||
|
||||
def test_create_code_ratelimit(self):
|
||||
recipient = "ratelimit@a.com"
|
||||
|
||||
# override the AUTH_MAX_CODE_TIMEDELTA setting
|
||||
with self.settings(AUTH_MAX_CODE_COUNT=1):
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
self.assertEqual(len(code.code), settings.AUTH_CODE_LENGTH)
|
||||
|
||||
# second attempt should rise exception
|
||||
with self.assertRaises(RateLimitException):
|
||||
Code.create_for_user(user=self.new_user, recipient=recipient)
|
||||
|
||||
def test_create_code_reset_ratelimit(self):
|
||||
recipient = "ratelimit@a.com"
|
||||
|
||||
with self.settings(AUTH_MAX_CODE_COUNT=1):
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
self.assertEqual(len(code.code), settings.AUTH_CODE_LENGTH)
|
||||
|
||||
# move creation time to deep enough past
|
||||
code.created_at = datetime.utcnow() - settings.AUTH_MAX_CODE_TIMEDELTA - timedelta(seconds=1)
|
||||
code.save()
|
||||
|
||||
# no exception raises
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient)
|
||||
self.assertEqual(len(code.code), settings.AUTH_CODE_LENGTH)
|
||||
|
||||
def test_check_code_positive(self):
|
||||
recipient = "success@a.com"
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
|
||||
user = Code.check_code(recipient=recipient, code=code.code)
|
||||
|
||||
self.assertEqual(user.id, self.new_user.id)
|
||||
|
||||
def test_check_code_which_is_incorrect(self):
|
||||
with self.assertRaises(InvalidCode):
|
||||
Code.check_code(recipient="failed@xxx.com", code="failed")
|
||||
|
||||
def test_check_code_twice(self):
|
||||
recipient = "success@a.com"
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
Code.check_code(recipient=recipient, code=code.code) # activate first time
|
||||
|
||||
with self.assertRaises(InvalidCode):
|
||||
Code.check_code(recipient=recipient, code=code.code)
|
||||
|
||||
def test_check_code_which_is_not_last_one(self):
|
||||
# issue few codes
|
||||
recipient = "fewcodes@a.com"
|
||||
code1: Code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
code2: Code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
# for stability test runs
|
||||
code2.created_at -= timedelta(seconds=1)
|
||||
code2.save()
|
||||
|
||||
with self.assertRaises(InvalidCode):
|
||||
Code.check_code(recipient=recipient, code=code2.code)
|
||||
|
||||
# first one is successful
|
||||
user = Code.check_code(recipient=recipient, code=code1.code)
|
||||
self.assertEqual(user.id, self.new_user.id)
|
||||
|
||||
def test_check_code_which_is_for_other_user(self):
|
||||
recipient_right = "true-user@a.com"
|
||||
recipient_wrong = "wrong-user@x.com"
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient_right, length=settings.AUTH_CODE_LENGTH)
|
||||
|
||||
with self.assertRaises(InvalidCode):
|
||||
Code.check_code(recipient=recipient_wrong, code=code.code)
|
||||
|
||||
def test_check_code_when_exceeded_attempts_count(self):
|
||||
recipient = "exceeded_attemts@a.com"
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
|
||||
# override the AUTH_MAX_CODE_TIMEDELTA setting
|
||||
with self.settings(AUTH_MAX_CODE_ATTEMPTS=1):
|
||||
# first attempt
|
||||
with self.assertRaises(InvalidCode):
|
||||
Code.check_code(recipient=recipient, code="wrong_attempt")
|
||||
|
||||
# second attempt should rise ratelimit exception
|
||||
with self.assertRaises(RateLimitException):
|
||||
Code.check_code(recipient=recipient, code=code.code)
|
||||
|
||||
def test_check_code_which_is_expired(self):
|
||||
recipient = "expired@a.com"
|
||||
code = Code.create_for_user(user=self.new_user, recipient=recipient, length=settings.AUTH_CODE_LENGTH)
|
||||
code.expires_at = datetime.utcnow() - timedelta(seconds=1)
|
||||
code.save()
|
||||
|
||||
with self.assertRaises(InvalidCode):
|
||||
Code.check_code(recipient=recipient, code=code.code)
|
|
@ -42,7 +42,7 @@ def logout(request):
|
|||
|
||||
|
||||
def debug_dev_login(request):
|
||||
if not settings.DEBUG:
|
||||
if not (settings.DEBUG or settings.TESTS_RUN):
|
||||
raise AccessDenied(title="Эта фича доступна только при DEBUG=true")
|
||||
|
||||
user, is_created = User.objects.get_or_create(
|
||||
|
@ -74,7 +74,7 @@ def debug_dev_login(request):
|
|||
|
||||
|
||||
def debug_random_login(request):
|
||||
if not settings.DEBUG:
|
||||
if not (settings.DEBUG or settings.TESTS_RUN):
|
||||
raise AccessDenied(title="Эта фича доступна только при DEBUG=true")
|
||||
|
||||
slug = "random_" + random_string()
|
||||
|
|
|
@ -0,0 +1,490 @@
|
|||
from datetime import datetime, timedelta
|
||||
import json
|
||||
from urllib.parse import urljoin
|
||||
import uuid
|
||||
|
||||
import django
|
||||
from django.test import Client, TestCase
|
||||
from django.urls import reverse
|
||||
from django.http.response import HttpResponseNotAllowed, HttpResponseBadRequest
|
||||
from django_q import brokers
|
||||
from django_q.signing import SignedPackage
|
||||
import jwt
|
||||
from unittest import skip
|
||||
from unittest.mock import patch
|
||||
|
||||
django.setup() # todo: how to run tests from PyCharm without this workaround?
|
||||
|
||||
from auth.models import Code, Session
|
||||
from auth.providers.common import Membership, Platform
|
||||
from auth.exceptions import PatreonException
|
||||
from users.models.user import User
|
||||
|
||||
|
||||
class HelperClient(Client):
|
||||
|
||||
def __init__(self, user=None):
|
||||
super(HelperClient, self).__init__()
|
||||
self.user = user
|
||||
|
||||
def authorise(self):
|
||||
if not self.user:
|
||||
raise ValueError('Missed `user` property to use this method')
|
||||
|
||||
session = Session.create_for_user(self.user)
|
||||
self.cookies["token"] = session.token
|
||||
self.cookies["token"]["expires"] = datetime.utcnow() + timedelta(days=30)
|
||||
self.cookies["token"]['httponly'] = True
|
||||
self.cookies["token"]['secure'] = True
|
||||
|
||||
return self
|
||||
|
||||
@staticmethod
|
||||
def is_response_contain(response, text):
|
||||
content = response.content
|
||||
if not isinstance(text, bytes):
|
||||
text = str(text)
|
||||
content = content.decode(response.charset)
|
||||
|
||||
real_count = content.count(text)
|
||||
return real_count != 0
|
||||
|
||||
def is_authorised(self) -> bool:
|
||||
response = self.get(reverse('debug_api_me'))
|
||||
content = response.content.decode(response.charset)
|
||||
content_dict = json.loads(content)
|
||||
|
||||
return content_dict["is_authorised"]
|
||||
|
||||
def print_me(self):
|
||||
response = self.get(reverse('debug_api_me'))
|
||||
content = response.content.decode(response.charset)
|
||||
content_dict = json.loads(content)
|
||||
|
||||
return content_dict
|
||||
|
||||
@staticmethod
|
||||
def is_access_denied(response):
|
||||
return HelperClient.is_response_contain(response, text="Эта страница доступна только участникам Клуба")
|
||||
|
||||
|
||||
class ViewsAuthTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Set up data for the whole TestCase
|
||||
cls.new_user: User = User.objects.create(
|
||||
email="testemail@xx.com",
|
||||
membership_started_at=datetime.now() - timedelta(days=5),
|
||||
membership_expires_at=datetime.now() + timedelta(days=5),
|
||||
slug="ujlbu4"
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
|
||||
def test_join_anonymous(self):
|
||||
response = self.client.get(reverse('join'))
|
||||
# check auth/join.html is rendered
|
||||
self.assertContains(response=response, text="Всегда рады новым членам", status_code=200)
|
||||
|
||||
def test_join_authorised(self):
|
||||
self.client.authorise()
|
||||
|
||||
response = self.client.get(reverse('join'))
|
||||
self.assertRedirects(response=response, expected_url=f'/user/{self.new_user.slug}/',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
def test_login_anonymous(self):
|
||||
response = self.client.get(reverse('login'))
|
||||
# check auth/join.html is rendered
|
||||
self.assertContains(response=response, text="Вход по почте или нику", status_code=200)
|
||||
|
||||
def test_login_authorised(self):
|
||||
self.client.authorise()
|
||||
|
||||
response = self.client.get(reverse('login'))
|
||||
self.assertRedirects(response=response, expected_url=f'/user/{self.new_user.slug}/',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
def test_logout_success(self):
|
||||
self.client.authorise()
|
||||
|
||||
response = self.client.post(reverse('logout'))
|
||||
|
||||
self.assertRedirects(response=response, expected_url=f'/', fetch_redirect_response=False)
|
||||
self.assertFalse(self.client.is_authorised())
|
||||
|
||||
def test_logout_unauthorised(self):
|
||||
response = self.client.post(reverse('logout'))
|
||||
self.assertTrue(self.client.is_access_denied(response))
|
||||
|
||||
def test_logout_wrong_method(self):
|
||||
self.client.authorise()
|
||||
|
||||
response = self.client.get(reverse('logout'))
|
||||
self.assertEqual(response.status_code, HttpResponseNotAllowed.status_code)
|
||||
|
||||
response = self.client.put(reverse('logout'))
|
||||
self.assertEqual(response.status_code, HttpResponseNotAllowed.status_code)
|
||||
|
||||
response = self.client.delete(reverse('logout'))
|
||||
self.assertEqual(response.status_code, HttpResponseNotAllowed.status_code)
|
||||
|
||||
def test_debug_dev_login_unauthorised(self):
|
||||
response = self.client.post(reverse('debug_dev_login'))
|
||||
self.assertTrue(self.client.is_authorised())
|
||||
|
||||
me = self.client.print_me()
|
||||
self.assertIsNotNone(me['id'])
|
||||
self.assertEqual(me['email'], 'dev@dev.dev')
|
||||
self.assertTrue(me['is_email_verified'])
|
||||
self.assertTrue(me['slug'], 'dev')
|
||||
self.assertEqual(me['moderation_status'], 'approved')
|
||||
self.assertEqual(me['roles'], ['god'])
|
||||
# todo: check created post (intro)
|
||||
|
||||
def test_debug_dev_login_authorised(self):
|
||||
self.client.authorise()
|
||||
|
||||
response = self.client.post(reverse('debug_dev_login'))
|
||||
self.assertTrue(self.client.is_authorised())
|
||||
|
||||
me = self.client.print_me()
|
||||
self.assertTrue(me['slug'], self.new_user.slug)
|
||||
|
||||
def test_debug_random_login_unauthorised(self):
|
||||
response = self.client.post(reverse('debug_random_login'))
|
||||
self.assertTrue(self.client.is_authorised())
|
||||
|
||||
me = self.client.print_me()
|
||||
self.assertIsNotNone(me['id'])
|
||||
self.assertIn('@random.dev', me['email'])
|
||||
self.assertTrue(me['is_email_verified'])
|
||||
self.assertEqual(me['moderation_status'], 'approved')
|
||||
self.assertEqual(me['roles'], [])
|
||||
# todo: check created post (intro)
|
||||
|
||||
|
||||
class ViewEmailLoginTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Set up data for the whole TestCase
|
||||
cls.new_user: User = User.objects.create(
|
||||
email="testemail@xx.com",
|
||||
membership_started_at=datetime.now() - timedelta(days=5),
|
||||
membership_expires_at=datetime.now() + timedelta(days=5),
|
||||
slug="ujlbu4"
|
||||
)
|
||||
|
||||
cls.broker = brokers.get_broker()
|
||||
cls.assertTrue(cls.broker.ping(), 'broker is not available')
|
||||
|
||||
def setUp(self):
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
|
||||
self.broker.purge_queue()
|
||||
|
||||
def test_login_by_email_positive(self):
|
||||
# when
|
||||
response = self.client.post(reverse('email_login'),
|
||||
data={'email_or_login': self.new_user.email, })
|
||||
|
||||
# then
|
||||
self.assertContains(response=response, text="Вам отправлен код!", status_code=200)
|
||||
issued_code = Code.objects.filter(recipient=self.new_user.email).get()
|
||||
self.assertIsNotNone(issued_code)
|
||||
|
||||
# check email was sent
|
||||
packages = self.broker.dequeue()
|
||||
task_signed = packages[0][1]
|
||||
task = SignedPackage.loads(task_signed)
|
||||
self.assertEqual(task['func'].__name__, 'send_auth_email')
|
||||
self.assertEqual(task['args'][0].id, self.new_user.id)
|
||||
self.assertEqual(task['args'][1].id, issued_code.id)
|
||||
|
||||
# check notify wast sent
|
||||
packages = self.broker.dequeue()
|
||||
task_signed = packages[0][1]
|
||||
task = SignedPackage.loads(task_signed)
|
||||
self.assertEqual(task['func'].__name__, 'notify_user_auth')
|
||||
self.assertEqual(task['args'][0].id, self.new_user.id)
|
||||
self.assertEqual(task['args'][1].id, issued_code.id)
|
||||
|
||||
# it's not yet authorised, only code was sent
|
||||
self.assertFalse(self.client.is_authorised())
|
||||
|
||||
def test_login_user_not_exist(self):
|
||||
response = self.client.post(reverse('email_login'),
|
||||
data={'email_or_login': 'not-existed@user.com', })
|
||||
self.assertContains(response=response, text="Такого юзера нет 🤔", status_code=200)
|
||||
|
||||
def test_secret_hash_login(self):
|
||||
response = self.client.post(reverse('email_login'),
|
||||
data={'email_or_login': self.new_user.secret_auth_code, })
|
||||
|
||||
self.assertRedirects(response=response, expected_url=f'/user/{self.new_user.slug}/',
|
||||
fetch_redirect_response=False)
|
||||
self.assertTrue(self.client.is_authorised())
|
||||
|
||||
def test_secret_hash_user_not_exist(self):
|
||||
response = self.client.post(reverse('email_login'),
|
||||
data={'email_or_login': 'not-existed@user.com|-xxx', })
|
||||
self.assertContains(response=response, text="Такого юзера нет 🤔", status_code=200)
|
||||
|
||||
@skip("todo")
|
||||
def test_secret_hash_cancel_user_deletion(self):
|
||||
# todo: mark user as deleted
|
||||
self.assertTrue(False)
|
||||
|
||||
def test_email_login_missed_input_data(self):
|
||||
response = self.client.post(reverse('email_login'), data={})
|
||||
self.assertRedirects(response=response, expected_url=f'/auth/login/',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
def test_email_login_wrong_method(self):
|
||||
response = self.client.get(reverse('email_login'))
|
||||
self.assertRedirects(response=response, expected_url=f'/auth/login/',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
response = self.client.put(reverse('email_login'))
|
||||
self.assertRedirects(response=response, expected_url=f'/auth/login/',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
response = self.client.delete(reverse('email_login'))
|
||||
self.assertRedirects(response=response, expected_url=f'/auth/login/',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
|
||||
class ViewEmailLoginCodeTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Set up data for the whole TestCase
|
||||
cls.new_user: User = User.objects.create(
|
||||
email="testemail@xx.com",
|
||||
membership_started_at=datetime.now() - timedelta(days=5),
|
||||
membership_expires_at=datetime.now() + timedelta(days=5),
|
||||
slug="ujlbu4"
|
||||
)
|
||||
cls.code = Code.create_for_user(user=cls.new_user, recipient=cls.new_user.email)
|
||||
|
||||
def setUp(self):
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
|
||||
def test_correct_code(self):
|
||||
# given
|
||||
# email is not verified yet
|
||||
self.assertFalse(User.objects.get(id=self.new_user.id).is_email_verified)
|
||||
|
||||
# when
|
||||
response = self.client.get(reverse('email_login_code'),
|
||||
data={'email': self.new_user.email, 'code': self.code.code})
|
||||
|
||||
self.assertRedirects(response=response, expected_url=f'/user/{self.new_user.slug}/',
|
||||
fetch_redirect_response=False)
|
||||
self.assertTrue(self.client.is_authorised())
|
||||
self.assertTrue(User.objects.get(id=self.new_user.id).is_email_verified)
|
||||
|
||||
def test_empty_params(self):
|
||||
response = self.client.get(reverse('email_login_code'), data={})
|
||||
self.assertRedirects(response=response, expected_url=f'/auth/login/',
|
||||
fetch_redirect_response=False)
|
||||
self.assertFalse(self.client.is_authorised())
|
||||
self.assertFalse(User.objects.get(id=self.new_user.id).is_email_verified)
|
||||
|
||||
def test_wrong_code(self):
|
||||
response = self.client.get(reverse('email_login_code'),
|
||||
data={'email': self.new_user.email, 'code': 'intentionally-wrong-code'})
|
||||
|
||||
self.assertEqual(response.status_code, HttpResponseBadRequest.status_code)
|
||||
self.assertFalse(self.client.is_authorised())
|
||||
self.assertFalse(User.objects.get(id=self.new_user.id).is_email_verified)
|
||||
|
||||
|
||||
class ViewExternalLoginTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Set up data for the whole TestCase
|
||||
cls.new_user: User = User.objects.create(
|
||||
email="testemail@xx.com",
|
||||
membership_started_at=datetime.now() - timedelta(days=5),
|
||||
membership_expires_at=datetime.now() + timedelta(days=5),
|
||||
slug="ujlbu4"
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
self.client = HelperClient()
|
||||
|
||||
def test_successful_flat_redirect(self):
|
||||
# given
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
self.client.authorise()
|
||||
|
||||
# when
|
||||
with self.settings(JWT_SECRET="xxx"):
|
||||
response = self.client.get(reverse('external_login'), data={'redirect': 'some-page'})
|
||||
|
||||
# then
|
||||
self.assertRegex(text=urljoin(response.request['PATH_INFO'], response.url),
|
||||
expected_regex='\/auth\/external\/some-page\?jwt=.*')
|
||||
|
||||
# check jwt
|
||||
url_params = response.url.split("?")[1]
|
||||
jwt_str = url_params.split("=")[1]
|
||||
payload = jwt.decode(jwt_str, key="xxx", verify=True)
|
||||
self.assertIsNotNone(payload)
|
||||
self.assertEqual(payload['user_slug'], self.new_user.slug)
|
||||
self.assertEqual(payload['user_name'], self.new_user.full_name)
|
||||
self.assertIsNotNone(payload['exp'])
|
||||
|
||||
def test_successful_redirect_with_query_params(self):
|
||||
# given
|
||||
self.client = HelperClient(user=self.new_user)
|
||||
self.client.authorise()
|
||||
|
||||
# when
|
||||
with self.settings(JWT_SECRET="xxx"):
|
||||
response = self.client.get(reverse('external_login'), data={'redirect': 'some-page?param1=value1'})
|
||||
|
||||
# then
|
||||
self.assertRegex(text=urljoin(response.request['PATH_INFO'], response.url),
|
||||
expected_regex='\/auth\/external\/some-page\?param1=value1&jwt=.*')
|
||||
|
||||
def test_param_redirect_absent(self):
|
||||
response = self.client.get(reverse('external_login'))
|
||||
self.assertContains(response=response, text="Нужен параметр ?redirect", status_code=200)
|
||||
|
||||
def test_user_is_unauthorised(self):
|
||||
response = self.client.get(reverse('external_login'), data={'redirect': 'some-page'})
|
||||
self.assertRedirects(response=response,
|
||||
expected_url='/auth/login/?goto=%2Fauth%2Fexternal%2F%3Fredirect%3Dsome-page',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
self.assertFalse(self.client.is_authorised())
|
||||
|
||||
|
||||
class ViewPatreonLoginTests(TestCase):
|
||||
def test_positive(self):
|
||||
with self.settings(PATREON_CLIENT_ID="x-client_id",
|
||||
PATREON_REDIRECT_URL="http://x-redirect_url.com",
|
||||
PATREON_SCOPE="x-scope"):
|
||||
response = self.client.get(reverse('patreon_login'), )
|
||||
self.assertRedirects(response=response,
|
||||
expected_url='https://www.patreon.com/oauth2/authorize?client_id=x-client_id&redirect_uri=http%3A%2F%2Fx-redirect_url.com&response_type=code&scope=x-scope',
|
||||
fetch_redirect_response=False)
|
||||
|
||||
|
||||
@patch('auth.views.patreon.patreon')
|
||||
class ViewPatreonOauthCallbackTests(TestCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
# Set up data for the whole TestCase
|
||||
cls.new_user: User = User.objects.create(
|
||||
email="existed-user@email.com",
|
||||
membership_started_at=datetime.now() - timedelta(days=5),
|
||||
membership_expires_at=datetime.now() + timedelta(days=5),
|
||||
slug="ujlbu4"
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
self.client = HelperClient()
|
||||
|
||||
self.stub_patreon_response_oauth_token = {
|
||||
"access_token": "xxx-access-token",
|
||||
"refresh_token": "xxx-refresh-token",
|
||||
"expires_in": (datetime.utcnow() + timedelta(minutes=5)).microsecond,
|
||||
"scope": "scope??",
|
||||
"token_type": "Bearer"
|
||||
}
|
||||
self.stub_patreon_response_oauth_identity = None # doesn't need for now
|
||||
self.stub_parse_membership = Membership(
|
||||
platform=Platform.patreon,
|
||||
user_id=str(uuid.uuid4()),
|
||||
full_name="PatreonMember FullName",
|
||||
email="platform@patreon.com",
|
||||
image="http://xxx.url",
|
||||
started_at=datetime.utcnow(),
|
||||
charged_at=None,
|
||||
expires_at=datetime.utcnow() + timedelta(days=100 * 365),
|
||||
lifetime_support_cents=400,
|
||||
currently_entitled_amount_cents=0
|
||||
)
|
||||
|
||||
def test_successful_login_new_member(self, mocked_patreon):
|
||||
# given
|
||||
mocked_patreon.fetch_auth_data.return_value = self.stub_patreon_response_oauth_token
|
||||
mocked_patreon.fetch_user_data.return_value = self.stub_patreon_response_oauth_identity
|
||||
membership = self.stub_parse_membership
|
||||
membership.user_id = str(uuid.uuid4())
|
||||
membership.email = f"{membership.user_id}@email.com"
|
||||
mocked_patreon.parse_active_membership.return_value = membership
|
||||
|
||||
# when
|
||||
response = self.client.get(reverse('patreon_oauth_callback'), data={'code': '1234'})
|
||||
|
||||
# then
|
||||
self.assertRedirects(response=response, expected_url=f'/user/PatreonMemberFullName/',
|
||||
fetch_redirect_response=False)
|
||||
self.assertTrue(self.client.is_authorised())
|
||||
# created user
|
||||
created_user: User = User.objects.filter(email=membership.email).get()
|
||||
self.assertIsNotNone(created_user)
|
||||
self.assertEqual(created_user.patreon_id, membership.user_id)
|
||||
self.assertEqual(created_user.full_name, "PatreonMember FullName")
|
||||
self.assertEqual(created_user.membership_platform_type, "patreon")
|
||||
self.assertEqual(created_user.membership_started_at, membership.started_at)
|
||||
self.assertEqual(created_user.membership_expires_at, membership.expires_at)
|
||||
self.assertEqual(created_user.balance, 4) # 400 / 100
|
||||
self.assertFalse(created_user.is_email_verified)
|
||||
self.assertEqual(created_user.membership_platform_data, {'access_token': 'xxx-access-token',
|
||||
'refresh_token': 'xxx-refresh-token'})
|
||||
|
||||
def test_successful_login_existed_member(self, mocked_patreon):
|
||||
# given
|
||||
mocked_patreon.fetch_auth_data.return_value = self.stub_patreon_response_oauth_token
|
||||
mocked_patreon.fetch_user_data.return_value = self.stub_patreon_response_oauth_identity
|
||||
membership = self.stub_parse_membership
|
||||
membership.email = "existed-user@email.com"
|
||||
membership.lifetime_support_cents = 100500
|
||||
mocked_patreon.parse_active_membership.return_value = membership
|
||||
|
||||
# when
|
||||
response = self.client.get(reverse('patreon_oauth_callback'), data={'code': '1234'})
|
||||
|
||||
# then
|
||||
self.assertRedirects(response=response, expected_url=f'/user/ujlbu4/',
|
||||
fetch_redirect_response=False)
|
||||
self.assertTrue(self.client.is_authorised())
|
||||
# user updated attributes
|
||||
created_user: User = User.objects.filter(email="existed-user@email.com").get()
|
||||
self.assertIsNotNone(created_user)
|
||||
self.assertEqual(created_user.membership_expires_at, membership.expires_at)
|
||||
self.assertEqual(created_user.balance, 1005) # 100500 / 100
|
||||
self.assertEqual(created_user.membership_platform_data, {'access_token': 'xxx-access-token',
|
||||
'refresh_token': 'xxx-refresh-token'})
|
||||
|
||||
def test_patreon_exception(self, mocked_patreon):
|
||||
# given
|
||||
mocked_patreon.fetch_auth_data.side_effect = PatreonException("custom_test_exception")
|
||||
|
||||
# when
|
||||
response = self.client.get(reverse('patreon_oauth_callback'), data={'code': '1234'})
|
||||
|
||||
# then
|
||||
self.assertContains(response=response, text="Не получилось загрузить ваш профиль с серверов патреона",
|
||||
status_code=200)
|
||||
|
||||
def test_patreon_not_membership(self, mocked_patreon):
|
||||
# given
|
||||
mocked_patreon.fetch_auth_data.return_value = self.stub_patreon_response_oauth_token
|
||||
mocked_patreon.fetch_user_data.return_value = None
|
||||
mocked_patreon.parse_active_membership.return_value = None # no membership
|
||||
|
||||
# when
|
||||
response = self.client.get(reverse('patreon_oauth_callback'), data={'code': '1234'})
|
||||
|
||||
# then
|
||||
self.assertContains(response=response, text="Надо быть патроном, чтобы состоять в Клубе", status_code=200)
|
||||
|
||||
def test_param_code_absent(self, mocked_patreon=None):
|
||||
response = self.client.get(reverse('patreon_oauth_callback'), data={})
|
||||
self.assertContains(response=response, text="Что-то сломалось между нами и патреоном", status_code=200)
|
|
@ -13,6 +13,7 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|||
|
||||
SECRET_KEY = os.getenv("SECRET_KEY") or "wow so secret"
|
||||
DEBUG = (os.getenv("DEBUG") == "true") # SECURITY WARNING: don't run with debug turned on in production!
|
||||
TESTS_RUN = True if os.getenv("TESTS_RUN") else False
|
||||
|
||||
ALLOWED_HOSTS = ["*", "127.0.0.1", "localhost", "0.0.0.0", "vas3k.club"]
|
||||
INTERNAL_IPS = ["127.0.0.1"]
|
||||
|
@ -157,7 +158,7 @@ LAUNCH_DATE = datetime(2020, 4, 13)
|
|||
AUTH_CODE_LENGTH = 6
|
||||
AUTH_CODE_EXPIRATION_TIMEDELTA = timedelta(minutes=15)
|
||||
AUTH_MAX_CODE_TIMEDELTA = timedelta(hours=1)
|
||||
AUTH_MAX_CODE_COUNT = 4
|
||||
AUTH_MAX_CODE_COUNT = 5
|
||||
AUTH_MAX_CODE_ATTEMPTS = 3
|
||||
|
||||
DEFAULT_PAGE_SIZE = 70
|
||||
|
|
11
club/urls.py
11
club/urls.py
|
@ -90,7 +90,7 @@ urlpatterns = [
|
|||
path("post/<slug:post_slug>/admin/", admin_post, name="admin_post"),
|
||||
path("post/<slug:post_slug>/announce/", announce_post, name="announce_post"),
|
||||
path("post/<slug:post_slug>/comment/create/", create_comment, name="create_comment"),
|
||||
path("post/<slug:post_slug>/comment/<uuid:comment_id>/", show_comment, name="show_comment",),
|
||||
path("post/<slug:post_slug>/comment/<uuid:comment_id>/", show_comment, name="show_comment", ),
|
||||
|
||||
path("bookmarks/", bookmarks, name="bookmarks"),
|
||||
|
||||
|
@ -137,4 +137,13 @@ urlpatterns = [
|
|||
|
||||
if settings.DEBUG:
|
||||
import debug_toolbar
|
||||
|
||||
urlpatterns = [path("__debug__/", include(debug_toolbar.urls))] + urlpatterns
|
||||
|
||||
# According to django doc: https://docs.djangoproject.com/en/3.1/topics/testing/overview/#other-test-conditions
|
||||
# Regardless of the value of the DEBUG setting in your configuration file, all Django tests run with DEBUG=False
|
||||
# so we use separate special var instead of settings.DEBUG
|
||||
if settings.TESTS_RUN:
|
||||
from debug.api import api_me
|
||||
|
||||
urlpatterns.append(path("debug/me", api_me, name="debug_api_me"))
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
from django.http import JsonResponse
|
||||
from django.shortcuts import get_object_or_404
|
||||
|
||||
from debug.utils_for_tests import todict
|
||||
from users.models.user import User
|
||||
|
||||
|
||||
def api_me(request):
|
||||
response = {}
|
||||
|
||||
if request.me:
|
||||
user: User = get_object_or_404(User, slug=request.me.slug)
|
||||
response = todict(user)
|
||||
response["is_authorised"] = True
|
||||
else:
|
||||
response["is_authorised"] = False
|
||||
|
||||
return JsonResponse(response)
|
|
@ -0,0 +1,126 @@
|
|||
from django.test import SimpleTestCase
|
||||
|
||||
from debug.utils_for_tests import todict
|
||||
|
||||
|
||||
class TestToDictFunction(SimpleTestCase):
|
||||
def test_convert_instance_variables(self):
|
||||
class TestClass:
|
||||
def __init__(self, instance_attribute_variable=2):
|
||||
self.instance_attribute_variable = instance_attribute_variable
|
||||
|
||||
json_obj = todict(TestClass(), convert_private=False, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'instance_attribute_variable': 2})
|
||||
|
||||
def test_convert_class_attributes(self):
|
||||
class TestClass:
|
||||
class_attribute_variable = 1
|
||||
|
||||
def __init__(self, instance_attribute_variable=2):
|
||||
self.instance_attribute_variable = instance_attribute_variable
|
||||
|
||||
# convert_private=False
|
||||
json_obj = todict(TestClass(), convert_private=False, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'instance_attribute_variable': 2})
|
||||
|
||||
# convert_private=True
|
||||
json_obj = todict(TestClass(), convert_private=True, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'instance_attribute_variable': 2})
|
||||
|
||||
# include_class_attrs=True
|
||||
json_obj = todict(TestClass(), convert_private=True, include_none_fields=True, include_class_attrs=True)
|
||||
self.assertEqual(json_obj, {'class_attribute_variable': 1,
|
||||
'instance_attribute_variable': 2})
|
||||
|
||||
def test_not_convert_callable_methods(self):
|
||||
class TestClass:
|
||||
class_attribute_variable = 1
|
||||
|
||||
def __init__(self, instance_attribute_variable=2):
|
||||
self.instance_attribute_variable = instance_attribute_variable
|
||||
|
||||
def method_a(self):
|
||||
pass
|
||||
|
||||
json_obj = todict(TestClass(), convert_private=True, include_none_fields=True, include_class_attrs=True)
|
||||
self.assertNotIn("method_a", json_obj)
|
||||
self.assertEqual(json_obj, {'class_attribute_variable': 1, 'instance_attribute_variable': 2})
|
||||
|
||||
def test_convert_private_attribute_flag(self):
|
||||
class TestClass:
|
||||
def __init__(self):
|
||||
self.public_variable = 1
|
||||
self._private_variable = 2
|
||||
|
||||
# convert_private=True
|
||||
json_obj = todict(TestClass(), convert_private=True, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'_private_variable': 2, 'public_variable': 1})
|
||||
|
||||
# convert_private=False
|
||||
json_obj = todict(TestClass(), convert_private=False, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'public_variable': 1})
|
||||
self.assertNotIn("method_a", json_obj)
|
||||
|
||||
def test_convert_types(self):
|
||||
class TestClass:
|
||||
def __init__(self):
|
||||
self.variable_int = 1
|
||||
self.variable_string = "hello world"
|
||||
self.variable_list = [1, 2, 3]
|
||||
self.variable_dict = {'a': 1, 'b': 2}
|
||||
self.variable_tuple = (1, 2)
|
||||
self.variable_boolean = True
|
||||
|
||||
json_obj = todict(TestClass(), convert_private=True, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'variable_boolean': True,
|
||||
'variable_dict': {'a': 1, 'b': 2},
|
||||
'variable_int': 1,
|
||||
'variable_list': [1, 2, 3],
|
||||
'variable_string': 'hello world',
|
||||
'variable_tuple': [1, 2]})
|
||||
|
||||
def test_convert_inner_objects(self):
|
||||
class TestClassA:
|
||||
def __init__(self):
|
||||
self.var_dict = {'a': 1, 'b': 2}
|
||||
|
||||
class TestClassB:
|
||||
def __init__(self, other_obj):
|
||||
self.obj_with_inner_obj = other_obj
|
||||
|
||||
json_obj = todict(TestClassB(other_obj=TestClassA()), convert_private=True, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'obj_with_inner_obj': {'var_dict': {'a': 1, 'b': 2}}})
|
||||
|
||||
def test_include_none_fields_flag(self):
|
||||
class TestClass:
|
||||
def __init__(self):
|
||||
self.var_int0 = 0
|
||||
self.var_int1 = 1
|
||||
self.var_string_empty = ""
|
||||
self.var_string = "hello world"
|
||||
self.var_list_empty = []
|
||||
self.var_list = [1, 2, 3]
|
||||
self.var_bool_false = False
|
||||
self.var_bool_true = True
|
||||
self.var_none = None
|
||||
|
||||
# include_none_fields=True
|
||||
json_obj = todict(TestClass(), convert_private=False, include_none_fields=True)
|
||||
self.assertEqual(json_obj, {'var_bool_false': False,
|
||||
'var_bool_true': True,
|
||||
'var_int0': 0,
|
||||
'var_int1': 1,
|
||||
'var_list': [1, 2, 3],
|
||||
'var_list_empty': [],
|
||||
'var_none': None,
|
||||
'var_string': 'hello world',
|
||||
'var_string_empty': ''})
|
||||
|
||||
# include_none_fields=False
|
||||
json_obj = todict(TestClass(), convert_private=False, include_none_fields=False)
|
||||
self.assertEqual(json_obj, {'var_bool_false': False,
|
||||
'var_bool_true': True,
|
||||
'var_int0': 0,
|
||||
'var_int1': 1,
|
||||
'var_list': [1, 2, 3],
|
||||
'var_string': 'hello world'})
|
|
@ -0,0 +1,35 @@
|
|||
def todict(obj, include_class_attrs=False, convert_private=False, include_none_fields=True):
|
||||
"""Convert object to dict"""
|
||||
if isinstance(obj, dict):
|
||||
data = {}
|
||||
for (k, v) in obj.items():
|
||||
data[k] = todict(v, include_class_attrs, convert_private)
|
||||
return data
|
||||
elif hasattr(obj, "_ast"):
|
||||
return todict(obj._ast(), convert_private=convert_private)
|
||||
elif hasattr(obj, "__iter__") and not isinstance(obj, str):
|
||||
return [todict(v, include_class_attrs, convert_private) for v in obj]
|
||||
elif hasattr(obj, "__dict__"):
|
||||
if convert_private:
|
||||
instance_attributes = [(key, value) for key, value in obj.__dict__.items() if not callable(value)]
|
||||
else:
|
||||
instance_attributes = [(key, value) for key, value in obj.__dict__.items() if
|
||||
not callable(value) and not key.startswith('_')]
|
||||
|
||||
if include_class_attrs and hasattr(obj, "__class__"):
|
||||
class_attributes = [(key, value) for key, value in obj.__class__.__dict__.items() if
|
||||
(key[:2] != "__") and (not callable(value))]
|
||||
else:
|
||||
class_attributes = []
|
||||
|
||||
items = instance_attributes
|
||||
items.extend(class_attributes)
|
||||
|
||||
# if include_none_fields or value: for include or exclude none fields
|
||||
data = dict(
|
||||
[(key, todict(value, include_class_attrs, convert_private, include_none_fields)) for key, value in items if
|
||||
include_none_fields or (value is not None and value != [] and value != "")])
|
||||
|
||||
return data
|
||||
else:
|
||||
return obj
|
|
@ -0,0 +1,12 @@
|
|||
FROM node:14-slim
|
||||
|
||||
WORKDIR /app/frontend
|
||||
|
||||
COPY ./frontend/package.json ./
|
||||
COPY ./frontend/package-lock.json ./
|
||||
|
||||
RUN npm ci
|
||||
|
||||
COPY ./frontend ./
|
||||
RUN npm run build
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
version: "3.7"
|
||||
|
||||
services:
|
||||
tests:
|
||||
build:
|
||||
dockerfile: dev.dockerfile
|
||||
context: .
|
||||
command: make test-ci
|
||||
container_name: tests
|
||||
environment:
|
||||
- DEBUG=true # regardless of this value django will override it on false
|
||||
- TESTS_RUN=da
|
||||
- PYTHONUNBUFFERED=1
|
||||
- POSTGRES_DB=vas3k_club
|
||||
- POSTGRES_USER=postgres
|
||||
- POSTGRES_PASSWORD=postgres
|
||||
- POSTGRES_HOST=postgres
|
||||
- REDIS_DB=0
|
||||
- REDIS_HOST=redis
|
||||
volumes:
|
||||
- .:/app:delegated
|
||||
depends_on:
|
||||
- postgres
|
||||
- redis
|
||||
- webpack
|
||||
|
||||
redis:
|
||||
image: redis:alpine
|
||||
environment:
|
||||
- ALLOW_EMPTY_PASSWORD=yes
|
||||
ports:
|
||||
- 6379
|
||||
|
||||
postgres:
|
||||
image: postgres:11
|
||||
container_name: club_postgres
|
||||
environment:
|
||||
- POSTGRES_USER=postgres
|
||||
- POSTGRES_PASSWORD=postgres
|
||||
- POSTGRES_DB=vas3k_club
|
||||
ports:
|
||||
- 5432
|
||||
|
||||
webpack:
|
||||
build:
|
||||
dockerfile: dev.frontend.dockerfile
|
||||
context: .
|
||||
command: npm run watch
|
||||
restart: "no"
|
||||
volumes:
|
||||
- .:/app:delegated
|
||||
working_dir: /app/frontend
|
|
@ -1,9 +1,11 @@
|
|||
from datetime import datetime, timedelta
|
||||
import random
|
||||
import socket
|
||||
import time
|
||||
|
||||
if __name__ == "__main__":
|
||||
while True:
|
||||
started_at = datetime.utcnow()
|
||||
while datetime.utcnow() < started_at + timedelta(minutes=5):
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.connect(("postgres", 5432))
|
||||
|
|
Loading…
Reference in New Issue