Djangoでのモデルインスタンスとその関連オブジェクトの複製/オブジェクトを再帰的に複製するためのアルゴリズム

StackOverflow https://stackoverflow.com/questions/437166

質問

Books Chapters 、および Pages のモデルがあります。それらはすべて User によって書かれています:

from django.db import models

class Book(models.Model)
    author = models.ForeignKey('auth.User')

class Chapter(models.Model)
    author = models.ForeignKey('auth.User')
    book = models.ForeignKey(Book)

class Page(models.Model)
    author = models.ForeignKey('auth.User')
    book = models.ForeignKey(Book)
    chapter = models.ForeignKey(Chapter)

やりたいことは、既存の Book を複製し、その User を他の誰かに更新することです。しわは、関連するすべてのモデルインスタンスを Book に複製したいことです。すべての Chapters Pages も同様です!

Page を見ると非常に注意が必要です-新しい Pages author フィールドを更新する必要があるだけでなく、また、新しい Chapter オブジェクトを指す必要があります!

Djangoは、これを行うための独創的な方法をサポートしていますか?モデルを複製するための一般的なアルゴリズムはどのようになりますか?

乾杯、

ジョン


更新:

上記のクラスは、私が抱えている問題を説明するための例にすぎません!

役に立ちましたか?

解決

CollectedObjectsが削除されたため、Django 1.3では動作しなくなりました。 changeset 14507

をご覧ください。

Django Snippetsにソリューションを投稿しました。 django.db.models.query.CollectedObject オブジェクトの削除に使用されるコード:

from django.db.models.query import CollectedObjects
from django.db.models.fields.related import ForeignKey

def duplicate(obj, value, field):
    """
    Duplicate all related objects of `obj` setting
    `field` to `value`. If one of the duplicate
    objects has an FK to another duplicate object
    update that as well. Return the duplicate copy
    of `obj`.  
    """
    collected_objs = CollectedObjects()
    obj._collect_sub_objects(collected_objs)
    related_models = collected_objs.keys()
    root_obj = None
    # Traverse the related models in reverse deletion order.    
    for model in reversed(related_models):
        # Find all FKs on `model` that point to a `related_model`.
        fks = []
        for f in model._meta.fields:
            if isinstance(f, ForeignKey) and f.rel.to in related_models:
                fks.append(f)
        # Replace each `sub_obj` with a duplicate.
        sub_obj = collected_objs[model]
        for pk_val, obj in sub_obj.iteritems():
            for fk in fks:
                fk_value = getattr(obj, "%s_id" % fk.name)
                # If this FK has been duplicated then point to the duplicate.
                if fk_value in collected_objs[fk.rel.to]:
                    dupe_obj = collected_objs[fk.rel.to][fk_value]
                    setattr(obj, fk.name, dupe_obj)
            # Duplicate the object and save it.
            obj.id = None
            setattr(obj, field, value)
            obj.save()
            if root_obj is None:
                root_obj = obj
    return root_obj

他のヒント

オブジェクトをコピーする簡単な方法を次に示します。

基本的に:

(1)元のオブジェクトのIDをNoneに設定します:

book_to_copy.id =なし

(2) 'author'属性を変更してojbectを保存します:

book_to_copy.author = new_author

book_to_copy.save()

(3)UPDATEの代わりにINSERTを実行

(ページの著者の変更については触れていません。モデルの再構築に関するコメントに同意します)

djangoで試したことはありませんが、pythonの deepcopy がうまくいくかもしれません

編集:

関数を実装する場合、モデルのカスタムコピー動作を定義できます。

__copy__() and __deepcopy__()

これは、 http://www.djangosnippets.org/snippets/1282/

1.3でCollectedObjectsを置き換えたCollectorと互換性があります。

これはあまり重くテストしていませんでしたが、約20,000個のサブオブジェクトを持つオブジェクトでテストしましたが、外部キーの深さはわずか3層でした。もちろん、自己責任で使用してください。

この投稿を読む野心的な人のために、Collectorを" DuplicateCollector"のようなクラスにサブクラス化する(またはクラス全体をコピーしてdjango APIのこの未公開セクションの依存関係を削除する)ことを検討する必要があります.deleteメソッドと同様に機能する.duplicateメソッドを記述します。そうすれば、この問題を実際に解決できます。

from django.db.models.deletion import Collector
from django.db.models.fields.related import ForeignKey

