본문 바로가기

Programming

thrading - Manage concurrent threads

threading - Manage concurrent threads

link : https://pymotw.com/2/threading/

  • 스레딩 모듈은 스레드의 저수준 기능을 기반으로 스레드 작업을 훨씬 쉽고, 파이토닉하다(?). 스레드를 사용하면 프로그램이 동일한 프로세스 공간에서 여러 작업을 동시에 실행할 수 있다.

Thread Objects

  • 스레드를 사용하는 가장 간단한 방법은 대상 함수로 스레드를 인스턴스화하고, start()를 호출해 작동을 시작하는 것이다.

    import threading
    
    def worker():
        """thread worker function"""
        print 'Worker'
        return
    
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker)
        threads.append(t)
        t.start()

    출력은 각각 "Worker"가 있는 5줄이다.

    $ python threading_simple.py
    
    Worker
    Worker
    Worker
    Worker
    Worker

    스레드를 생성하고 인수를 전달해 수행할 작업을 알려주는 것이 유용한다. 이 예제는 숫자를 전달해 스레드가 인쇄한다.

    import threading
    
    def worker(num):
        """thread worker function"""
        print 'Worker: %s' % num
        return
    
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()

    정수 인수는 이제 각 스레드가 인쇄한 메시지에 포함된다.

    $ python -u threading_simpleargs.py
    
    Worker : 0
    Worker : 1
    Worker : 2
    Worker : 3
    Worker : 4

Determining the Current Thread

  • 스레드를 식별하거나 이름을 지정하기 위해 인수를 사용하는 것은 번거롭고 불필요하다. 각 스레드 인스턴스에는 스레드가 작성 될 때 변경할 수 있는 기본 값이 있는 이름이 있다. 이름 지정 스레드는 다른 작업을 처리하는 여러 서비스 스레드가 있는 서버 프로세스에서 유용한다.

    import threading
    import time
    
    def worker():
        print threading.currentThread().getName(), 'Starting'
        time.sleep(2)
        print threading.currentThread().getName(), 'Exiting'
    
    def my_service():
        print threading.currentThread().getName(), 'Starting'
        time.sleep(3)
        print threading.currentThread().getName(), 'Exiting'
    
    t = threading.Thread(name='my_service', traget=my_service)
    w = threading.Thread(name='worker', target=worker)
    w2 = threading.Thread(target=worker) # use default name
    
    w.start()
    w2.start()
    t.start()

    디버그 출력에는 각 줄의 현재 스레드 이름이 포함된다. 스레드 이름 열에 "Thread-1"이 있는 행은 이름이 없는 스레드 w2에 해당한다.

    $ python -u threading_names.py
    
    worker Thread-1 Starting
    my_service Starting
    Starting
    Thread-1worker Exiting
    Exiting
    my_service Exiting

    대부분의 프로그램은 디버그를 위해 프린트를 사용하지 않는다. 로깅 모듈은 포맷터 코드 % (threadName) s를 사용해 모든 로그 메시지에 스레드 이름 임베드를 지원한다. 로그 메시지에 스레드 이름을 포함하면 해당 메시지를 소스로 쉽게 추적 할 수 있다.

    import logging
    import threading
    import time
    
    logging.basicConfig(level=logging.DEBUG, 
                        format='[%(levelname)s] (%(threadName)-10s)%(message)s',
                       )
    
    def worker():
        logging.debug('Starting')
        time.sleep(2)
        logging.debug('Exiting')
    
    def my_service():
        logging.debug('Starting')
        time.sleep(3)
        logging.debug('Exiting')
    
    t = threading.Thread(name='my_service', target=my_service)
    w = threading.Thread(name='worker', target=worker)
    w2 = threading.Thread(target=worker) # use default name
    
    w.start()
    w2.start()
    t.start()

    logging은 스레드로부터 안전하므로 다른 스레드의 메시지는 출력에서 고유하게 유지된다.

    $ python threading_name_log.py
    
    [DEBUG] (worker        ) Starting
    [DEBUG]    (Thread-1    ) Starting
    [DEBUG] (my_service ) Starting
    [DEBUG] (worker    ) Exiting
    [DEBUG] (Thread-1  ) Exiting
    [DEBUG] (my_service) Exiting

