Backported and adapted patch for py-social-auth-core 5.4.3 to fix CVE-2025-61783. Obtained from: From 10c80e2ebabeccd4e9c84ad0e16e1db74148ed4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20=C4=8Ciha=C5=99?= Date: Tue, 30 Sep 2025 13:38:21 +0200 Subject: [PATCH] fix: avoid associating with existing user when creating fails This behavior was introduced in 9f86059e9d8070bc5ecd7ba069fadab1c9bf502a to workaround concurrency issues, but the only safe way to deal with this is to restart the pipeline to make sure that all possible policies apply. This is currently not possible, so let's fail with AuthAlreadyAssociated and let user restart the authentication pipeline manually. --- social_django/storage.py.orig 2025-02-13 13:06:56 UTC +++ social_django/storage.py @@ -5,6 +5,7 @@ from django.db.utils import IntegrityError from django.core.exceptions import FieldDoesNotExist from django.db import router, transaction from django.db.utils import IntegrityError +from social_core.exceptions import AuthAlreadyAssociated from social_core.storage import ( AssociationMixin, BaseStorage, @@ -75,26 +76,24 @@ class DjangoUserMixin(UserMixin): cls.user_model()._meta.get_field("username") except FieldDoesNotExist: kwargs.pop("username") + + if hasattr(transaction, "atomic"): + # In Django versions that have an "atomic" transaction decorator / context + # manager, there's a transaction wrapped around this call. + # If the create fails below due to an IntegrityError, ensure that the transaction + # stays undamaged by wrapping the create in an atomic. + using = router.db_for_write(cls.user_model()) + try: if hasattr(transaction, "atomic"): - # In Django versions that have an "atomic" transaction decorator / context - # manager, there's a transaction wrapped around this call. - # If the create fails below due to an IntegrityError, ensure that the transaction - # stays undamaged by wrapping the create in an atomic. - using = router.db_for_write(cls.user_model()) with transaction.atomic(using=using): user = cls.user_model()._default_manager.create_user(*args, **kwargs) else: user = cls.user_model()._default_manager.create_user(*args, **kwargs) + + return user except IntegrityError as exc: - # If email comes in as None it won't get found in the get - if kwargs.get("email", True) is None: - kwargs["email"] = "" - try: - user = cls.user_model()._default_manager.get(*args, **kwargs) - except cls.user_model().DoesNotExist: - raise exc - return user + raise AuthAlreadyAssociated(None) from exc @classmethod def get_user(cls, pk=None, **kwargs): --- tests/test_models.py.orig 2025-02-13 13:06:56 UTC +++ tests/test_models.py @@ -5,6 +5,7 @@ from django.test import TestCase from django.core.management import call_command from django.db import IntegrityError from django.test import TestCase +from social_core.exceptions import AuthAlreadyAssociated from social_django.models import ( AbstractUserSocialAuth, @@ -101,17 +102,21 @@ class TestUserSocialAuth(TestCase): self.assertEqual(UserSocialAuth.get_username(self.user), self.user.username) def test_create_user(self): - # Catch integrity error and find existing user - UserSocialAuth.create_user(username=self.user.username) + UserSocialAuth.create_user(username="testuser") def test_create_user_reraise(self): - with self.assertRaises(IntegrityError): + with self.assertRaises(AuthAlreadyAssociated): UserSocialAuth.create_user(username=self.user.username, email=None) @mock.patch("social_django.models.UserSocialAuth.username_field", return_value="email") - @mock.patch("django.contrib.auth.models.UserManager.create_user", side_effect=IntegrityError) + @mock.patch("django.contrib.auth.models.UserManager.create_user", return_value="") def test_create_user_custom_username(self, *args): UserSocialAuth.create_user(username=self.user.email) + + @mock.patch("django.contrib.auth.models.UserManager.create_user", side_effect=IntegrityError) + def test_create_user_existing(self, *args): + with self.assertRaises(AuthAlreadyAssociated): + UserSocialAuth.create_user(username=self.user.email) @mock.patch("social_django.storage.transaction", spec=[]) def test_create_user_without_transaction_atomic(self, *args):