def duplicate(obj, value=None, field=None, duplicate_order=None):
    """
    Duplicate all related objects of obj setting
    field to value. If one of the duplicate
    objects has an FK to another duplicate object
    update that as well. Return the duplicate copy
    of obj.
    duplicate_order is a list of models which specify how
    the duplicate objects are saved. For complex objects
    this can matter. Check to save if objects are being
    saved correctly and if not just pass in related objects
    in the order that they should be saved.
    """
    collector = Collector({})
    collector.collect([obj])
    collector.sort()
    related_models = collector.data.keys()
    data_snapshot =  {}
    for key in collector.data.keys():
        data_snapshot.update({ key: dict(zip([item.pk for item in collector.data[key]], [item for item in collector.data[key]])) })
    root_obj = None

    # Sometimes it's good enough just to save in reverse deletion order.
    if duplicate_order is None:
        duplicate_order = reversed(related_models)

    for model in duplicate_order:
        # Find all FKs on model that point to a related_model.
        fks = []
        for f in model._meta.fields:
            if isinstance(f, ForeignKey) and f.rel.to in related_models:
                fks.append(f)
        # Replace each `sub_obj` with a duplicate.
        if model not in collector.data:
            continue
        sub_objects = collector.data[model]
        for obj in sub_objects:
            for fk in fks:
                fk_value = getattr(obj, "%s_id" % fk.name)
                # If this FK has been duplicated then point to the duplicate.
                fk_rel_to = data_snapshot[fk.rel.to]
                if fk_value in fk_rel_to:
                    dupe_obj = fk_rel_to[fk_value]
                    setattr(obj, fk.name, dupe_obj)
            # Duplicate the object and save it.
            obj.id = None
            if field is not None:
                setattr(obj, field, value)
            obj.save()
            if root_obj is None:
                root_obj = obj
    return root_obj

編集:デバッグ" print"を削除しました。ステートメント。

Django 1.5では、これは私のために機能します:

thing.id = None
thing.pk = None
thing.save()

上記のCollectedObjectsスニペットの使用は機能しなくなりましたが、次の変更を行うことで実行できます。

from django.contrib.admin.util import NestedObjects
from django.db import DEFAULT_DB_ALIAS

and

collector = NestedObjects(using=DEFAULT_DB_ALIAS)

CollectorObjectsの代わりに

構築しているデータベースにコピーが2つしかない場合、管理インターフェイスの[戻る]ボタンを使用し、必要なフィールドを変更して、インスタンスを再度保存できることがわかりました。これは、たとえば「ギムレット」を作成する必要がある場合に役立ちます。 「ウォッカギムレット」唯一の違いは、名前と材料を置き換えることです。明らかに、これにはデータの少し先見が必要であり、djangoのコピー/ディープコピーをオーバーライドするほど強力ではありませんが、一部の人にとってはトリックになるかもしれません。

シンプルで非汎用的な方法

提案された解決策は私にとってはうまくいかなかったので、私は単純で賢い方法ではありませんでした。これは単純な場合にのみ役立ちます。

次の構造を持つモデルの場合

Book
 |__ CroppedFace
 |__ Photo
      |__ AwsReco
            |__ AwsLabel
            |__ AwsFace
                  |__ AwsEmotion

これは動作します

def duplicate_book(book: Book, new_user: MyUser):
    # AwsEmotion, AwsFace, AwsLabel, AwsReco, Photo, CroppedFace, Book

    old_cropped_faces = book.croppedface_set.all()
    old_photos = book.photo_set.all()

    book.pk = None
    book.user = new_user
    book.save()

    for cf in old_cropped_faces:
        cf.pk = None
        cf.book = book
        cf.save()

    for photo in old_photos:
        photo.pk = None
        photo.book = book
        photo.save()

        if hasattr(photo, 'awsreco'):
            reco = photo.awsreco
            old_aws_labels = reco.awslabel_set.all()
            old_aws_faces = reco.awsface_set.all()
            reco.pk = None
            reco.photo = photo
            reco.save()

            for label in old_aws_labels:
                label.pk = None
                label.reco = reco
                label.save()

            for face in old_aws_faces:
                old_aws_emotions = face.awsemotion_set.all()
                face.pk = None
                face.reco = reco
                face.save()

                for emotion in old_aws_emotions:
                    emotion.pk = None
                    emotion.aws_face = face
                    emotion.save()
    return book

Django 2.2 / Python 3.6でいくつかの回答を試しましたが、1対多および多対多の関連オブジェクトをコピーするようには見えませんでした。また、多くにはハードコーディングが組み込まれており、データ構造の事前知識が組み込まれています。