Demon vs. Non-Daemon Threads

  • 지금까지 예제 프로그램은 모든 스레드가 작업을 완료 할 때까지 내재적으로 대기했습니다. 때때로 프로그램은 메인 프로그램의 종료를 막지 않고 실행되는 데몬으로 스레드를 생성한다. 데몬 스레드를 사용하면 스레드를 쉽게 중단 할 수 있는 방법이 없거나 작업 도중 스레드가 죽어도 데이터가 손실되거나 손상되지 않는 서비스에 유용하다. 스레드를 데몬으로 표시하려면 부울 인수와 함께 setDemon() 메소드를 호출해라. 기본 값은 스레드가 데몬이 아닌 것이므로 True를 전달하면 데몬 모드가 켜진다.

    import threading
    import time
    import logging
    
    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s',
                       )
    
    def daemon():
        logging.debug('Starting')
        time.sleep(2)
        logging.debug('Exiting')
    
    d = threading.Thread(name='daemon', target=daemon)
    d.setDaemon(True)
    
    def non_daemon():
        logging.debug('Starting')
        logging.debug('Exiting')
    
    t = threading.Thread(name='non-daemon', target=non_daemon)
    
    d.start()
    t.start()

    데몬 스레드가 2초의 절전 모드에서 깨어나기 전에 모든 비데몬 스레드(주 스레드 포함)가 종료되므로 출력에 데몬 스레드의 "종료" 메시지가 포함되지 않는다.

    $ python threading_daemon.py
    
    (daemon        ) Starting
    (non-daemon    ) Starting
    (non-daemon ) Exiting

    데몬 스레드가 작업을 완료 할 때까지 기다리려면 join() 메소드를 사용해라.

    import threading
    import time
    import logging
    
    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s',
                       )
    
    def daemon():
        logging.debug('Starting')
        time.sleep(2)
        logging.debug('Exiting')
    
    d = threading.Thread(name='daemon', target=daemon)
    d.setDaemon(True)
    
    def non_daemon():
        logging.debug('Starting')
        logging.debug('Exiting')
    
    t = threading.Thread(name='non-daemon', target=non_daemon)
    
    d.start()
    t.start()
    
    d.join()
    t.join()

    join()을 사용해 데몬 스레드가 종료 될 때까지 "종료" 메시지가 생성 될 수 있음을 의미한다.

    $ python threading_daemon_join.py
    
    (daemon    ) Starting
    (non-daemon) Starting
    (non-daemon) Exiting
    (daemon    ) Exiting

    기본적으로 join()은 무기한 차단된다. 또한, 시간 종료 인수를 전달 할 수도 있따. 스레드가 초과 기간 내에 완료되지 않으면 join()은 어떤 방식으로든 리턴된다.

    import threading
    import time
    import logging
    
    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s',
                        )
    
    def daemon():
        logging.debug('Starting')
        time.sleep(2)
        logging.debug('Exiting')
    
    d = threading.Thread(name='daemon', target=daemon)
    d.setDaemon(True)
    
    def non_daemon():
        logging.debug('Starting')
        logging.debug('Exiting')
    
    t = threading.Thread(name='non-daemon', target=non_daemon)
    
    d.start()
    t.start()
    
    d.join(1)
    print 'd.isAlive()', d.isAlive()
    t.join()

    전달된 시간 초과는 데몬 스레드가 대기하는 시간보다 짧으므로 join()이 반환된 후에도 스레드는 여전히 "사용 중"이다.

    $ python threading_daemon_join_timeout.py
    
    (daemon    ) Starting
    (non-daemon) Starting
    (non-daemon) Exiting
    d.isAlive() True

Enumerating All Thread

  • 기본 프로세스를 종료하기 전에 모든 데몬 스레드에 대한 명시적 핸들을 유지해 완료되었는지 확인하지 않아도 된다. enumerate()는 활성화 된 thread 인스턴스의 목록을 반환한다. 이 목록에는 현재 스레드가 포함되며 현재 스레드에 참여할 수 없으므로 (교착 상태가 발생하므로) 건너 뛰어야 한다.

    import random
    import threading
    import time
    import logging
    
    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s',
                        )
    
    def worker():
        """thread worker function"""
        t = threading.currentThread()
        pause = random.randint(1,5)
        logging.debug('sleeping %s', pause)
        time.sleep(pause)
        logging.debug('ending')
        return
    
    for i in range(3):
        t = threading.Thread(target=worker)
        t.setDaemon(True)
        t.start()
    
    main_thread = threading.currentThread()
    for t in threading.enumerate():
        if t is main_thread:
            continue
        logging.debug('joining %s', t.getName())
        t.join()

    worker가 임의이 시간동안 sleep하기 때문에 이 프로그램의 출력이 다를 수 있다. 다음과 같이 보일 것이다:

    $ python threading_enumerate.py
    
    (Thread-1  ) sleeping 3
    (Thread-2  ) sleeping 2
    (Thread-3  ) sleeping 5
    (MainThread) joining Thread-1
    (Thread-2  ) ending
    (Thread-1  ) ending
    (MainThread) joining Thread-3
    (Thread-3  ) ending
    (MainThread) joining Thread-2

