質問

ある実例のGoFオブザーバを実施したPython?私はビットコードは現在のビットのバーコードのレースを通じてファッションのアクセント(現在発生メッセージstderr場合は魔法の環境でも通知非通知を設定できます。また、クラスのインターフェースのための段階的に結果を返しますなどの保管(メモリ)のためのポスト処となります。(クラス自体は仕事マネージャを同時に実行するコマンドをリモート機以上のssh).

現在の使用量の種るとどうなるのかわかりません:

job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
    for each in job.poll():
        incrementally_process(job.results[each])
        time.sleep(0.2) # or other more useful work
post_process(job.results)

るalernative利用モデル:

job = SSHJobMan(hostlist, cmd)
job.wait()  # implicitly performs a start()
process(job.results)

この動作は、現在のユーティリティしかしな柔軟性を欠く。例えば私は現在のところサポートを簡単に出力のフォーマット、進捗バーとして増結果にも対応 簡単な、完全に統合メッセージ"の出力 post_process() 機能です。

しかし、そういった支援を複数結果の出力ストリーム(進捗バーの端末にアプリケーションのデバッグ警報をログファイルに出力からの成功の仕事をひとつのファイル/ディレクトリのエラーメッセージおよびその他の結果から成功の仕事もします。

この音のような状況にすることのできるオブザーバー...てインスタンスのクラスの受け入れらのオブジェやコンバーが特定の種類のイベントとしています。

私は見る PyPubSub また複数の参照になります。なんなのかとなったテレビアニメの追加は、外部依存性自体も見る価値を活用した大規模シミュレーションを行い、モデル鉱山がいやすくするためのその他に利用します。(プロジェクトとしてもドコマンドおよびクラスのための書面のスクリプト/さい。

短いは知っていて欲しい...ルがあり方に光を当てています。い提案いる他のユーザーのコード。

このコード自体には: classh.

役に立ちましたか?

解決

  

しかし、それは不足柔軟性を行います。

非同期APIは、あなたが望むものである場合は、

うーん...実際に、これは私には良いデザインのように見えます。これは通常です。たぶん、あなたが必要とするすべては、のようなものを持っているPythonの logging のモジュールに標準エラー出力から切り替えることです何Logger.addHandler()でというように、独自のモデルをパブリッシュ/サブスクライブます。

あなたがオブザーバーをサポートしたいならば、

、私のアドバイスは、それをシンプルに保つことです。あなたは本当にのみ、数行のコードを必要とします。

class Event(object):
    pass

class Observable(object):
    def __init__(self):
        self.callbacks = []
    def subscribe(self, callback):
        self.callbacks.append(callback)
    def fire(self, **attrs):
        e = Event()
        e.source = self
        for k, v in attrs.iteritems():
            setattr(e, k, v)
        for fn in self.callbacks:
            fn(e)

あなたの仕事のクラスはObservableをサブクラス化することができます。興味のある何かが起こると、self.fire(type="progress", percent=50)などを呼び出します。

他のヒント

私は他の回答の人々がそれをやり過ぎだと思います。あなたは簡単にコードの未満15行とPythonでのイベントを達成することができます。

EventObserver

あなたは、単純な2つのクラスを持っています。イベントをリッスンしたい任意のクラスには、オブザーバーを継承し、特定のイベントのために(観察)聞くように設定する必要があります。 Eventがインスタンス化して発射された場合、そのイベントに耳を傾け、すべての観察者が指定したコールバック関数を実行します。

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observables = {}
    def observe(self, event_name, callback):
        self._observables[event_name] = callback


class Event():
    def __init__(self, name, data, autofire = True):
        self.name = name
        self.data = data
        if autofire:
            self.fire()
    def fire(self):
        for observer in Observer._observers:
            if self.name in observer._observables:
                observer._observables[self.name](self.data)

class Room(Observer):

    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # Observer's init needs to be called
    def someone_arrived(self, who):
        print(who + " has arrived!")

room = Room()
room.observe('someone arrived',  room.someone_arrived)

Event('someone arrived', 'Lenard')

出力:

Room is ready.
Lenard has arrived!

さらにいくつかのアプローチ...

例:ロギングモジュール

たぶん、あなたが必要とするすべては、Pythonの logging のモジュールに標準エラー出力から切り替えることです、これは強力なパブリッシュ/サブスクライブ・モデルを持っています。

これは、ログレコードの生産を始めるのは簡単です。

# producer
import logging

log = logging.getLogger("myjobs")  # that's all the setup you need

class MyJob(object):
    def run(self):
        log.info("starting job")
        n = 10
        for i in range(n):
            log.info("%.1f%% done" % (100.0 * i / n))
        log.info("work complete")

消費者側では少しより多くの仕事があります。残念ながら、ロガーの出力を設定するように、コードの7行全体が何を取ります。 ;)

# consumer
import myjobs, sys, logging

if user_wants_log_output:
    ch = logging.StreamHandler(sys.stderr)
    ch.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    myjobs.log.addHandler(ch)
    myjobs.log.setLevel(logging.INFO)

myjobs.MyJob().run()

一方のものの驚くべき量は、loggingパッケージにあります。あなたはファイルの回転セット、電子メールアドレス、およびWindowsイベントログへのログデータを送信する必要がある場合は、あなたがカバーしています。

例:可能オブザーバー最も簡単な

しかし、あなたはまったくのライブラリを使用する必要はありません。オブザーバーをサポートするための非常に簡単な方法は、何もしないメソッドを呼び出すことです。

# producer
class MyJob(object):
    def on_progress(self, pct):
        """Called when progress is made. pct is the percent complete.
        By default this does nothing. The user may override this method
        or even just assign to it."""
        pass

    def run(self):
        n = 10
        for i in range(n):
            self.on_progress(100.0 * i / n)
        self.on_progress(100.0)

# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
代わりにラムダを書くの

時には、あなただけの素敵である、job.on_progress = progressBar.updateを言うことができます。

これは、それを取得と同じくらい簡単です。一つの欠点は、それが自然に同じイベントをサブスクライブ複数のリスナーをサポートしていないということです。

例:C#ライクなイベント

サポートコードのビットを使用すると、PythonでC#ライクなイベントを取得することができます。ここでは、コードがあります:

# glue code
class event(object):
    def __init__(self, func):
        self.__doc__ = func.__doc__
        self._key = ' ' + func.__name__
    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be

class boundevent(object):
    def __init__(self):
        self._fns = []
    def __iadd__(self, fn):
        self._fns.append(fn)
        return self
    def __isub__(self, fn):
        self._fns.remove(fn)
        return self
    def __call__(self, *args, **kwargs):
        for f in self._fns[:]:
            f(*args, **kwargs)

プロデューサーはデコレータを使用してイベントを宣言します:

# producer
class MyJob(object):
    @event
    def progress(pct):
        """Called when progress is made. pct is the percent complete."""

    def run(self):
        n = 10
        for i in range(n+1):
            self.progress(100.0 * i / n)

#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()

これは、まさに上記の「シンプルオブザーバー」のコードのように動作しますが、あなたは+=を使って、好きなだけのリスナーを追加することができます。 (C#のとは異なり、イベントハンドラのタイプが存在しない、あなたは、イベントをサブスクライブするときnew EventHandler(foo.bar)する必要はありません、あなたは、イベントを発射する前にnullをチェックする必要はありません。同様のC#、イベントは例外をスケルチしません。)

を選択する方法

loggingはあなたが必要なすべてをした場合は、それを使用。それ以外の場合はあなたのために働く最も簡単なことを行います。注意すべき重要なことは、あなたは大きな外部依存関係を取る必要がないことです。

どの実施が物んに生かされているというだけの理由により、い観察は何かな?以下さの実装は、オブザーバータ、主な特長は以下の通りです:

  1. 利用pythonic.追加オブザーバーへの行き方法 .bar のインスタンス foo, だい foo.bar.addObserver(observer).
  2. オブザーバーは、生かされているようにしている。つまり、オブザーバーコードを使用しない強い参照です。
  3. なサブclassing必要な記述子ftw).
  4. 使用できunhashableます。
  5. として使用できるくしたい単一のクラスです。
  6. (ボーナス)として、今日のコードが存在する適切なランセンドで設置可能 パッケージgithub.

こちらのコード( githubのパッケージ または PyPIパッケージ 最新の日実施):

import weakref
import functools

class ObservableMethod(object):
    """
    A proxy for a bound method which can be observed.

    I behave like a bound method, but other bound methods can subscribe to be
    called whenever I am called.
    """

    def __init__(self, obj, func):
        self.func = func
        functools.update_wrapper(self, func)
        self.objectWeakRef = weakref.ref(obj)
        self.callbacks = {}  #observing object ID -> weak ref, methodNames

    def addObserver(self, boundMethod):
        """
        Register a bound method to observe this ObservableMethod.

        The observing method will be called whenever this ObservableMethod is
        called, and with the same arguments and keyword arguments. If a
        boundMethod has already been registered to as a callback, trying to add
        it again does nothing. In other words, there is no way to sign up an
        observer to be called back multiple times.
        """
        obj = boundMethod.__self__
        ID = id(obj)
        if ID in self.callbacks:
            s = self.callbacks[ID][1]
        else:
            wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
            s = set()
            self.callbacks[ID] = (wr, s)
        s.add(boundMethod.__name__)

    def discardObserver(self, boundMethod):
        """
        Un-register a bound method.
        """
        obj = boundMethod.__self__
        if id(obj) in self.callbacks:
            self.callbacks[id(obj)][1].discard(boundMethod.__name__)

    def __call__(self, *arg, **kw):
        """
        Invoke the method which I proxy, and all of it's callbacks.

        The callbacks are called with the same *args and **kw as the main
        method.
        """
        result = self.func(self.objectWeakRef(), *arg, **kw)
        for ID in self.callbacks:
            wr, methodNames = self.callbacks[ID]
            obj = wr()
            for methodName in methodNames:
                getattr(obj, methodName)(*arg, **kw)
        return result

    @property
    def __self__(self):
        """
        Get a strong reference to the object owning this ObservableMethod

        This is needed so that ObservableMethod instances can observe other
        ObservableMethod instances.
        """
        return self.objectWeakRef()


class ObservableMethodDescriptor(object):

    def __init__(self, func):
        """
        To each instance of the class using this descriptor, I associate an
        ObservableMethod.
        """
        self.instances = {}  # Instance id -> (weak ref, Observablemethod)
        self._func = func

    def __get__(self, inst, cls):
        if inst is None:
            return self
        ID = id(inst)
        if ID in self.instances:
            wr, om = self.instances[ID]
            if not wr():
                msg = "Object id %d should have been cleaned up"%(ID,)
                raise RuntimeError(msg)
        else:
            wr = weakref.ref(inst, Cleanup(ID, self.instances))
            om = ObservableMethod(inst, self._func)
            self.instances[ID] = (wr, om)
        return om

    def __set__(self, inst, val):
        raise RuntimeError("Assigning to ObservableMethod not supported")


def event(func):
    return ObservableMethodDescriptor(func)


class Cleanup(object):
    """
    I manage remove elements from a dict whenever I'm called.

    Use me as a weakref.ref callback to remove an object's id from a dict
    when that object is garbage collected.
    """
    def __init__(self, key, d):
        self.key = key
        self.d = d

    def __call__(self, wr):
        del self.d[self.key]

使用して飾るだけではどのような方法を選択して観察 @event.以下に例を示します

class Foo(object):
    def __init__(self, name):
        self.name = name

    @event
    def bar(self):
        print("%s called bar"%(self.name,))

    def baz(self):
        print("%s called baz"%(self.name,))

a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()

からウィキペディアするます:

from collections import defaultdict

class Observable (defaultdict):

  def __init__ (self):
      defaultdict.__init__(self, object)

  def emit (self, *args):
      '''Pass parameters to all observers and update states.'''
      for subscriber in self:
          response = subscriber(*args)
          self[subscriber] = response

  def subscribe (self, subscriber):
      '''Add a new subscriber to self.'''
      self[subscriber]

  def stat (self):
      '''Return a tuple containing the state of each observer.'''
      return tuple(self.values())

観察可能で、このように使用されます。

myObservable = Observable ()

# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)

# emit parameters to each observer
myObservable.emit(6, 2)

# get updated values
myObservable.stat()         # returns: (8, 3.0, 4, 12)

ジェイソンの答えに基づいて、私は、ドキュメントやテストを含む本格的なPythonモジュールとしてC#ライクなイベントの例を実装しました。私は派手なニシキヘビのものを愛して:)

あなたには、いくつかのすぐに使用できるソリューションをしたいのであれば、あなただけのgithubの上のコードを使用することができます。

例:ツイストログオブザーバー

(他のオブザーバーに加えて)すべてのログイベントを受け取るためにオブザーバーyourCallable()(辞書を受け入れ呼び出し可能)を登録するには:

twisted.python.log.addObserver(yourCallable)

例:完全プロデューサ/コンシューマの例

ツイストPythonのメーリングリストから:

#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""

import random

from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver

class Producer:
    """Send back the requested number of random integers to the client."""
    implements(interfaces.IPushProducer)
    def __init__(self, proto, cnt):
        self._proto = proto
        self._goal = cnt
        self._produced = 0
        self._paused = False
    def pauseProducing(self):
        """When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
        self._paused = True
        print('pausing connection from %s' % (self._proto.transport.getPeer()))
    def resumeProducing(self):
        self._paused = False
        while not self._paused and self._produced < self._goal:
            next_int = random.randint(0, 10000)
            self._proto.transport.write('%d\r\n' % (next_int))
            self._produced += 1
        if self._produced == self._goal:
            self._proto.transport.unregisterProducer()
            self._proto.transport.loseConnection()
    def stopProducing(self):
        pass

class ServeRandom(LineReceiver):
    """Serve up random data."""
    def connectionMade(self):
        print('connection made from %s' % (self.transport.getPeer()))
        self.transport.write('how many random integers do you want?\r\n')
    def lineReceived(self, line):
        cnt = int(line.strip())
        producer = Producer(self, cnt)
        self.transport.registerProducer(producer, True)
        producer.resumeProducing()
    def connectionLost(self, reason):
        print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()

オブザーバの設計への機能的アプローチます:

def add_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    # If this is the first listener, then set up the method wrapper
    if not listeners:

        listeners = [listener]
        setattr(obj, listener_attr, listeners)

        # Get the object's method
        method = getattr(obj, method_name)

        @wraps(method)
        def method_wrapper(*args, **kwags):
            method(*args, **kwags)
            for l in listeners:
                l(obj, *args, **kwags) # Listener also has object argument

        # Replace the original method with the wrapper
        setattr(obj, method_name, method_wrapper)

    else:
        # Event is already set up, so just add another listener
        listeners.append(listener)


def remove_listener(obj, method_name, listener):

    # Get any existing listeners
    listener_attr = method_name + '_listeners'
    listeners = getattr(obj, listener_attr, None)

    if listeners:
        # Remove the listener
        next((listeners.pop(i)
              for i, l in enumerate(listeners)
              if l == listener),
             None)

        # If this was the last listener, then remove the method wrapper
        if not listeners:
            method = getattr(obj, method_name)
            delattr(obj, listener_attr)
            setattr(obj, method_name, method.__wrapped__)

これらの方法は、その後、任意のクラスメソッドにリスナーを追加するために使用することができます。たとえばます:

class MyClass(object):

    def __init__(self, prop):
        self.prop = prop

    def some_method(self, num, string):
        print('method:', num, string)

def listener_method(obj, num, string):
    print('listener:', num, string, obj.prop)

my = MyClass('my_prop')

add_listener(my, 'some_method', listener_method)
my.some_method(42, 'with listener')

remove_listener(my, 'some_method', listener_method)
my.some_method(42, 'without listener')

と出力されます:

method: 42 with listener
listener: 42 with listener my_prop
method: 42 without listener

OPが尋ね、「Pythonで実装GoFのオブザーバーのいずれかの典型的な例がありますか?」 これは、Python 3.7での の例です。この観測クラスは、関係を作成する要求を満たすの 1観察多くの観察者のままの独立したのそれらの構造の

from functools import partial
from dataclasses import dataclass, field
import sys
from typing import List, Callable


@dataclass
class Observable:
    observers: List[Callable] = field(default_factory=list)

    def register(self, observer: Callable):
        self.observers.append(observer)

    def deregister(self, observer: Callable):
        self.observers.remove(observer)

    def notify(self, *args, **kwargs):
        for observer in self.observers:
            observer(*args, **kwargs)


def usage_demo():
    observable = Observable()

    # Register two anonymous observers using lambda.
    observable.register(
        lambda *args, **kwargs: print(f'Observer 1 called with args={args}, kwargs={kwargs}'))
    observable.register(
        lambda *args, **kwargs: print(f'Observer 2 called with args={args}, kwargs={kwargs}'))

    # Create an observer function, register it, then deregister it.
    def callable_3():
        print('Observer 3 NOT called.')

    observable.register(callable_3)
    observable.deregister(callable_3)

    # Create a general purpose observer function and register four observers.
    def callable_x(*args, **kwargs):
        print(f'{args[0]} observer called with args={args}, kwargs={kwargs}')

    for gui_field in ['Form field 4', 'Form field 5', 'Form field 6', 'Form field 7']:
        observable.register(partial(callable_x, gui_field))

    observable.notify('test')


if __name__ == '__main__':
    sys.exit(usage_demo())
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top