1対多および多対多の関連オブジェクトを処理する、より一般的な方法でこれを行う方法を書きました。コメントが含まれており、提案があれば改善したいと考えています。

def duplicate_object(self):
    """
    Duplicate a model instance, making copies of all foreign keys pointing to it.
    There are 3 steps that need to occur in order:

        1.  Enumerate the related child objects and m2m relations, saving in lists/dicts
        2.  Copy the parent object per django docs (doesn't copy relations)
        3a. Copy the child objects, relating to the copied parent object
        3b. Re-create the m2m relations on the copied parent object

    """
    related_objects_to_copy = []
    relations_to_set = {}
    # Iterate through all the fields in the parent object looking for related fields
    for field in self._meta.get_fields():
        if field.one_to_many:
            # One to many fields are backward relationships where many child 
            # objects are related to the parent. Enumerate them and save a list 
            # so we can copy them after duplicating our parent object.
            print(f'Found a one-to-many field: {field.name}')

            # 'field' is a ManyToOneRel which is not iterable, we need to get
            # the object attribute itself.
            related_object_manager = getattr(self, field.name)
            related_objects = list(related_object_manager.all())
            if related_objects:
                print(f' - {len(related_objects)} related objects to copy')
                related_objects_to_copy += related_objects

        elif field.many_to_one:
            # In testing, these relationships are preserved when the parent
            # object is copied, so they don't need to be copied separately.
            print(f'Found a many-to-one field: {field.name}')

        elif field.many_to_many:
            # Many to many fields are relationships where many parent objects
            # can be related to many child objects. Because of this the child
            # objects don't need to be copied when we copy the parent, we just
            # need to re-create the relationship to them on the copied parent.
            print(f'Found a many-to-many field: {field.name}')
            related_object_manager = getattr(self, field.name)
            relations = list(related_object_manager.all())
            if relations:
                print(f' - {len(relations)} relations to set')
                relations_to_set[field.name] = relations

    # Duplicate the parent object
    self.pk = None
    self.save()
    print(f'Copied parent object ({str(self)})')

    # Copy the one-to-many child objects and relate them to the copied parent
    for related_object in related_objects_to_copy:
        # Iterate through the fields in the related object to find the one that 
        # relates to the parent model.
        for related_object_field in related_object._meta.fields:
            if related_object_field.related_model == self.__class__:
                # If the related_model on this field matches the parent
                # object's class, perform the copy of the child object and set
                # this field to the parent object, creating the new
                # child -> parent relationship.
                related_object.pk = None
                setattr(related_object, related_object_field.name, self)
                related_object.save()

                text = str(related_object)
                text = (text[:40] + '..') if len(text) > 40 else text
                print(f'|- Copied child object ({text})')

    # Set the many-to-many relations on the copied parent
    for field_name, relations in relations_to_set.items():
        # Get the field by name and set the relations, creating the new
        # relationships.
        field = getattr(self, field_name)
        field.set(relations)
        text_relations = []
        for relation in relations:
            text_relations.append(str(relation))
        print(f'|- Set {len(relations)} many-to-many relations on {field_name} {text_relations}')

    return self

よりシンプルなデータモデルであなたも幸せになると思います。

ページはある章にあるが別の本にあるのは本当ですか?

userMe = User( username="me" )
userYou= User( username="you" )
bookMyA = Book( userMe )
bookYourB = Book( userYou )

chapterA1 = Chapter( book= bookMyA, author=userYou ) # "me" owns the Book, "you" owns the chapter?

chapterB2 = Chapter( book= bookYourB, author=userMe ) # "you" owns the book, "me" owns the chapter?

page1 = Page( book= bookMyA, chapter= chapterB2, author=userMe ) # Book and Author aggree, chapter doesn't?

モデルが複雑すぎるようです。

あなたはもっとシンプルなもので幸せになると思います。あなたがすべての問題を知っているわけではないので、私はこれを推測しています。

class Book(models.Model)
    name = models.CharField(...)

class Chapter(models.Model)
    name = models.CharField(...)
    book = models.ForeignKey(Book)

class Page(models.Model)
    author = models.ForeignKey('auth.User')
    chapter = models.ForeignKey(Chapter)

各ページには独自のオーサーシップがあります。したがって、各章には著者と同様の本があります。これで、ブック、チャプター、およびページを複製して、複製されたページを新しい作成者に割り当てることができます。

実際には、ページとチャプターの間に多対多の関係を持ち、ブックとチャプターのクローンを作成せずに、ページのみの複数のコピーを作成したい場合があります。