Subclassing Thread

  • 시작 시에 스레드는 기본 초기화를 수행 한 다음 생성자에 전달된 대상 함수를 호출하는 run() 메소드를 호출한다. Thread의 하위 클래스를 만들려면 run()을 재정의해 필요한 모든 작업을 수행해라.

    import threading
    import logging
    
    #logging 초기화 (다른 코드 참조)
    
    class MyThread(threading.Thread):
        def run(self):
            logging.debug('running')
            return
    
    for i in range(5):
        t = MyThread()
        t.start()

    run()의 반환 값은 무시된다.

    $ python threading_subclass.py
    
    (Thread-1  ) running
    (Thread-2  ) running
    (Thread-3  ) running
    (Thread-4  ) running
    (Thread-5  ) running

    Thread 생성자에 전달된 args 및 kwargs 값은 전용 변수에 저장되므로 하위 클래스에서 쉽게 액세스 할 수 없다. 인수를 사용자 정의 스레드 유형으로 전달하려면 생성자를 재정의해 서브 클래스에서 볼 수 있는 인스턴스 속성에 값을 저장해라.

    import threading
    import logging
    
    #logging 초기화 (다른 코드 참조)
    
    class MyThreadWithArgs(threading.Thread):
        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, verbose=None):
            threading.Thread.__init__(self, group=group, target=target, name=name,
                                      verbose=verbose)
            self.args = args
            self.kwargs = kwargs
            return
    
        def run(self):
            logging.debug('running with %s and %s', self.args, self.kwargs)
            return
    
    

for i in range(5):
t = MyThreadWithArgs(args=(i,), kwargs={'a':'A', 'b':'B'})
t.start()


   MyThreadWithArgs는 Thread와 동일한 API를 사용하지만 다른 클래스는 다른 클래스와 마찬가지로 스레드의 목적과 더 직접적으로 많은 또는 다른 인수를 취하도록 생성자 메소드를 쉽게 변경할 수 있다.

$ python threading_subclass_args.py

(Thread-1 ) running with (0,) and {'a': 'A', 'b': 'B'}
(Thread-2 ) running with (1,) and {'a': 'A', 'b': 'B'}
(Thread-3 ) running with (2,) and {'a': 'A', 'b': 'B'}
(Thread-4 ) running with (3,) and {'a': 'A', 'b': 'B'}
(Thread-5 ) running with (4,) and {'a': 'A', 'b': 'B'}




**Timer Threads**
----