ここでの Django 2.1.2 での回答には運がなかったので、上記の回答に大きく基づいたデータベースモデルのディープコピーを実行する一般的な方法を作成しました。

上記の回答との主な違いは、 ForeignKey rel という属性がなくなったため、 f.remote_field.modelに変更する必要があることです。 など。

さらに、データベースモデルのコピー順序を知るのが難しいため、コピーに失敗した場合に現在のモデルをリストの最後にプッシュする単純なキューイングシステムを作成しました。コードは以下のpostetです:

import queue
from django.contrib.admin.utils import NestedObjects
from django.db.models.fields.related import ForeignKey

def duplicate(obj, field=None, value=None, max_retries=5):
    # Use the Nested Objects collector to retrieve the related models
    collector = NestedObjects(using='default')
    collector.collect([obj])
    related_models = list(collector.data.keys())

    # Create an object to map old primary keys to new ones
    data_snapshot = {}
    model_queue = queue.Queue()
    for key in related_models:
        data_snapshot.update(
            {key: {item.pk: None for item in collector.data[key]}}
        )
        model_queue.put(key)

    # For each of the models in related models copy their instances
    root_obj = None
    attempt_count = 0
    while not model_queue.empty():
        model = model_queue.get()
        root_obj, success = copy_instances(model, related_models, collector, data_snapshot, root_obj)

        # If the copy is not a success, it probably means that not
        # all the related fields for the model has been copied yet.
        # The current model is therefore pushed to the end of the list to be copied last
        if not success:

            # If the last model is unsuccessful or the number of max retries is reached, raise an error
            if model_queue.empty() or attempt_count > max_retries:
                raise DuplicationError(model)
            model_queue.put(model)
            attempt_count += 1
    return root_obj

def copy_instances(model, related_models, collector, data_snapshot, root_obj):

# Store all foreign keys for the model in a list
fks = []
for f in model._meta.fields:
    if isinstance(f, ForeignKey) and f.remote_field.model in related_models:
        fks.append(f)

# Iterate over the instances of the model
for obj in collector.data[model]:

    # For each of the models foreign keys check if the related object has been copied
    # and if so, assign its personal key to the current objects related field
    for fk in fks:
        pk_field = f"{fk.name}_id"
        fk_value = getattr(obj, pk_field)

        # Fetch the dictionary containing the old ids
        fk_rel_to = data_snapshot[fk.remote_field.model]

        # If the value exists and is in the dictionary assign it to the object
        if fk_value is not None and fk_value in fk_rel_to:
            dupe_pk = fk_rel_to[fk_value]

            # If the desired pk is none it means that the related object has not been copied yet
            # so the function returns unsuccessful
            if dupe_pk is None:
                return root_obj, False

            setattr(obj, pk_field, dupe_pk)

    # Store the old pk and save the object without an id to create a shallow copy of the object
    old_pk = obj.id
    obj.id = None

    if field is not None:
        setattr(obj, field, value)

    obj.save()

    # Store the new id in the data snapshot object for potential use on later objects
    data_snapshot[model][old_pk] = obj.id

    if root_obj is None:
        root_obj = obj

return root_obj, True

お役に立てば幸いです:)

複製エラーは単純な例外の拡張です:

class DuplicationError(Exception):
    """
    Is raised when a duplication operation did not succeed

    Attributes:
        model -- The database model that failed
    """

    def __init__(self, model):
        self.error_model = model

    def __str__(self):
        return f'Was not able to duplicate database objects for model {self.error_model}'

これはやや単純な解決策です。これは、文書化されていないDjango APIには依存しません。子、孫などのレコードとともに、単一の親レコードを複製することを前提としています。実際に複製する必要があるクラスのホワイトリストを、子オブジェクトを指す各親オブジェクトの1対多関係の名前の list の形式で渡します。このコードは、上記のホワイトリストを前提として、ツリー全体が自己完結型であり、外部参照を気にしないことを前提としています。

このソリューションは、上記の author フィールドに対して特別なことを行いません。それでうまくいくかどうかはわかりません。他の人が言ったように、その author フィールドはおそらく異なるモデルクラスで繰り返されるべきではありません。

このコードに関するもう1つのこと:子孫の新しいレベルごとに自分自身を呼び出すという点で、このコードは本当に再帰的です。

from collections import OrderedDict

def duplicate_model_with_descendants(obj, whitelist, _new_parent_pk=None):
    kwargs = {}
    children_to_clone = OrderedDict()
    for field in obj._meta.get_fields():
        if field.name == "id":
            pass
        elif field.one_to_many:
            if field.name in whitelist:
                these_children = list(getattr(obj, field.name).all())
                if children_to_clone.has_key(field.name):
                    children_to_clone[field.name] |= these_children
                else:
                    children_to_clone[field.name] = these_children
            else:
                pass
        elif field.many_to_one:
            if _new_parent_pk:
                kwargs[field.name + '_id'] = _new_parent_pk
        elif field.concrete:
            kwargs[field.name] = getattr(obj, field.name)
        else:
            pass
    new_instance = obj.__class__(**kwargs)
    new_instance.save()
    new_instance_pk = new_instance.pk
    for ky in children_to_clone.keys():
        child_collection = getattr(new_instance, ky)
        for child in children_to_clone[ky]:
            child_collection.add(duplicate_model_with_descendants(child, whitelist=whitelist, _new_parent_pk=new_instance_pk))
    return new_instance

使用例:

from django.db import models

class Book(models.Model)
    author = models.ForeignKey('auth.User')

class Chapter(models.Model)
    # author = models.ForeignKey('auth.User')
    book = models.ForeignKey(Book, related_name='chapters')

class Page(models.Model)
    # author = models.ForeignKey('auth.User')
    # book = models.ForeignKey(Book)
    chapter = models.ForeignKey(Chapter, related_name='pages')

WHITELIST = ['books', 'chapters', 'pages']
original_record = models.Book.objects.get(pk=1)
duplicate_record = duplicate_model_with_descendants(original_record, WHITELIST)

Stephen G Tuggyのソリューションを試したところ、非常に賢いことがわかりましたが、残念ながら、特別な状況では機能しません。

次のシナリオを想定してみましょう:

class FattAqp(models.Model):    
    descr = models.CharField('descrizione', max_length=200)
    ef = models.ForeignKey(Esercizio, ...)
    forn = models.ForeignKey(Fornitore, ...)

class Periodo(models.Model):
    #  id usato per identificare i documenti
    # periodo rilevato in fattura
    data_i_p = models.DateField('data inizio', blank=True)
    idfatt = models.ForeignKey(FattAqp, related_name='periodo')

class Lettura(models.Model):
    mc_i = models.DecimalField(max_digits=7, ...)
    faqp = models.ForeignKey(FattAqp, related_name='lettura')
    an_im = models.ForeignKey('cnd.AnagImm', ..)

class DettFAqp(models.Model):
    imponibile = models.DecimalField(...)
    voce = models.ForeignKey(VoceAqp, ...)
    periodo = models.ForeignKey(Periodo, related_name='dettfaqp')

この場合、FattAqpインスタンスをディープコピーしようとすると、ef、forn、an_im、voceフィールドは正しく設定されません。一方、idfatt、faqp、periodoはそうします。

関数にパラメーターをもう1つ追加し、コードを少し変更することで問題を解決しました。 Python 3.6とDjango 2.2でテストしました ここにあります:

def duplicate_model_with_descendants(obj, whitelist, _new_parent_pk=None, static_fk=None):
    kwargs = {}
    children_to_clone = OrderedDict()
    for field in obj._meta.get_fields():
        if field.name == "id":
            pass
        elif field.one_to_many:
            if field.name in whitelist:
                these_children = list(getattr(obj, field.name).all())

                if field.name in children_to_clone:
                    children_to_clone[field.name] |= these_children
                else:
                    children_to_clone[field.name] = these_children
            else:
                pass
        elif field.many_to_one:
            name_with_id = field.name + '_id'
            if _new_parent_pk:
                kwargs[name_with_id] = _new_parent_pk

            if name_with_id in static_fk:
                kwargs[name_with_id] = getattr(obj, name_with_id)

        elif field.concrete:
            kwargs[field.name] = getattr(obj, field.name)
        else:
            pass
    new_instance = obj.__class__(**kwargs)
    new_instance.save()
    new_instance_pk = new_instance.pk
    for ky in children_to_clone.keys():
        child_collection = getattr(new_instance, ky)
        for child in children_to_clone[ky]:
            child_collection.add(
                duplicate_model_with_descendants(child, whitelist=whitelist, _new_parent_pk=new_instance_pk,static_fk=static_fk))

使用例:

original_record = FattAqp.objects.get(pk=4)
WHITELIST = ['lettura', 'periodo', 'dettfaqp']
STATIC_FK = ['fornitore_id','ef_id','an_im_id', 'voce_id']
duplicate_record = duplicate_model_with_descendants(original_record, WHITELIST, static_fk=STATIC_FK)
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top