- Thread를 서브 클래싱하는 이유의 한 예는 Timer에 의해 제공되며, 스레딩에도 포함된다. 타이머는 지연된 후 작업을 시작하며 해당 지연 시간 내에 언제든지 취소 할 수 있다.

  ```python
  import threading
  import time
  import logging

  logging.basicConfig(level=logging.DEBUG,
                      format='(%(threadName)-10s) %(message)s',
                      )

  def delayed():
      logging.debug('worker running')
      return

  t1 = threading.Timer(3, delayed)
  t1.setName('t1')
  t2 = threading.Timer(3, delayed)
  t2.setName('t2')

  logging.debug('starting timers')
  t1.start()
  t2.start()

  logging.debug('waiting before canceling %s', t2.getName())
  time.sleep(2)
  logging.debug('canceling %s', t2.getName())
  t2.cancel()
  logging.debug('done')

두 번째 타이머는 실횡되지 않으며 나머지 주 프로그램이 완료된 후 첫 번째 타이머가 실행되는 것으로 나타난다. 데몬 스레드가 아니기 때문에 기본 스레드가 완료되면 암시적으로 join된다.

  $ python threading_timer.py

  (MainThread) starting timers
  (MainThread) waiting before canceling t2
  (MainThread) canceling t2
  (MainThread) done
  (t1        ) worker running

Signaling Between Threads

  • 여러 스레드를 사용하는 지점은 별도의 작업을 동시에 실행해 실행하는 것이지만 두 개 이상의 스레드에서 작업을 동기화 할 수 있어야 하는 경우가 있다. 스레드간에 통신하는 간단한 방법은 Event 객체를 사용하는 것이다. 이벤트는 호출자가 set() 또는 clear() 할 수 있는 내부 플래그를 관리한다. 다른 스레드는 플래그가 설정 될 때까지 wait() 하여 계속 진행될 때까지 진행을 효과적으로 차단할 수 있다.

    import logging
    import threading
    import time
    
    logging.basicConfig(level=logging.DEBUG,
                        format='(%(threadName)-10s) %(message)s',
                        )
    
    def wait_for_event(e):
        """Wait for the event to be set before doing anything"""
        logging.debug('wait_for_event starting')
        event_is_set = e.wait()
        logging.debug('event set: %s', event_is_set)
    
    def wait_for_event_timeout(e, t):
        """Wait t seconds and then timeout"""
        while not e.isSet():
            logging.debug('wait_for_event_timeout starting')
            event_is_set = e.wait(t)
            logging.debug('event set: %s', event_is_set)
            if event_is_set:
                logging.debug('processing event')
            else:
                logging.debug('doing other work')
    
    

e = threading.Event()
t1 = threading.Thread(name='block',
target=wait_for_event,
args=(e,))
t1.start()

t2 = threading.Thread(name='non-block',
target=wait_for_event_timeout,
args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')


   wait() 메서드는 시간이 초과되기 전에 이벤트를 기다리는 시간(초)을 나타내는 인수를 사용한다. 이벤트 설정  여부를 나타내는 부울을 리턴하므로 호출자는 wait()이 리턴된 이유를 알고 있다. isSet() 메소드는 블로킹에 대한 두려움 없이 이벤트에서 별도로 사용될 수 있다.
   이 예에서 wait_for_event_timeout()은 무기한 차단하지 않고 이벤트 상태를 확인한다. wait_for_event()는 wait() 호출에서 차단되며, 이벤트 상태가 변경 될 때까지 반환되지 않는다.

$ python threading_event.py

(block ) wait_for_event starting
(non-block ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(non-block ) event set: False
(non-block ) doing other work
(non-block ) wait_for_event_timeout starting
(MainThread) Event is set
(block ) event set: True
(non-block ) event set: True
(non-block ) processing event




**Controlling Access to Resources**
----

- 스레드 작업을 동기화하는 것 외에도 데이터가 손상되거나 누락되지 않도록 공유 리소스에 대한 액세스를 제어 할 수 있어야 한다. 파이썬의 내장 데이터 구조(목록, 사전 등)는 조작을 위해 원자 바이트 코드를 사용하는 부작용으로 스레드로부터 안전하다. Python으로 구현 된 다른 데이터 구조 또는 정수 및 부동 소수점과 같은 간단한 유형에는 그러한 보호 기능이 없다. 객체에 대한 동시 액세스를 방지하려면 Lock 객체를 사용해라.

  ```python
  import logging
  import random
  import threading
  import time

  logging.basicConfig(level=logging.DEBUG,
                      format='(%(threadName)-10s) %(message)s',
                      )

  class Counter(object):
      def __init__(self, start=0):
          self.lock = threading.Lock()
          self.value = start
      def increment(self):
          logging.debug('Waiting for lock')
          self.lock.acquire()
          try:
              logging.debug('Acquired lock')
              self.value = self.value + 1
          finally:
              self.lock.release()

  def worker(c):
      for i in range(2):
          pause = random.random()
          logging.debug('Sleeping %0.02f', pause)
          time.sleep(pause)
          c.increment()
      logging.debug('Done')

  counter = Counter()
  for i in range(2):
      t = threading.Thread(target=worker, args=(counter,))
      t.start()

  logging.debug('Waiting for worker threads')
  main_thread = threading.currentThread()
  for t in threading.enumerate():
      if t is not main_thread:
          t.join()
  logging.debug('Counter: %d', counter.value)

@더 남음

'Programming' 카테고리의 다른 글

Data Structure  (0) 2020.03.17
Network (Basic)  (0) 2020.03.17