C# 6.0 완벽 가이드/ 병렬 프로그래밍

Contents

  • 이번 장에서는 다중 코어 프로세서의 활용을 목적으로 하는 다음과 같은 다중 스레드 API 들과 구축 요소들을 살펴본다.
    • PLINQ(Parallel LINQ; 병렬 LINQ)
    • Parallel 클래스
    • 작업 병렬성 구축 요소
    • 동시적 컬렉션(concurrent collection)
  • 이들은 모두 .NET Framework 4.0에서 도입되었다. 이들을 통틀어 PFX(Parallel Framework; 병렬 프레임워크)라고 부르기도한다.
    • 그리고 Parallel 클래스와 작업 병렬성 요소들을 합해서 TPL(Task Parallel Library; 작업 병렬 라이브러리)이라고 부른다.
  • 이번 장을 이해하려면 14장에서 말한 기본 개념들에 익숙해야 한다. 특히 잠근, 스레드 안전성, Task 클래스를 숙지할 필요가 있다.

PFX가 왜 필요한가?

  • 지난 10여년 사이에 CPU 제조사들은 단일 코어 프로세서에서 다중 코어 프로세서로 초점을 옮겼다. 이떄문에 예전처럼 그냥 CPU만 빠랄지면 단일 스레드 코드도 저절로 빨라지는 현상은 더는 기대할 수 없게 되었다. 이제 성능 향상을 위해서는 여러 개의 코어(core)를 제대로 활용해야 한다.
  • 서버 응용 프로그램들은 대부분 각 클라이언트 요청을 개별 스레드에서 처리하는 형태이므로 여러 코어를 활용하는 것이 어렵지 않다. 그러나 데스크톱 응용 프로그램은 그렇지 않다. 데스크톱 응용 프로그램에서 다중 코어를 활용하려면 프로그램 중 처리량이 많은 코드의 구조를 다음과 같은 형태로 개선해야 한다.
    1. 처리할 일거리를 더 작은 덩어리들로 분할(partitioning)한다.
    2. 각 덩어리를 다중 스레드 기법을 이용해서 병렬로 처리한다.
    3. 처리가 끝난 스레드들의 결과를 스레드에 안전한, 그리고 성능 효율적인 방식으로 취합(collating) 한다.
  • 이러한 개선을 고전적인 다중 스레드 적용 수단들을 이용해서 독자가 직접 수행할 수도 있지만, 그리 쉬운 일은 아니다. 특히 분할과 취합 단계가 까다롭다.
    • 게다가 다수의 스레드가 같은 자료를 동시에 다루는 경우 스레드 안전성 확보에 흔히 쓰이는 잠금 전략들을 그대로 적용하면 경합이 심해져서 성능이 떨어진다.
    • PFX 라이브러리들은 바로 이런 상황에 도움이 되도록 설계되었다.
  • 다중 코어 또는 다중 프로세서를 활용하는 프로그래밍을 병렬 프로그래밍(parallel programming)이라고 부른다. 병렬 프로그래밍은 그보다 더 넓은 개념인 다중 스레드 적용(multithreading)의 일부이다.

PFX의 개념들

  • 일거리(work)를 여러 스레드로 분할하는 전략은 크게 두 가지이다.
    • 하나는 자료 병렬성(data parallelism)을 실현하는 것이고 또 하나는 작업 병렬성(task parallelism)을 실현하는 것이다.
  • 어떤 작업들을 많은 수의 자료 값들에 대해 수행할 때는 모든 스레드가 같은 종류의 작업들을 수행하되, 각 스레드가 그 자료의 일부만 처리하게 하는 구조가 적합하다.
    • 여러 스레드로 분할해서 그 처리를 병렬화하는 대상이 자료라서(작업이 아니라) 이를 자료 병렬성이라 부른다.
    • 그와는 반대로 작업 병렬성 전략에서는 작업들을 분할한다. 즉, 작업 병렬설 전략에서는 스레드가 각자 다른 작업을 수행한다.
  • 일반적으로 자료 병렬성이 더 쉽고 더 병렬적인 하드웨어로의 규모가변성도 좋다.
    • 이는 자료들을 분할하는 덕분에 스레드들이 공유하는 자료가 줄어들거나 아예 사라지기 때문이다.(공유 자료가 적으면 경합과 스레드 안전성 문제도 줄어든다)
    • 또한 자료 병렬성 전략은 일반적으로 서로 다른 종류의 작업들보다 서로 다른 자료 값들이 더 많은 경우가 흔하다는 사실과 잘 맞는다. 이는 병렬성 수준을 더욱 높이는 요인이 된다.
  • 자료 병렬성은 또한 구조적 병렬성(structured parallelism)에 도움이 된다. 구조적 병렬성이란 병렬적인 작업 단위들이 프로그램의 동일한 지점에서 시작하고 끝나는 것을 말한다.
    • 반면 작업 병렬성은 프로그램의 구조를 흐트러뜨리는 경향이 있다. 즉, 작업 병렬성을 적용하면 병렬 작업 단위들이 프로그램의 여러 곳에서 시작하고 끝나는 경우가 많다.
    • 구조적 병렬성이 좋으면 코드가 간단해지고 오류의 여지가 줄어든다. 또한 어려운 과제인 일거리 분할과 스레드 실행 관리를(심지어는 자료 취합도) 외부 라이브러리에 맡길 수 있게 된다.

PFX의 구성요소

  • PFX의 기능은 두 계층으로 이루어져 있다.(아래 그림) 상위 계층에는 PILNQ와 Parallel 클래스라는 두 구조적 자료 병렬성 API가 있다. 하위 계층은 주로 자료 병렬성 클래스들로 구성되며, 병렬 프로그래밍 활동을 돕는 일단의 추가 요소들도 있다.

  • 이 중 가장 풍부한 기능을 제공하는 것은 PLINQ이다. PLINQ는 병렬화의 모든 단계(일거리를 작업들로 분할하고, 그 작업들을 여러 스레드에서 실행하고, 그 결과들을 하나의 단일한 출력 순차열로 취합하는 등등)를 자동화한다.
  • PLINQ는 선언식(declarative)이다. 즉, 프로그래머가 이러이러한 작업(LINQ 질의 형태로 된)을 병렬화하겠다고 선언하기만 하면 .NET Framework가 알아서 구현 세부사항을 처리해 준다.
  • 반면 PFX의 다른 구성요소들은 명령식(imperative)이다. 즉, 분할이나 취합을 수행하는 코드를 독자가 명시적으로 작성해야 한다.
    • 예컨대 Parallel 클래스를 사용할 떄는 스레드들의 결과를 독자가 직접 취합해야 하며, 자료 병렬성 수단들을 사용할 때는 일거리를 독자가 직접 분할해야 한다.
자동 일거리 분할 자동 결과 취합
PLINQ
Parallel 클래스 아니요
PFX의 작업 병렬성 아니요 아니요

 

  • 동시적 컬렉션과 회전 기본수단(spinning primitive)들은 저수준 병렬 프로그래밍 활동을 돕는다. 이들은 PFX가 오늘날의 하드웨어 뿐만 아니라 코어 수가 훨씬 많은 향후 세대의 프로세서들에도 작동하도록 설계된 것이라는 점에서 중요하다.
    • 예컨대 벌목한 나무를 32명의 일꾼이 옮긴다고 할 때, 가장 어려운 부분은 일꾼들이 서로의 길을 방해하지 않게 하는 것이다. 알고리즘을 32개의 코어로 분할 때도 마찬가지이다.
    • 만일 공유 자원을 보통의 잠금 수단을 이용해서 보호한다면, 경합과 차단 때문에 임의의 시점에서 코어 중 일부만 작동하고 나머지는 빈둥거릴 가능성이 크다.
    • 동시적 컬렉션은 고도로 동시적인 접근 상황에 적합하도록 차단의 최소화 또는 제거에 초점을 두고 특별히 조율된 컬렉션이다.
    • PLINQ와 Parallel 클래스 자체는 이러한 동시적 컬렉션과 회전 기본수단에 의존해서 작업을 효율적으로 관리한다.

PFX의 또 다른 용도

  • 병렬 프로그래밍 구축 요소들은 다중 코어를 활용할 때 뿐만 아니라 다음과 같은 상황들에 유용하다.
    • 스레드에 안전한 대기열이나 스택, 사전이 필요할 때 동시적 컬렉션이 적합한 경우가 종종 있다.
    • BlockingCollection을 이용하면 생산자/소비자 구조를 손쉽게 구현할 수 있으며, 동시성을 제한하기도 쉽다.
    • 작업 객체는 비동기 프로그래밍의 토대이다.

PFCX의 바람직한 적용 대상

  • PFX의 주된 용도는 병렬 프로그래밍, 즉 다수의 코어 또는 프로세서를 활용해서 계산량이 많은 코드의 속도를 높이는 것이다.
  • 다중 코어 활용은 암달의 법칙(Amdahl’s law)으로 제한된다.
    • 암달의 법칙은 병렬화로 얻을 수 있는 최대 성능 향상은 반드시 순차적으로 실행해야 하는 코드의 비율로 결정된다는 것이다. 예컨대 알고리즘의 실행 시간 중 2/3를 병렬화 할 수 있다면, 코어가 무한히 많다고 해도 성능 향상은 3배를 넘지 못한다.
  • 따라서 계산량이 많은 코드를 병렬화 할 때는 먼저 그 코드에서 병목에 해당하는 부분을 실제로 병렬화할 수 있는지부터 확인할 필요가 있다.
    • 또한 애초에 코드의 계산량이 그렇게 많을 필요가 있는지도 점검해 보는게 좋다. 병렬화보다는 최적화가 더 쉽고 효과적일 때가 많다.
  • PFX를 적용해서 성능 이득을 얻을 수 있는 가장 쉬운 대상은 소위 대놓고 병렬적인(embarrassingly parallel) 문제, 즉 효율적으로 실행할 수 있는 작업 단위들로 손쉽게 분할할 수 있는 일거리이다 (이런 문제에서는 구조적 병렬성 전략이 아주 적합하다)
    • 예컨대 영상(화상) 처리 작업, 광선 추적(ray tracing) 렌더링, 그리고 수학이나 암호학의 무차별 시도(brute force) 접근방식 등이 그러한 문제에 속한다.
    • 대놓고 병렬적인 것은 아닌 문제의 예로는 빠른정렬(quicksort) 알고리즘의 한 최적화 버전을 들 수 있다. 그 알고리즘을 병렬화해서 좋은 결과를 얻으려면 미리 고민을 좀 해야 하며, 어쩌면 비구조적 병렬성이 필요할 수도 있다.

PLINQ

  • PLINQ는 지역 LINQ 질의를 자동으로 병렬화한다. PLINQ의 장점은 사용하기 쉽다는 것이다. PLINQ에서는 일거리의 분할과 결과의 취합을 .NET Framework에 맡길 수 있다.
  • PLINQ를 사용하려면 그냥 입력 순차열에 대해 AsParallel을 호출할 후, 평소대로 LINQ 질의를 계속 진행하면 된다.
    • 다음 질의는 컴퓨터의 모든 코어를 활용해서 3에서 100,000 사이의 소수를 계산한다.
// 간단한(최적화 되지 않은) 알고리즘을 이용해서 소수를 계산한다.
IEnumerable<int> numbers = Enumerable.Range(3, 10000-3);

var parallelQuery = 
  from n in numbers.AsParallel()
  where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
  select n;

int[] primes = parallelQuery.ToArray();
  • AsParallel은 System.Linq.ParallelEnumerable 클래스에 정의되어 있는 확장 메서드이다. 이 메서드는 입력을 ParallelQuery<TSource> 파생 형식의 순차열로 감싼다.
    • 이에 의해 이후에 호출되는 LINQ 질의 연산자들은 평소와는 달리 ParallelEnumerable에 정의되어 있는 일단의 확장 메서드들에 묶인다.
    • ParallelEnumerable에는 각 표준 질의 연산자를 병렬적으로 구현한 확장 메서드가 갖추어져 있다. 본질적으로 이 확장 메서드들은 입력 순차열을 여러 덩어리로 분할하고 각각을 개별 스레드에서 처리한 후 그 결과들을 하나의 순차열로 취합한다(아래 그림)
    • 그 출력 순차열은 통상적인 방식으로 접근 또는 열거할 수 있다.

  • AsParallel이 돌려준 ParallelQuery 순차열에 대해 AsSequential을 호출하면 이후의 질의 연산자들은 평소대로 표준 질의 연산자들에 묶여서 순차적으로 처리된다.
    • 부수 효과가 있거나 스레드에 안전하지 않은 메서드를 호출하려면 반드시 이 단계를 먼저 밟아야 한다.
  • 입력 순차열 두 개를 받는 질의 연산자(Join, GroupJoin, Concat, Union, Intersect, Except, Zip)를 사용하려면 반드시 두 입력 순차열 모두에 AsParallel을 적용해야 한다.(그렇지 않으면 예외가 발생한다)
    • 그러나 질의의 단계마다 매번 AsParallel을 적용할 필요는 없다. PLINQ의 질의 연산자들은 또 다른 ParallelQuery 순차열을 출력하기 때문이다.
    • 사실 AsParallel을 다시 호출하면 그때까지의 결과들을 취합하고 순차열을 다시 분할해야 하므로 오히려 성능이 떨어진다.
mySequence.AsParallle()  // 순차열을 ParallelQuery<int>로 감싼다.
  .Where(n => n > 100)  // 또 다른 ParalleQuery<int>를 출력한다.
  .AsParallel()  // 불필요하며 오히려 비효율적이다.
  .Select(n => n * n)
  • 모든 질의 연산자가 효과적으로 병렬화되지는 않는다. 효과적으로 병렬화할 수 없는 연ㅅ나자들에 대한 PLINQ의 연산자들은 그냥 순차적으로 구현되어 있다.
    • 또한 병렬화에 따른 부담 때문에 주어진 질의의 실행이 오히려 느려질 가능성이 있으면 PLINQ가 질의를 순차적으로 실행할 수도 있다.
  • PLINQ는 지역 컬렉션에만 작동한다. LINQ to SQL이나 Entity Framework에는 작동하지 않는데, 그런 종류의 LINQ 질의는 런타임이 SQL 문으로 번역해서 데이터베이스 서버에서 실행하기 때문이다.
    • 그러나 데이터베이스 질의에서 얻은 결과 집합에 대한 추가적인 지역 질의에 PLINQ를 적용하는 것은 가능하다.
  • PLINQ 질의에서 예외가 발생하면, 그 예외는 AggregateException 형식의 예외로 다시 던져진다. 원래의 예외는 그 AggregateException 예외의 InnerExceptions 속성에 들어 있다.

AsParallel이 기본이 아닌 이유

  • AsParallel이 LINQ 질의를 투명하게 병렬화해준다는 점을 생각하면, ‘애초에 표준 질의 연산자들을 병렬화해서 PLINQ를 기본으로 해도 되지 않을까?’ 라는 의문이 생길만도 하다.
  • 그러나 PLINQ를 기본이 아니라 선택 사항으로 두는게 합당한 이유가 몇 가지 있다. 첫째로 PLINQ가 유용하려면 일꾼 스레드들을 충분히 활용할 수 있을 정도로 계산량이 많은 일거리가 있어야 한다.
    • 대부분의 LINQ to Objects 질의는 아주 빨리 끝나므로 병렬화가 필요 없을 뿐만 아니라, 사실 분할과 취합, 추가적인 스레드들의 관리 때문에 속도가 더 느려질 수 있다.
  • 그 외에도 다음과 같은 이유가 있다.
    • 요소들의 순서와 관련해서 PLINQ 질의의 출력이 (기본적으로) LINQ 질의의 출력과 다를 수 있다
    • PLINQ는 예외를 AggregateException으로 감싼다(하나의 질의에서 여러 개의 예외가 던져질 수도 있기 때문이다)
    • 질의가 스레드에 안전하지 않은 메서드를 호출하면 PLINQ가 신뢰할 수 없는 결과를 낸다.
  • 마지막으로 PLINQ는 조율과 조정을 위한 확장 지점을 꽤 많이 제공한다. 표준 LINQ to Objects API에 그런 미묘한 기능들을 부여하면 프로그래머의 주의를 흐트러뜨릴 위험이 있다.

병렬 실행의 득과 실

  • 보통의 LINQ 질의처럼 PLINQ 질의들도 게으르게 평가된다. 즉, 질의는 질의 결과를 소비하는 지점에 도달했을 때 비로소 실행된다.
    • 질의 결과를 소비하는 지점은 흔히 foreach 문이지만,  ToArray 같은 변환 연산자나 하나의 요소 또는 값을 돌려주는 연산자일 수도 있다.
  • 그러나 결과의 여럭에 의해 PLINQ 질의가 실제로 실행되는 시점부터는 진행 방식이 보통의 순차적인 질의와 조금 다르다.
    • 순차 질의의 과정은 전적으로 소비자가 끌어오기(pull) 방식으로 주도한다. 즉, 입력 순차열의 요소들은 소비자의 요구에 의해 하나씩 끌려 나온다.
    • 그러나 일반적으로 병렬 질의에서는 소비자의 요구보다 조금 앞서서 개별적인 스레드들이 입력 순차열에서 요소들을 주동적으로 가져온다(뉴스 진행자나 연사 앞에 원고를 띄워 주는 프롬프터 장치나 CD 재생기의 튐 방지 버퍼링과 비슷하다)
    • 그런 다음 스레드들은 그 요소들을 질의 사슬을 따라 병렬로 처리하고, 그 결과를 작은 버퍼에 담아 두었다가 나중에 소비자가 요구하면 바로 제공한다.
    • 만일 소비자가 열거를 일시 정지하거나 일찍 종료하면 CPU 시간이나 메모리를 낭비하지 않도록 질의 처리기도 정지 또는 중단된다.
  • PLINQ의 버퍼링 방식을 AsParallel 호출 다음에 WithMergeOptions를 호출해서 변경할 수 있다.
    • 보통의 경우 기본 값인 AutoBuffered가 전반적으로 가장 나은 결과를 낸다. NotBuffered를 지정하면 버퍼링이 완전히 비활성화 된다.
    • 이는 결과를 최대한 빠릴 보고 싶을 때 유용하다. FullyBuffered는 결과 집합 전체를 버퍼에 담았다가 소비자에게 제공한다.(OrderBy와 Reverse 연산자는 애초에 이런 방식으로 작동하며, 요소, 집계, 변환 연산자들도 그렇다)

PLINQ와 요소 순서

  • 질의 연산자들을 병렬화할 떄 생기는 한 가지 부작용은 결과들을 취합한 최종 출력 순차열의 요소들이 원래 제출된 것과는 순서가 다를 수 있다는 것이다. 다른 말로 하면 PLINQ 질의는 LINQ 질의의 통상적인 순차열 요소 순서 유지 특징을 보장하지 않는다.
  • 강제로라도 요소들의 순서를 유지하고 싶으면 AsParallel 호출 다음에 AsOrdered를 호출해야 한다.
myCollection.AsParallel().AsOrdered()...
  • 요소들이 아주 많은 경우 AsOrdered를 호출하면 성능에 부담이 된다. PLINQ가 각 요소의 원래 위치를 일일이 기억해야 하기 때문이다.
  • 한 질의에서 AsOrdered의 효과를 나중에 다시 무효화할 수도 있다. AsUnordered를 호출하면 된다. 그러면 질의 안에 하나의 ‘무작위 섞기 지점’이 생기며, 그 지점부터는 질의가 좀 더 효율적으로 실행될 수 있다.
    • 예컨대 다음은 처음 두 질의 연산자까지만 입력 순자쳘 순서를 유지하는 예이다.
inputSequence.AsParallel().AsOrdered()
  .질의_연산자1()
  .질의_연산자2()
  .AsUnordered()  // 여기서부터는 순서 관계가 중요하지 않다.
  .질의_연산자3()
  ...
  • AsOrdered가 기본이 아닌 이유는 대부분의 질의에서 원래의 입력 순서가 중요하지 않기 때문이다. 만일 AsOrdered를 기본으로 했다면, 대부분의 병렬 질의에서 최상의 성능을 얻으려면 AsUnordered를 추가해야 했을 것이다. 이는 프로그래머에게 불필요한 부담일 뿐이다.

PLINQ의 한계

  • 현재 PLINQ가 병렬화할 수 있는 것에는 몇 가지 현실적인 한계가 있다. 이러한 한계들은 이후의 서비스 팩이나 .NET Framework 버전에서 완화될 수 있다.
  • 원본 요소들이 원래의 색인화 위치에 있지 않는 한, 다음 질의 연산자들을 사용하는 질의는 병렬화 되지 않는다.
    • Select, SelectMany, ElementAt의 색인화 버전
  • 대부분의 질의 연산자는 요소들의 색인화 위치를 변경한다(Where처럼 요소들을 제거하는 연산자도 포함해서) 따라서 일반적으로 병렬 질의에 위의 연산자들을 사용하려면 질의의 시작에서 사용해야 한다.
  • 다음 질의 연산자들은 병렬화할 수 있긴 하지만 분할 전략의 비용이 높기 때문에 순차 처리보다 실행이 느릴 수 있다.
    • Join, GroupBy, GroupJoin, Distinct, Union, Intersect, Except
  • 표준 Aggregate 연산자의 종잣값 있는 중복적재 버전들은 병렬화되지 않는다. 이를 위해 PLINQ는 특별한 중복적재들을 제공한다.
  • 그 외의 모든 연산자는 병렬화할 수 있지만, 그런 연산자들만 사용한다고 해서 질의가 반드시 병렬화되는 것은 아니다.
    • PLINQ는 만일 병렬화의 추가부담 때문에 오히려 속도가 느려질 여지가 있으면 질의를 순차적으로 처리한다.
    • 만일 PLINQ의 판단을 무시하고 병렬 처리를 강제하고 싶다면 AsParallel 호출 다음에 다음을 호출하면 된다.
.WithExecutionMode(ParallelExcutionMode.ForceParallelism)

예제: 병렬 영어 단어 철자 검사

  • (PLINQ를 이용한 병렬 처리 예제 생략. AsParallel 이후에는 LINQ와 동일하다)
    • (예제에 사용된 Contains는 어느 정도 시간이 걸리는 연산이기 때문에 이를 병렬화하는 것은 도움이 된다.)

PLINQ의 바람직한 적용대상

  • 어쩌면 독자의 기존 응용 프로그램에서 LINQ 질의들을 모두 찾아서 병렬화해보고 싶을지도 모르겠다. 그러나 대체로 그렇게 해서 이득을 얻을 수 있는 경우는 드물다. 왜냐면 LINQ가 최고의 해답임이 자명한 문제들은 대부분 아주 빠르게 처리되므로 이득이 없기 때문이다.
    • 더 나은 접근방식은 CPU 사용량이 많은 병목을 찾아서 ‘이것을 LINQ 질의로 표현할 수 있을까?’를 고찰하는 것이다.(이러한 코드 수정의 또 다른 장점은 일반적으로 LINQ 질의를 사용하면 코드가 더 짧아지고 가독성도 좋아진다는 것이다)
  • PLINQ는 ‘대놓고 병렬적인 문제’들에 잘 맞는다. 그러나 화상 처리(imaging)에는 잘 맞지 않을 수 있다. 수백 만개의 픽셀을 하나의 출력 순차열로 취합하는 부분이 병목이 되기 때문이다.
    • 화상 처리에는 픽셀들을 배열 또는 비관리 메모리 블록에 직접 기록하고 Parallel 클래스나 작업 병렬성 전략을 이용해서 다중 스레드 처리를 관리하는 것이 더 낫다
    • 한편 ForAll을 이용해서 결과 취합을 아예 생략할 수도 있다. 애초에 LINQ 질의로 만들기에 적합한 형태의 화상 처리 알고리즘이라면 이 방법이 합당할 것이다.

함수적 순수성

  • PLINQ는 질의를 여러 개의 스레드에서 병렬로 처리하므로 질의에 스레드에 안전하지 않은 연산지 끼어들지 않게 하는 것이 중요하다.
    • 특히 다음 예처럼 변수에 값을 기록하는 것은 부수효과(side effect)에 해당하며, 따라서 스레드에 안전하지 않다.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
  • i의 증가를 자물쇠로 잠가서 스레드에 안전하게 만들 수도 있지만, 그렇다고 해도 병렬 처리 때문에 i가 해당 입력 요소의 위치가 아닐 수도 있다는 문제는 해결되지 않는다.
    • 이 문제는 AsOrdered를 추가한다고 해도 해결되지 않는다. AsOrdered는 오직 출력의 요소들이 질의를 순차적으로 실행했을 때와 같은 순서가 되게 할 뿐, 실제로 그 요소들을 순차적으로 처리하지는 않기 때문이다.
    • 대신 질의를 다음과 같이 Select의 색인화 버전을 사용하도록 고쳐야 한다.
var query = from n in Enumerable.Range(0,999).AsParallel().Select((n, i) n * i);
  • 병렬 처리를 위해서는 질의 연산자에서 호출하는 모든 메서드가 스레드에 안전해야 하며, 최상의 성능을 위해서는 스레드에 안전한 이유가 ‘필드나 속성을 수정하지 않음’이어야 한다.
    • 즉 메서드에 부수 효과가 없어야 한다. 그런 메서드를 ‘함수적으로 순수한(functionally pure)’ 함수라고 부른다.
    • 잠금 때문에 스레드에 안전해진 메서드를 질의 연산자가 호출하면, 병렬성의 잠재력이 잠금 기간을 그 메서드가 사용하는 전체 시간으로 나눈 비율만큼 제한된다.

병렬도 설정

  • 기본적으로 PLINQ는 현재 쓰이는 프로세서에 맞는 최적의 병렬도(degree of parallelism)를 선택한다. 그러나 AsParallel 호출 이후 WithDegreeOfParallelism을 호출해서 이 병렬도를 직접 지정할 수도 있다.
...AsParallel().WithDegreeOfParallelism(4)...
  • 병렬도를 코어 개수 이상으로 증가하는 것이 바람직한 예로는 입출력 한정(I/O-bound) 작업이 있다.(이를테면 다수의 웹 페이지를 한꺼번에 내려받는 등).
    • 그러나 .NET Framework 4.5부터는 작업 조합기와 비동기 함수가 PLINQ 질의만큼이나 쉬운, 그리고 좀 더 효율적인 해법이다.
    • Task 객체와는 달리 PLINQ로 입출력 한정 작업을 처리하려면 스레드 차단이 반드시 일어난다(게다가 그 스레드들은 스레드 풀에서 가져온 스레드들이라서 문제가 더 심해진다)

병렬도 변경

  • 하나의 PLINQ 질의가 진행되는 동안에는 WithDegreeOfParallelism을 한 번만 호출할 수 있다. 이를 다시 호출하려면 AsParallel을 호출해서 질의 겨로가를 병합하고 다시 분할해야 한다.
"The Quick Brown Fox"
  .AsParallel().WithDegreeOfParallelism(2)
  .Where(c => !char.IsWhiteSpace(c))
  .AsParallel().WithDegreeOfParallelism(3)  // 병합과 분할을 강제한다.
  .Select(c => char.ToUpper(c))

취소

  • foreach 루프 때문에 실행된 PLINQ 질의는 취소하기 쉽다. 그냥 foreach 루프를 벗어나면 열거자가 암묵적으로 처분되면서 질의도 자동으로 취소된다.
  • 변환 연산자나 요소 또는 집계 연산자로 끝나는 질의는 다른 스레드에서 취소 토큰을 이용해서 취소할 수 있다.
    • 토큰을 삽입하려면 AsParallel 호출 후에 CancellationTokenSource 객체의 Token 속성을 인수로 해서 WithCancellation을 호출하면 된다.
    • 그런 다음 다른 스레드에서 토큰 원본에 대해 Cancel을 호출하면, 잘의 결과를 소비하는 쪽에서 OperationCanceledException 예외가 발생한다.
IEnumerable<int> million = Enumberable.Range(3, 1000000);

var cancelSource = new CancellationTokenSource();

var primNumberQuery =
  from n in million.AsParallel().WithCancellation(cancelSource.Token)
  where Enumerable.Range(2, (int)Math.Sqrt(n)).All(i => n % i > 0)
  select n;

new Thread(() => {
  Thread.Sleep(100);  // 100ms 후에
  cancelSource.Cancel();  // 질의를 취소한다.
}).Start();

try
{
  // 질의 실행을 시작한다.
  int[] primes = primNumberQuery.ToArray();
  // 다른 스레드가 이 스레드를 취소하므로 여기에는 절대 도달하지 않는다.
}
catch (OperationCaceledException)
{
  Console.WriteLine("Query canceled");
}
  • PLINQ는 스레드들을 선점적으로 취소하지 않는다. 그러면 프로그램이 불안정해질 위험이 있기 때문이다. 대신 PLINQ는 취소가 요청되면 모든 일꾼 스레드가 현재 요소의 처리를 끝낼 때까지 기다린 후 질의 실행을 끝낸다.
    • 이는 질의 연산자가 호출한 모든 메서드가 중간에 강제로 종료되지 않고 끝까지 실행됨을 의미한다.

PLINQ 최적화

출력 쪽 최적화

  • PLINQ가 편리한 점 하나는 병렬화된 일거리의 처리 결과를 단일한 하나의 출력 순차열로 취합해 준다는 것이다. 그런데 출력 순차열로 하는 일이 다음 예처럼 그냥 각 요소에 대해 어떤 함수를 실행하는 것뿐일 때가 종종 있다.
foreach(int n in parallelQuery)
  DoSomething(n);
  • 이런 경우에는 그리고 요소들의 처리 순서가 중요하지 않다면, PLINQ의 ForAll 메서드를 이용해서 효율성을 높일 수 있다.
  • ForAll 메서드는 ParallelQuery의 모든 출력 요소에 대해 하나의 대리자를 실행한다. PLINQ는 결과 취합 및 열거 과정을 생략하고, 각 스레드에서 대리자를 직접 호출하게 한다.
    • 다음은 ForAll을 사용하는 간단한 예이다.
"abcdef".AsParallel().Select(c => char.ToUpper(c)).ForAll(Console.Write);

  • 결과 취합 및 열거의 비용이 아주 높지는 않기 때문에, ForAll 최적화로 최고의 이득을 보려면 요소들의 수가 많고 한 요소의 처리에 걸리는 시간이 짧아야 한다.

입력 쪽 최적화

  • PLINQ가 입력 요소들을 스레드에 배정하는데 사용하는 분할 전략은 다음 세 가지이다.
전략 요소 할당 상대 성능
덩어리 분할 동적 평균
범위 분할 정적 나쁨 ~ 훌륭함
해시 분할 정적 나쁨

 

  • 요소들을 비교해야 하는 질의 연산자(GroupBy, Join, GroupJoin, Intersect, Except, Union, Distinct)를 사용할 때는 선택의 여지가 없다.
    • 그런 경우 PLINQ는 항상 해시 분할을 사용한다. 해시 분할은 해시 코드가 같은 요소들을 같은 스레드에 배정한다.
    • 모든 요소의 해시 코드를 미리 계산해야 하므로 상대적으로 비효율적이다.
    • 이 때문에 질의가 너무 느리게 실행된다면 대안은 AsSequential을 호출해서 병렬화를 포기하는 것뿐이다.
  • 다른 모든 질의 연산자에 대해서는 범위 분할(range partitioning; 또는 구간 분할) 전략과 덩어리 분할(chunk partitioning) 전략 중 하나가 쓰인다. PLINQ의 기본적인 분할 전략 선택 방법은 다음과 같다.
    • 만일 입력 순차열이 색인화가 가능한 형식이면(즉, 배열이거나 IList<T>를 구현하면, PLINQ는 범위 분할을 선택한다)
    • 그 외의 경우에는 덩어리 분할을 선택한다.
  • 대체로 모든 요소의 처리 속도(CPU 시간 사용량)가 비슷하고 요소들의 개수가 많으면 범위 분할이 더 빠르고, 그 외의 경우에는 덩어리 분할이 더 빠르다.
  • 범위 분할이 선택되게 하는 방법은 다음과 같다.
    • 질의가 Enumerable.Range로 시작한다면 그것을 ParallelEnumerable.Range로 바꾼다.
    • 그렇지 않으면 그냥 입력 순차열에 대해 ToList나 ToArray를 호출한다(물론, 이 호출 자체가 성능에 일정한 부담을 준다는 점도 고려해야 한다)
  • ParallelEnumerable.Range가 단지 Enumerable.Range(…).AsParallel()의 단축 표기는 아님을 주의해야 한다. ParallelEnumerable.Range는 범위 분할을 활성화하며, 그 때문에 질의의 성능이 바뀐다.
  • 덩어리 분할이 선택되게 하려면 다음처럼 입력 순차열을 Partitioner.Create로 감싸야 한다.
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery
  Partitioner.Create(numbers, true).AsParallel()
  .Where(...)
  • Partitioner.Create의 둘째 인수는 질의 실행시 부하 분산(load-balancing) 기능을 사용할 것인지를 뜻한다. 이 인수에 true를 지정하는 것 역시 덩어리 분할 전략을 사용하겠다고 PLINQ에 요청하는 것에 해당한다.
  • 덩어리 분할 전략에서 각 스레드는 입력 순차열에서 말 그대로 한 ‘덩어리’의 요소들을 가져와서 처리 한다.(아래 그림)

  • PLINQ는 처음에는 아주 작은 덩어리들(덩어리당 요소 한두 개)로 시작해서 질의가 진행됨에 따라 덩어리 크기를 점차 키운다.
    • 이렇게 하면 작은 입력 순차열이 효과적으로 병렬화될 뿐만 아니라 큰 순차열에서 스레드들이 너무 자주 덩어리를 가져오는 일도 방지된다. ‘만만한(즉, 빠르게 처리할 수 있는)’ 요소들을 가져온 일꾼 스레드는 더 많은 덩어리를 처리하게 된다.
    • 이러한 체계에서 모든 스레드는 모두 비슷한 수준으로 바쁘게 돌아간다. 다른 말로 하면 코어들에 ‘부하가 분산된다’
    • 이 전략의 유일한 단점은 공유 입력 순차열에서 요소들의 덩어리를 가져올 때 동기화가 필요하다는 것이다(흔히 독점 자물쇠를 이용한다) 이 동기화 때문에 약간의 추가부담과 경합이 발생한다.
  • 범위 분할 전략은 통상적인 입력 쪽 열거를 건너뛰고, 각 일꾼 스레드에게 동일한 개수의 요소들을 미리 배정한다. 따라서 입력 순차열에 대한 경합은 발생하지 않는다.
    • 그러나 만만한 요소들을 받아서 작업을 일찍 끝낸 스레드들은 다른 스레드들이 일하는 동안 빈둥거리게 된다. 예컨대 앞의 소수 계산기 예제에 범위 분할 적용을 한다면 그런 현상이 심해서 성능이 나쁠 것이다.
    • 범위 분할이 적합한 예로 다음은 1에서 1000만까지의 제곱근들의 합을 구하는 질의이다.
ParallelEnuemrable.Range(1, 10000000).Sum(i => Math.Sqrt(i))
  • ParallelEnumerable.Range는 ParalleQuery<T>를 돌려주므로, AsParallel 호출을 붙이지 않아도 질의가 병렬로 처리된다.
  • 범위 분할이 요소 범위들을 반드시 연속된 블록으로 배정하지는 않는다. 대신 ‘건너뛰기(striping)’ 방식이 쓰일 수도 있다.
    • 예컨대 일꾼 스레드가 둘일 때 한 스레드는 홀수 번째 요소들만, 다른 한 스레드는 짝수 번째 요소들만 받을 수도 있다.
    • TakeWhile 연산자에는 거의 혹실하게 건너뛰기 방식이 쓰인다(순차열 뒤쪽 요소들의 불필요한 처리를 피하려고)

커스텀 집계 연산자의 최적화

  • 프로그래머가 개입하지 않아도 PLINQ는 표준 Sum, Average, Min, Max 연산자를 병렬화한다. 그러나 커스텀 집계 연산자, 즉 Aggregate의 병렬화는 PLINQ에게 어려운 문제이다. 9장에서 설명했듯이 Aggregate는 커스텀 집계 연산을 수행한다.
    • 다음은 Sum 연산자처럼 주어진 수들의 합을 계산한느 커스텀 연산자의 예이다.
int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate(0, (total, n) => total + n);  // 6
  • 또한 9장에서 보았듯이 종잣값 없는(unseeded) 집계 연산에는 반드시 교환법칙과 결합법칙을 만족하는 대리자를 지정해야 한다.
    • 만일 이 규칙을 어기면 PLINQ는 부정확한 결과를 낸다. 이는 PLINQ가 입력 순차열의 여러 분할을 동시에 집계하기 위해 입력 순차열로부터 여러 개의 종잣값을 가져오기 때문이다.
  • 종잣값을 명시적으로 지정한 집계 연산에는 PLINQ를 안심하고 적용할 수 있을 것 같지만, 안타깝게도 보통의 경우 그런 연산은 하나의 종잣값에 의존하기 때문에 순차적으로 실행된다.
    • 다수의 종잣값을 적용할 수 있도록, PLINQ는 다수의 종갓값 또는 종잣값 팩토리 함수를 받는 또 다른 Aggregate 중복적재 버전을 제공한다.
    • 이 버전을 사용하는 경우 각 스레드는 그 팩토리 함수를 호출해서 종잣값을 생성하며, 그것을 스레드 지역 누산기(thread-local accumulator)로 사용해서 요소들을 지역적으로 집계한다.
  • 또한 커스텀 집계 연산자를 병렬화하려면 지역 누산기와 주 누산기의 결합 방식을 알려주는 함수도 이 Aggregate 중복적재 버전에 지정해야 한다.
    • 마지막으로 이 Aggregate는 집계 결과에 대해 임의의 최종적인 변환을 수행하는 대리자도 받는다.
  • 정리하자면 Aggregate 호출 시 지정해야 하는 대리자는 다음 네 가지이다.
    • seedFactory
      • 새 지역 누산기를 돌려준다.
    • updateAccumulatorFunc
      • 하나의 요소를 지역 누산기에 쌓는다.
    • combineAccumulatorFunc
      • 지역 누산기를 주 누산기에 합친다.
    • resultSelector
      • 집계 결과에 대해 임의의 최종 변환을 적용한다.
  • 간단한 시나리오에서는 종잣값 팩토리 대신 하나의 종잣값을 지정할 수도 있다. 그러나 이 접근방식은 만일 그 종잣값이 변이하고자 하는 참조 형식이면 원하는 결과를 얻지 못한다.
    • 그런 경우 같은 종잣값 인스턴스를 모든 스레드가 공유하기 때문이다.
  • 아주 간단한 예로 다음은 numbers 배열에 있는 값들의 합을 계산한다는 질의이다.
numbers.AsParallel().Aggregate(
  () => 0,  // seedFactory
    (localTotal, n) => localTotal + n,  // updateAccumulatorFunc
    (mainTot, localTot) => mainTot + localTot,  // combineAccumulatorFunc
    finalResult => finalResult)  // resultSelector
  • 더 간단한 접근방식으로도 같은 결과를 얻을 수 있다는 점에서(종잣값 없는 집계를 사용해도 되고, 아니면 그냥 Sum을 사용하는 것이 더 낫다) 이 예제는 다소 작위적이다.
    • 좀 더 현실적인 예로 주어진 문자열에서 영어 알파벳의 각 문자가 나오는 빈도를 계산한다고 하자. 다음은 간단한 순차적 해법이다.
string text = "Let's suppose this is a really long string";
var letterFrequencies = new int[26];

foreach (char c in text)
{
  int index = char.ToUpper(c) - 'A';
  if (index >= 0 && index <= 26) letterFrequencies[index]++;
}
  • 이 순차 질의를 병렬화하는 한 방법은 foreach 문을 잠시 후에 설명할 Parallel.ForEach 호출로 바꾸는 것이다.
    • 그러나 그렇게 하면 공유 배열에 대한 동시성 문제를 처리해야 한다. 그리고 그 배열 주변을 자물쇠로 잠그면 병렬화의 잠재력이 훼손될 뿐이다.
  • 이 문제에 대해 Aggregate는 깔끔한 해결책을 제공한다. 이 해법에서도 누산기는 이전 예제의 letterFrequencies처럼 하나의 배열이다.
    • 다음은 Aggregate를 이용하는 순차적 버전이다.
int[] result = 
  text.Aggregate(
    new int[26],  // '누산기'를 생성한다.
    (letterFrequencies, c) =>  // 글자 하나를 누산기에 쌓는다.
    {
      int index = char.ToUpper(c) - 'A';
      if (index >= 0 && index <= 26) letterFrequencies[index]++;
      return letterFrequencies;
    });
  • 그리고 다음은 PLINQ의 특별한 Aggregate 중복적재 버전을 이용한 병렬 버전이다.
int[] result = 
  text.AsParallel().Aggregate(
    () => new int[26],  // 새 지역 누산기를 생성한다.
    (letterFrequencies, c) =>  // 지역 누산기에 쌓는다.
    {
      int index = char.ToUpper(c) - 'A';
      if (index >= 0 && index <= 26) letterFrequencies[index]++;
      return letterFrequencies;
    },
        // 지역 누산기를 주 누산기에 합친다.
    (mainFreq, localFreq) =>
      mainFreq.Zip(localFreq, (f1, f2) => f1 + f2).ToArray(),
    finalResult => finalResult  // 집계 겨로가에 대해 임의의 최종 변환을 수행한다.
);
  • 지역 누산 함수가 localFrequencies를 변이한다(mutate)는 점에 주목하기 바란다. 이러한 최적화를 수행하는 이 능력은 중요하다. 그리고 이런 최적화가 가능한 것은 localFrequencies가 각 스레드의 지역 배열이기 때문이다.

Parallel 클래스

  • PFX는 Parallel 클래스의 다음 세 정적 메서드를 통해서 기본적인 형태의 구조적 병렬성을 제공한다.
    • Parallel.Invoke
      • 대리자들의 배열을 병렬로 실행한다.
    • Parallel.For
      • C# for 루프의 기능을 병렬로 수행한다.
    • Parallel.ForEach
      • C# foreach 루프의 기능을 병렬로 수행한다.
  • 세 메서드 모두 모든 작업이 끝날 때까지 차단된다. PLINQ에서처럼 처리 되지 않은 예외가 발생하면 나머지 일꾼 스레드들은 현재 요소를 처리한 후에 중지되며, 발생한 예외(또는 예외들)를 감싼 AggregateException 예외가 호출자에게 던져진다.

Parallel.Invoke 메서드

  • Parallel.Invoke는 배열에 담긴 Action 대리자들을 병렬로 수행하고, 모든 대리자가 완료되길 기다린다. 이 메서드의 가장 간단한 버전은 다음과 같이 정의되어 있다.
public static void Invoke(params Action[] actions);
  • PLINQ처럼 Parallel의 메서드들은 입출력 한정 작업이 아니라 계산량 한정 작업에 최적화되어 있다.
    • 그렇긴 하지만, Parallel.Invoke의 사용법을 간단히 파악하기에는 다음처럼 두 웹 페이지를 한꺼번에 내려 받는 예제게 적합하다.
Parallel.Invoke(
  () => new WebClient().DownloadFile("http://www.linqpad.net", "lp.html"),
  () => new WebClient().DownloadFile("http://www.jaoo.dk", "jaoo.html"));
  • 겉으로 보기에는 이것이 그냥 두 개의 스레드 한정 Task 객체를 생성해서 완료를 기다리는 작업을 짧게 표기하는 편의 수단 정도인 것 같지만, 사실은 중요한 차이점이 있다.
    • 바로 대리자 백만 개의 배열을 지정해도 Parallel.Invoke는 여전히 효울적으로 작동한다는 것이다.
    • 이는 Parallel.Invoke가 대리자마다 개별적인 Task 객체를 생성하는 것이 아니라, 대리자들을 적당한 개수의 단위로 분할해서 적당한 개수의 바탕 Task 객체에 배정하기 때문이다.
  • Parallel의 다른 모든 메서드처럼, 결과들을 취합하는 것은 프로그래머인 독자의 몫이다. 따라서 독자는 대리자들의 스레드 안전성을 반드시 염두에 두어야 한다.
    • 예컨대 다음은 스레드에 안전하지 못하다.
var data = new List<string>();
Parallel.Invoke(
  () => data.Add(new WebClient().DownloadString("http://www.foo.com")),
  () => data.Add(new WebClient().DownloadString("http://www.far.com")));
  • 목록 추가 부분을 잠그면 스레드 안전성이 생기지만, 만일 빨리 실행되는 대리자들을 아주 많이 지정한다면 그 부분이 병목이 될 것이다.
    • 더 나은 해법은 이번 장에서 자중에 설명하는 스레드 안전 컬렉션 중 하나를 사용하는 것이다. 지금 예라면 ConcurrentBag이 이상적이다.
  • Parallel.Invoke에는 ParallelOptions 객체를 받는 중복적재 버전도 있다.
public static void Invoke(ParallelOptions options, params Action[] actions);
  • ParallelOptions 객체를 이용해서 취소 토큰을 지정할 수 있으며, 최대 동시성을 제한하거나 커스텀 작업 스케쥴러를 지정할 수도 있다. 취소 토큰은 코어 수보다 많은(어림잡아) 작업을 실행할 때 중요하다.
    • 취소 시 아직 시작되지 않은 대리자들은 모두 폐기되지만, 이미 실행 중인 대리자는 끝까지 실행된다.

Parallel.For와 Parallel.ForEach

  • Parallel.For와 Parallel.ForEach는 C#의 for 루프와 foreach 루프와 비슷하되, 반복들을 차례로 실행하는 것이 아니라 병렬로 실행한다. 다음은 이들의 (가장 간단한) 서명이다.
public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body)
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
  • 예컨대 다음과 같은 순차적 for 루프를
for (int i = 0; i < 100; i++)
  Foo(i);
  • 다음과 같이 병렬화 할 수 있다.
Parallel.For(0, 100, i => Foo(i));
  • 또는 다음과 같이 좀 더 간결하게 표기할 수도 있다.
Parallel.For(0, 100, Foo);
  • 마찬가지로 다음과 같은 순차적 foreach 루프를
foreach (char c in "Hello, world")
  Foo(c);
  • 다음과 같이 병렬화 할 수 있다.
Parallel.ForEach("Hello, world", Foo);
  • 좀 더 실용적인 예로, 다음은 System.Security.Cryptography 이름공간에 있는 클래스를 이용해서 여섯 개의 공개키/비밀키 쌍 문자열을 병렬로 생성하는 예이다.
var keyPairs = new string[6];
Parallel.For(0, keyPairs.Length, i => keyPairs[i] = RSA.Create().ToXmlString(true));
  • Parallel.Invoke처럼, Parallel.For와 Parallel.ForEach는 많은 수의 항목을 지정해도 효율적으로 실행된다(다수의 요소를 비교적 적은 수의 작업으로 나누어 처리함으로써)

바깥쪽 루프 대 안쪽 루프

  • Parallel.For와 Parallel.ForEach는 안쪽 루프보다는 바깥쪽 루프에 적용하는 것이 바람직하다. 안쪽 루프에 적용하면 병렬화할 작업 항목들의 ‘덩어리’가 더 커지기 때문에 관리상의 추가부담이 증가해서 병렬화의 이득이 상쇄되기 때문이다. 그리고 안쪽과 바깥쪽 루프를 모두 병렬화해야 하는 경우는 별로 없다.
    • 예컨대 다음의 이중 루프에서 안쪽 루프의 병렬화가 이득이 되려면 코어가 100개 이상이어야 한다.
Parallel.For(0, 100, i =>
{
  Parallel.For(0, 50, j => Foo(i, j));  // 안쪽 루프에는 순차 루프가 더 나을 것이다.
});

색인화된 Parallel.ForEach

  • 각 반복에서 루프 반복 색인을 사용해야 하는 경우도 종종 있다. 순차 foreach에서는 간단하다.
int i = 0;
foreach(char c in "Hello, world")
  Console.WriteLine(c.ToString() + i++);
  • 그러나 병렬 루프에서 이는 공유 변수를 갱신하는 것이므로 스레드에 안전하지 않다. 대신 ForEach의 다음과 같은 버전을 사용해야 한다.
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)
  • ParallelLoopState는 잠시 후에 다루기로 하고 일단은 Action의 셋째 형식 매개변수에 집중하자. 이 형식 매개변수는 루프 색인의 형식(지금 예에서는 long)에 해당한다. 이제 앞의 루프를 다음과 같이 병렬화 할 수 있다.
Parallel.ForEach("Hello, world", (c, state, i) =>
{
  Console.WriteLine(c.ToString() + i++);
});
  • (다른 예제 생략)

ParallelLoopState를 이용한 이른 루프 종료

  • 병렬 For나 ForEach의 루프 본문은 하나의 대리자이므로 break 문을 이용해서 루프를 일찍 벗어날 수 없다. 대신 ParallelLoopState 객체에 대해 Break나 Stop을 호출해야 한다.
  • ParallelLoopState 객체를 얻는 것은 쉽다. For와 Foreach의 모든 버전은 Action<TSoure, ParallelLoopState> 형식의 루프 본문(대리자)을 받도록 중복적재 되어 있다.
    • 예컨대 다음과 같은 루프를
foreach(char c in "Hello, world")
{
  if (c == ",")
    break;
  else
    Console.WriteLine(c);
}
  • 다음과 같이 병렬화 할 수 있다.
Parallel.ForEach("Hello, world", (c, loopState) =>
{
  if (c == ',')
    loopState.Break();
  else
    Console.Write(c);
});

// 출력
// Hlloe
  • 출력에서 보듯이, 루프 본문들이 무작위 순으로 완료될 수도 있다. 순서가 다를 수는 있지만, 적어도 Break 호출 시 산출되는 요소들이 루프를 순차적으로 실행 했을 때 나오는 요소들과 동일하다는 점은 보장된다. 지금 예제는 항상 H, e, l, l, o를 임의의 순서로 출력한다.
    • 그러나 Break 대신 Stop을 호출하면 모든 스레드는 현재 반복을 마친 즉시 종료된다.
    • 지금 예제의 경우, 일부 스레드의 실행이 조금 늦는 상태에서 Stop이 호출되면 H, e, l, l, o 중 일부 글자는 출력되지 않는다.
    • Stop은 컬렉션에서 원하는 뭔가를 찾았으므로 더 이상의 반복이 무의미할 때 또는 뭔가 잘못되어서 결과를 얻을 필요가 없어졌을 때 유용하다.
  • Parallel.For 메서드와 Parallel.ForEach 메서드는 ParallelLoopResult 객체를 돌려주는데, 이 객체에는 IsCompleted와 LowestBreakIteration이라는 속성이 있다.
    • 이들은 루프가 끝까지 돌았는지, 아니라면 몇 번째 반복에서 루프가 종료되었는지를 알려준다.
    • 만일 LowestBreakIteration이 null이면 Break가 아니라 Stop에 의해 루프가 끝났다는 뜻이다.
  • 루프 본문이 길다면 다른 스레드가 Break나 Stop으로 일찍 루프를 끝냈을 때 현재 스레드의 루프 본문 역시 일찍 루프를 종료하는 것이 바람직할 것이다.
    • 그런 경우 코드의 여러 지점에서 ShouldExitCurrentIteration 속성을 점검하면 된다.
    • 이 속성은 Stop 호출 직후에, 또는 Break 호출 얼마 후에 true가 된다.
    • ShouldExitCurrentIteration은 취소 요청이 처리되었거나 루프 본문에서 예외가 발생해도 true가 된다.
  • 다른 스레드에서 예외가 발생했는지는 IsExceptional 속성으로 알아낼 수 있다.
    • 처리되지 않은 예외가 발생하면 각 스레드의 현재 반복에서 루프가 중지된다. 이를 피하려면 예외를 반드시 루프 본문 안에서 명시적으로 처리해야 한다.

지역 값을 이용한 최적화

  • Parallel.For와 Parallel.ForEach는 TLocal이라는 형식 매개변수가 있는 일단의 제네릭 중복적재 버전들을 제공한다. 이 중복적재들은 반복 횟수가 많은 루프에서 자료의 취합을 최적화하는 용도로 만들어진 것이다.
  • 그러나 실무에서 이런 형태의 중복적재 버전들이 필요한 경우는 드물다. 이들로 해결할 수 있는 문제들을 대부준 PLINQ로도 해결할 수 있기 때문이다. (이들의 서명과 사용법이 좀 복잡하다는 점에서 이는 다행이다)
  • 이들으로 해결할 수 있는 문제를 잘 보여주는 예로 1에서 1000만까지의 제곱근들의 합을 계산하는 문제를 생각해 보자.
    • 제곱근 1000만개를 계산하는 것 자체는 쉽게 병렬화 할 수 있지만, 그 제곱근들을 모두 합하는 것의 병렬화는 간단하지 않다. 총합을 갱신하는 부분을 자물쇠로 보호해야 하기 때문이다.
object locker = new object();
double total = 0;
Parallel.For(1, 10000000, i => { lock(locker) total += Math.Sqrt(i); });
  • 이 경우 자물쇠를 1,000만개 만드는데 드는 비용과 그로 인한 스레드 차단 떄문에 병렬화의 이득이 사라진다.
  • 그런데 사실 자물쇠를 실제로 1000만개나 만들 필요는 없다. 비유하자면, 일단의 자원봉사자들이 행사장에 남겨진 쓰레기를 치운다고 하자.
    • 만일 쓰레기통이 하나라면 자원봉사자들은 쓰레기통까지 먼 거리를 왕복해야 하며, 다른 자원봉사잗르과 쓰레기통을 두고 경합을 펼쳐야 하므로 청소 과정이 극도로 비효율적일 것이다.
    • 자명한 해결책은 각 자원봉사자가 개인용 쓰레기통, 즉 ‘지역’ 쓰레기통을 사용하고, 가끔 자신의 쓰레기통을 중앙 쓰레기통에 비우게 하는 것이다.
  • TLocal 형식 매개변수가 있는 For와 Foreach 딱 그런 식으로 작동한다.
    • 이 경우 자원봉사자들은 내부 일꾼 스레드들이고, 지역 쓰레기통은 대리자에게 전달되는 TLocal 객체이다. 이를 지역 값(local value)이라고 부른다.
    • 지역 값을 사용하는 For와 ForEach를 호출할 때는 루프 본문 대리자와 함께 다음과 같은 대리자들도 지정해야 한다.
      1. 새 지역 값을 초기화하는 대리자
      2. 지금까지 집계된 지역 값을 주 집계 값(주 쓰레기통)에 결합하는 대리자
  • 그 외에 루프 본문 대리자는 void 가 아니라 집계된 지역 값을 돌려주어야 한다.
    • 다음은 앞의 제곱근 계산 예제를 개선한 코드이다.
object locker = new object();
double grandTotal = 0;
Parallel.For(1, 10000000, 
  () => 0.0,  // 지역 값을 초기화 한다.
  (i, state, localTotal) =>  // 루프 본문 대리자. 갱신된 지역 값을 
    localTotal + Math.Sqrt(i),  // 돌려준다는 점을 주목 할 것.
    localTotal =>  // 지역값을
     { lock (locker) grandTotal += localTotal; }  // 주 집계 값에 더한다.
);
  • 자물쇠가 아예 없어지지는 않았지만, 지역 값을 총합에 추가하는 부분에만 잠금이 적용된다는 점이 중요하다. 이 덕분에 전체 처리 과정의 효율성이 극적으로 증가한다.
  • 앞에서 언급했듯이, 이런 용도에는 PLINQ가 적합할 때가 많다. 지금 예제를 PLINQ로 병렬화한다면 다음과 같은 모습이 될 것이다.
ParallelEnumerable.Range(1, 10000000).Sum(i => Math.Sqrt(i))
  • 범위 분할 전략이 선택되게 하려고 ParallelEnumerable을 사용했음을 주목할 것. 지금 예제에서는 모든 수의 처리 속도가 같으므로 범위 전략을 사용하면 성능이 향상된다.
    • 좀 더 복잡한 시나리오라면 Sum 대신 LINQ의 Aggregate 연산자를 사용할 수도 있다.
    • Parallel.For에 지역 값 대리자를 제공하는 것은 Aggregate 연산자에 지역 종잣값 팩토리를 지정하는 것과 다소 비슷하다.

작업 병렬성

  • 작업 병렬성(task parallelism) 전략은 PFX를 이용한 병렬화 접근방식 중 가장 낮은 수준에 해당한다. 다음은 이 수준의 병렬 프로그래밍에 쓰이는 클래스들로 모두 System.Threading.Tasks 이름공간에 있다.
클래스 용도
Task 작업(일거리의 최소 단위)의 관리
Task<TResult> 반환값이 있는 일거리 단위의 관리
TaskFactory 작업 객체 생성
TaskFactory<TResult> 반환 형식이 같은 작업 및 연속 작업들의 생성
TaskScheduler 작업 일정 수립 관리
TaskCompletionSource 한 작업의 흐름을 직접 제어

 

  • TPL(Task Parallel Library)을 이용하면 최소의 추가부담으로 수백 개의(심지어는 수천 개의) 작업을 생성할 수 있다. 그러나 수백만 개의 작업을 생성해야 한다면, 효율성을 위해서는 그 작업들을 더 큰 일거리 단위들로 나눌 필요가 있다. Parallel 클래스와 PLINQ는 그러한 분할을 자동으로 처리해준다.
  • Visual Studio는 작업 감시를 위한 창을 제공한다(디버그 -> 창 -> 병렬 작업). 이 창은 스레드 창과 비슷하되, 작업 관련 객체들을 보여준다는 점이 다르다. 그리고 ‘병렬 스택’ 창에도 작업 객체들을 볼 수 있는 특별한 모드가 있다.

작업의 생성과 실행

  • Task.Run 메서드는 Task나 Task<TResult> 객체를 생성해서 실행한다. 사실 이 메서드는 Task.Factory.StartNew 호출의 단축 표기에 해당한다.
    • Task.Factory.StartNew의 추가적인 중복적재 버전들을 이용하면 작업 객체를 좀 더 유연한 방식으로 생성할 수 있다.

상태 객체 지정

  • Task.Factory.StartNew를 이용하면 대상 메서드에 전달할 상태 객체를 지정할 수 있다. 상태 객체를 지정하는 경우, 대상 메서드로는 object 형식 매개변수 하나를 받는 메서드를 지정해야 한다.
static void Main()
{
  var task = Task.Factory.StartNew(Greet, "Hello");
  task.Wait();  // 작업이 완료되길 기다린다.
}

static void Greet (object state) { Console.Write(state); }  // Hello
  • 만일 상태 객체를 사용하지 않았다면, StartNew 호출시 Greet 자체가 아니라 Hello로 Greet를 호출하는 람다식을 지정해야 했을 것이다. 이처럼 상태 객체를 이용하면 람다식 생성 비용을 절약할 수 있다.
    • 그러나 이는 미시적인 최적화이며, 실무에서 이런 최적화가 꼭 필요한 경우는 드물다.
    • 상태 객체의 좀 더 나은 용도는 작업에 의미 있는 이름을 부여하는 것이다. 그 이름은 이후 AsyncState 속성으로 조회할 수 있다.
static void Main()
{
  var task = Task.Factory.StartNew(state => Greet("Hello"), "Greeting");
  Console.WriteLine(task.AsyncState);  // Greeting
  task.Wait(); 
}

static void Greet (string message) { Console.Write(message); }
  • Visual Studio는 각 작업 객체의 AsyncState를 병렬 작업 창에 표시하므로 이 예제처럼 작업 객체에 의미 있는 이름을 부여하면 디버깅이 훨씬 쉬워진다.

TaskCreationOptions 열거형

  • StartNew를 호출할 때(또는 Task 객체를 인스턴스화 할 때) TaskCreationOptions 열거형의 한 값을 지정해서 작업 객체의 실행 방식을 조율할 수 있다. TaskCreationOptions는 다음과 같은 값들(조합 가능)로 이루어진 플래그 열거형이다.
LongRunning, PreferFairness, AttachedToParent
  • StartNew 호출시 LongRunning을 지정하면 스케줄러는 작업 객체에 전담 스레드를 배정한다. 그리고 14장에서 설명했듯이, 이는 입출력 한정 작업이나 오래 실행되는 작업에 알맞는 방식이다(만일 그런 작업에 짧게 전담 스레드를 배정하지 않는다면, 짧게 실행되는 작업들에는 비합리적일 정도로 긴 시간이 지난 후에야 실행 일정이 부여될 위험이 있다)
  • PreferFairness를 지정하면 스케줄러는 그런 작업들의 실행 일정을 그 생성 순서에 따라 정하려 한다. 보통은 그렇게 하지 않을 가능성이 크다. 왜냐면 스케줄러는 내부적인 일거리 훔치기 대기열(work-stealing queue)을 이용해서 작업들의 일정을 최적화하기 때문이다.
    • 일거리 훔치기 대기열은 보통의 단일 일거리 대기열에서 발생할 수 있는 경합 부담 없이도 자식 작업들을 생성할 수 있도록 하는 일종의 최적화이다.
    • 자식 작업 객체를 직접 생성하고 싶으면 StartNew 호출 시 AttachedToParent를 지정하면 된다.

자식 작업

  • 한 작업에서 다른 작업을 실행할 때, 필요하다면 둘 사이에 부모-자식 관계를 만들 수 있다.
Task parent = Task.Factory.StartNew(() =>
{
  Console.WriteLine("부모 작업");

  Task.Factory.StartNew(() =>
  {
    Console.WriteLine("개별 작업");
  });

  Task.Factory.StartNew(() =>
  {
    Console.WriteLine("자식 작업");
  }, TaskCreatoinOptions.AttachedToParent);
});
  • 자식 작업의 특징은 자식 작업에서 부모 작업의 완료를 기다리면 그 부모 작업의 다른 모든 자식 작업의 완료도 기다리게 된다는 것이다. 또한 대기가 끝났을 때 자식 작업들에서 발생한 모든 예외가 대기 지점까지 올라온다.
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew(() =>
{
  Task.Factory.StartNew(() =>  // 자식
  {
    Task.Factory.StartNew(() => { throw null; }, atp);  // 손자
  }, atp);
});

// 다음 호출은 NullReferenceExceptoin(실제로는 중첩된 AggregatteException들로 감싸인)을 던진다.
parent.Wait();
  • 이러한 기능은 자식 작업이 하나의 연속 작업일 때 특히나 유용하다.

여러 작업 기다리기

  • 한 작업의 완료를 기다리려면 해당 객체에 대해 Wait 메서드를 호출하면 된다. Task<TResult> 객체의 경우에는 Result 속성에 접근해도 완료 대기가 진행된다. 그리고 여러 개의 작업을 동시에 기다리는 것도 가능하다.
    • 정적 메서드 Task.WaitAll이나 Task.WaitAny를 호출하면 된다.
    • WaitAll은 각 작업을 차례로 기다리는 것과 비슷하나, 많아야 한 번의 문맥 전환만 필요하므로 그보다 효율적이다. 또한 하나 이상의 작업들이 미처리 예외를 던져도 WaitAll은 모든 작업의 완료를 기다린다.
    • 대신 대기가 풀리면 실패한 작업들이 던진 예외들을 모두 모은 AggregateException 예외를 다시 던진다(이것이 AggregateException이 진정으로 유용한 상황이다)
    • WaitAll이 하는 일을 풀어서 표현하면 다음과 같다.
// t1, t2, t3이 작업 객체라고 할 때
var exceptions = new List<Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add(ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add(ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add(ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);
  • WaitAny를 호출하는 것은 실행이 완료된 임의의 작업 객체로부터 신호를 받는 ManualResetEventSlim 객체를 기다리는 것에 해당한다.
  • Wait* 메서드들에는 만료 시간과 취소 토큰을 받는 중복적재 버전들도 있다. 이때 취소 토큰은 작업 객체가 아니라 대기 연산을 취소하기 위한 것이다.

작업 취소

  • 작업을 시작할 때 취소 토큰을 지정할 수도 있다. 그런 경우 만일 그 토큰을 통해서 작업이 취소되면 작업 객체 자체는 ‘취소됨(Canceled)’ 상태로 진입한다.
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.CancelAfter(500);

Task task = Task.Factory.StartNew(() =>
{
  Thread.Sleep(1000);
  token.ThrowIfCancellationRequested();  // 취소 요청을 결정한다.
}, token);

try { task.Wait(); }
catch (AggregateException ex)
{
  Console.WriteLine(ex.InnerException is TaskCanceledException);  // True
  Console.WriteLine(task.IsCanceled);  // True
  Console.WriteLine(task.Status);  // Canceled
}
  • TaskCanceledException은 OperationCanceledException의 파생 클래스이다.
    • OperationCanceledException을 명시적으로 던지려면 반드시 OpertaionCanceledException의 생성자에 취소 토큰을 전달해야 한다.
    • 그렇게 하지 않으면 작업 객체는 TaskStatus.Canceled 상태가 되지 않으며, 따라서 OnlyOnCanceled 연속 작업이 실행되지 않는다.
  • 시작하기도 전에 취소된 작업에는 실행 일정이 배정되지 않으며, 그 작업 객체에 대해 즉시 OperationCanceledException 예외가 던져진다.
  • 취소 토큰은 다른 API들도 인식하므로 취소 토큰을 다른 코드 요소에 넘겨주어도 취소 처리가 매끄럽게 전파된다.
var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;

Task task = Task.Factory.StartNew(() =>
{
  // 취소 토큰을 PLINQ 질의에 전달한다.
  var query = someSequence.AsParallel().WithCancellation(token)...
  // ... 여기서 질의를 열거한다...
});
  • 이 예제에서 cancelSource에 대해 Cancel을 호출하면 PLINQ 질의가 취소되며, 그러면 작업 본문에서 OperationCanceledException 예외가 발생한다. 이에 의해 작업 자체가 취소된다.
  • Wait나 CancelAndWait 같은 메서드에 취소 토큰을 전달하면 작업 자체가 아니라 대기 연산이 취소된다.

연속 작업

  • ContinueWith 메서드는 주어진 대리자를 해당 작업이 끝난 직후에 실행한다. 다음 예를 보자.
Task task1 = Task.Factory.StartNew(() => Console.Write("antecedant.."));
Task task2 = task1.ContinueWith(ant => Console.Write("...Continuation"));
  • 이 예에서 task1이 완료되거나, 실패하거나, 취소되자마자 task2가 실행된다. 이때 task1이 선행 작업(antecedent task)이고, task2가 연속 작업(continuatoin task)이다. (만일 둘째 줄이 실행되기 전에 task1이 완료되었다면, task2는 즉시 실행이 되도록 일정이 잡힌다.)
    • 연속 작업의 람다식에 전달된 ant 인수는 선행 작업에 대한 참조이다. ContinueWith 메서드 자체는 하나의 작업 객체를 돌려준다. 이 덕분에 연속 작업들을 메서드 호출 연쇄 방식으로 손쉽게 추가할 수 있다.
  • 기본적으로 선행 작업과 연속 작업이 서로 다른 스레드에서 실행될 수 있다. 만일 두 작업이 같은 스레드에서 실행되게 하고 싶으면 ContinueWith 호출 시 TaskContinuationOptions.ExecuteSynchronously를 지정하면 된다. 그렇게 하면 간접이 줄어드므로 아주 세밀한 연속 작업들의 경우에는 성능이 향상될 수 있다.

연속 작업과 Task<TResult>

  • 보통의 작업 객체처럼, 연속 작업 객체 역시 Task<TResult> 형식일 수 있다. 다른 말로 하면, 연속 작업이 어떤 값을 돌려주게 하는 것도 가능하다.
    • 다음은 여러 작업을 메서드 호출 연쇄로 엮어서 Math.Sqrt(8*2)를 계산하고 그 결과를 출력하는 예이다.
Task.Factory.StartNew<int>(() => 8)
  .ContinueWith(ant => ant.Result * 2)
  .ContinueWith(ant => Math.Sqrt(ant.Result))
  .ContinueWith(ant => Console.WriteLine(ant.Result));  // 4

연속 작업과 예외

  • 선행 작업의 실패 여부를 연속 작업에서 알아내야 한다면, 선행 작업 객체의 Exception 속성을 조회해도 되지만, 그냥 Result나 Wait를 호출해서 AggregateException 예외를 잡는 것이 더 간단할 것이다.
    • 선행 작업과 연속 작업 둘 다 실패하면 해당 예외는 관찰되지 않은 것으로 간주되며, 이후 쓰레기 수거기가 작업 객체를 수거할 때 TaskScheduler.UnobservedTaskException 이벤트가 발동된다.
  • 안전성을 위해 미관찰/미처리 예외를 피하는데 유용한 패턴은 연속 작업에서 선행 작업의 예외들을 다시 던지는 것이다. 누군가가 연속 작업에 대해 Wait를 호출하는 한, 그 예외는 Wait 호출자에게 다시 던져진다.
Task continuation = Task.Factory.StartNew(() => { throw null; }).ContinueWith(ant =>
{
  ant.Wait();
  // 처리를 계속한다...
});

continuation.Wait();  // 호출자에게 예외가 던져진다.
  • 예외를 다루는 또 다른 방법은 예외적인 결과와 정상적인 결과에 대해 서로 다른 연속 작업을 지정하는 것이다. 이때 필요한 것이 TaskContinuationOptions 열거형이다.
Task task1 = Task.Factory.StartNew(() => { throw null; });
Task error = task1.ContinueWith(ant => Console.Write(ant.Exception), TaskContinuationOptions.OnlyOnFaulted);
Task ok = task1.ContinueWith(ant => Console.Write("Success!"), TaskContinuationOptions.NotOnFaulted);
  • 이 패턴은 자식 작업들에 사용할 때 특히나 유용하다.
  • 다음 확장 메서드는 작업의 미처리 예외를 ‘삼켜버린다.’
public static void IgnoreExceptions(this Task task)
{
  task.ContinueWith(t => { var ignore = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
}
  • 실제 응용에서는 예외의 내용을 로그에 기록하는 것이 바람직할 것이다. 다음은 이 메서드를 사용하는 예이다.
Task.Factory.StartNew(() => { throw null; }).IgnoreExceptions();

연속 작업과 자식 작업

  • 연속 작업의 아주 유용한 특징 하나는 모든 자식 작업이 완료되었을 때만 연속 작업이 실행된다는 점이다. (아래 그림 참조). 모든 자식 작업이 완료되면 자식 작업들이 던진 모든 예외가 연속 작업에게 인도된다.

  • 다음 예제는 자식 작업 세 개를 시작한다. 세 자식 작업 모두 NullReferenceException을 던지는데, 부모 작업을 선행 작업으로 하는 연속 작업에서 그 예외들을 한꺼번에 잡는다.
TaskCreationOptions atp = taskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() =>
{
  Task.Factory.StartNew(() => { throw null; }, atp);
  Task.Factory.StartNew(() => { throw null; }, atp);
  Task.Factory.StartNew(() => { throw null; }, atp);
}).ContinueWith(p => Console.WriteLine(p.Exception), TaskContinuationOptions.OnlyOnFaulted);

조건부 연속 작업

  • 기본적으로 연속 작업은 선행 작업이 완료되든, 예외를 던지든, 취소되든 상관없이 무조건 실행되도록 일정이 잡힌다.
    • 그런 방식이 싫다면 TaskContinuationOptions 열거형의 플래그를 적절히 지정하면 된다.(필요하다면 여러 플래그를 조합해서)
    • 다음은 조건부 연속 작업 제어에 쓰이는 주요 플래그들이다.
NotOnRanToCompletoin = 0x1000,
NotOnFaulted = 0x2000,
NotOnCanceled = 0x4000,
  • 이 플래그들은 감산적이다. 즉, 더 많이 지정할수록 연속 작업이 실행될 여지가 줄어든다. 편의를 위해, TaskContinuationOptions는 다음과 같이 미리 조합된 값들도 제공한다.
OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,
OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted
  • 모든 Not* 플래그를 모두 결합하는 것은 무의미하다. 그러면 연속 작업이 항상 취소되기 때문이다.
  • 플래그들에서 “RanToCompletion”은 선행 작업이 무사히, 즉 취소나 미처리 예외 없이 완료되었음을 뜻한다.
  • “Faulted”는 선행 작업에서 미처리 예외가 발생했음을 뜻한다.
  • “Canceled”는 다음 둘 중 하나를 뜻한다.
    • 선행 작업이 취소 토큰을 통해서 취소되었다. 다른 말로 하면, 선행 작업에서 OperationCanceledException 예외가 던져졌으며, 그 CancellationToken 속성이 선행 작업을 시작할 때 지정한 취소 토큰과 부합한다.
    • 조건부 연속 작업의 실행 조건들을 선행 작업이 만족하지 못해서 선행 작업이 암묵적으로 취소되었다.
  • 여기서 중요한 점 하나는, 이 플래그들 때문에 연속 작업이 실행되지 않았다고 해서 그것이 잊혀지거나 폐기된 것은 아니라는 것이다. 그런 경우 연속 작업은 그냥 취소된 것이다.
    • 따라서 그 연속 작업을 선행 작업으로 하는 추가적인 연속 작업은 여전히 실행된다(물론, 그 연속 작업 자체에 NotOnCanceled를 실행 조건으로 걸지 않았다고 할 때)
Task t1 = Task.Factory.StartNew(...);
Task fault = t1.ContineWith(ant => Console.WriteLine("fault"), taskContinuationOptions.OnlyOnFaulted);
Task t3 = fault.ContinueWith(ant => Console.WriteLine("t3"));
  • 이 예에서 t3은 항상 실행 일정이 잡힌다. 특히 t1이 예외를 던지지 않아도 실행된다(아래 그림). 이는 t1이 성공해도 fault는 취소될 뿐이기 때문이다. 게다가 t3에는 아무런 조건도 걸려 있지 않으므로, 결과적으로 t3은 무조건 실행된다.

  • fault가 실제로 실행되었을 때만 t3이 실행되게 하려면 다음처럼 해야 한다.
Task t3 = fault.ContinueWith(ant => Console.WriteLine("t3"), TaskContinuationOptions.NotOnCanceled);
  • 아니면 OnlyOnRanToCompletion을 지정해도 된다. 차이점은 fault 안에서 예외가 던져지면 t3이 실행되지 않는다는 것이다.

선행 작업이 여러 개인 연속 작업

  • TaskFactory 클래스의 ContinueWhenAll과 ContinueWhenAny를 이용하면 여러 개의 선행 작업이 완료되었을 때 연속 작업이 실행되게 할 수도 있다. 그러나 이 메서드들이 하는 일은 .NET Framework에 새로 도입된 작업 조합기들로도 할 수 있다. 한 예로 다음과 같은 두 작업을 생각해 보자.
var task1 = Task.Run(() => Console.Write("X"));
var task2 = Task.Run(() => Console.Write("Y"));
  • 이 두 작업이 모두 완료되었을 때 연속 작업이 실행되게 하려면 다음과 같이 하면 된다.
var continuation = Task.Factory.ContinueWhenAll(new[] { task1, task2 }, tasks => Console.WriteLine("Done"));
  • 그러나 WhenAll 작업 조합기로도 같은 결과를 얻을 수 있다.
var continuation = Task.WhenAll(task1, task2).ContinueWith(ant => Console.WriteLine("Done"));

선행 작업 하나에 대한 여러 개의 연속 작업

  • 같은 작업 객체에 대해 ContinueWith를 여러 번 호출함으로써 하나의 선행 작업에 여러 개의 연속 작업을 부착할 수 있다. 그 선행 작업이 완료되면 모든 연속 작업이 동시에 실행을 시작한다.(단, TaskContinuationOptions.ExecuteSynchronously를 지정하면 연속 작업들이 차례로 실행된다.)
  • 다음 코드는 1초 지연후 “XY” 또는 “YX”를 출력한다.
var t = Task.Factory.StartNew(() => Thread.Sleep(1000));
t.ContinueWith(ant => Console.Write("X"));
t.ContinueWith(ant => Console.Write("Y"));

작업 스케쥴러

  • 추상 클래스 TaskScheduler로 대표되는 작업 스케쥴러는 작업 객체들을 스레드들에 배정하는 역할을 담당한다. .NET Framework는 두 가지 구현 클래스를 제공한다. 하나는 CLR 스레드 풀을 사용하는 기본 스케줄러에 해당하는 클래스이고, 다른 하나는 동기화 문맥 스케줄러에 해당하는 클래스이다.
    • 후자는 (기본적으로) WPF나 Windows Forms의 스레드 적용 모형을 위해 만들어진 것이다. 그런 스레드 적용 모형에서 UI 요소나 컨트롤에는 그것을 생성한 스레드만 접근할 수 있다.
    • 동기화 문맥이 존재한다고 할 때, 어떤 작업 또는 연속 작업이 그 문맥 안에서 실행되게 하려면 다음과 같이 하면 된다.
// 현재 스레드가 Windows Forms/WPF 응용 프로그램의 한 UI 스레드라고 가정할 떄
_uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
  • Foo가 문자열을 돌려주는 계산량 한정 메서드이고, lblResult가 WPF나 Windows Forms의 이름표 컨트롤이라고 할 때, 다음은 위의 작업이 완료된 후 이름표를 안전하게 갱신하는 코드이다.
Task.Run(() => Foo()).ContinueWith(ant => lblResult.Content = ant.Result, _uiScheduler);
  • 물론 이런 종류의 일은 그냥 C#의 비동기 함수들을 사용해서 해결하는 경우가 더 많다.
  • 독자만의 작업 스케쥴러를 작성하는 것도 가능하지만(ITaskScheduler를 파생해서) 그런 일은 아주 특별한 상황에서나 필요하다. 그냥 TaskCompletionSource를 사용해서 작업의 실행 일정을 적절히 조정하는 것으로 충분한 경우가 더 많다.

TaskFactory

  • 정적 속성 Task.Factory는 기본 TaskFactory 객체를 돌려준다. TaskFactory는 작업 생성을 위한 팩토리를 대표하는 클래스로 다음 세 종류의 작업을 생성할 수 있다.
    • 보통 작업(StartNew로 생성)
    • 선행 작업이 여러 개인 연속 작업(ContinueWhenAll이나 ContinueWhenAny로 생성)
    • 이제는 필요 없는 비동기 프로그래밍 모형을 따르는 메서드를 감싸는 작업(FromAsync로 생성)
  • Task를 인스턴스화하고 Start를 호출해서 작업을 생성하는 방법도 있지만, 이 방법으로는 보통의 작업만 생성할 수 있을 뿐 연속 작업은 생성할 수 없다.

커스텀 작업 팩토리 만들기

  • TaskFactory는 추상 클래스가 아니다. 실제로 이 클래스를 인스턴스화 해서 사용할 수 있다. 같은(그리고 기본 값은 아닌) TaskCreationOptions 값이나 TaskContinuationOptions, TaskScheduler 값들을 지정해서 작업을 거듭해서 생성해야 할 때 TaskFactory가 유용하다.
    • 예컨대 오래 실행되며 부모가 있는(parented) 작업을 거듭 생성해야 한다고 하자. 그런 경우 일단 다음과 같은 커스텀 작업 팩토리를 만든다.
var factory = new TaskFactory(TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
  • 그런 다음에는 그냥 이 팩토리에 대해 StartNew만 호출하면 된다.
Task task1 = factory.StartNew(Method1);
Task task2 = factory.StartNew(Method2);
...
  • 커스텀 연속 작업 옵션들은 ContinueWhenAll이나 ContinueWhenAny를 호출할 때 적용된다.

AggregateException 다루기

  • 앞에서 보았듯이 PLINQ와 Parallel 클래스, 그리고 Task는 발생한 예외를 자동으로 소비자에게 인도한다. 그런 처리가 필요한 이유를 살펴보기 위해 다음과 같이 첫 반복에서 DivideByZeroException을 던지는 LINQ 질의를 생각해 보자.
try
{
  var query = from i in Enumerable.Range(0, 1000000)
      select 100 / i;
  ...
}
catch (DivideByZeroException)
{
  ...
}
  • 만일 PLINQ로 이 질의를 병렬화했는데, PLINQ가 앞에서 말한 방식으로 예외를 처리하지 않는다면, DivideByZeroException 예외가 현재 스레드가 아닌 다른 스레드에서 던져질 수 있다. 그러면 예외가 catch 블록에서 걸리지 않으므로 응용 프로그램이 죽게 된다.
  • 이 때문에 병렬화 라이브러리들은 예외를 자동으로 잡아서 호출자에게 다시 던진다. 그러나 안타깝게도 이런 상황을 그냥 DivideByZeroException 예외 하나를 잡아서 처리할 수는 없다. 이 라이브러리들은 다수의 스레드를 활용하므로 둘 이상의 예외가 동시에 던져질 가능성이 실제로 존재한다.
    • 모든 예외를 확실히 보고하기 위해 라이브러리들은 자신이 잡은 예외들을 하나의 AggregateException 컨테이너로 감싼다. 잡힌 예외들은 AggregateException 객체의 InnerExceptions 속성에 담겨 있다.
try
{
  var query = from i in Enumerable.Range(0, 1000000)
      select 100 / i;
  ...
}
catch (AggregateException aex)
{
  foreach(Exception ex in aex.InnerExceptions)
    Console.WriteLine(ex.Message);
}
  • PLINQ와 Parallel 클래스 둘 다 첫 예외 발생시 질의 또는 루프 실행을 끝낸다. 여기서 ‘끝낸다’는 더 이상의 요소를 처리하지 않거나 루프 본문을 더 반복하지 않음을 뜻한다.
    • 그러나 현재 요소 또는 현재 반복을 마치기 전에 또 다른 예외가 발생할 수는 있다. AggregateException의 첫 예외는 InnerException 속성에서 볼 수 있다.

Flatten 메서드와 Handle 메서드

  • AggregateException 클래스에는 예외 처리를 단순화 하는데 도움이 되는 메서드가 두 개 있다. 바로 Flatten과 Handle이다.

Flatten

  • 한 AggregateException 객체가 또 다른 AggregateException 객체를 담는 경우도 흔한다. 예컨대 자식 작업이 예외를 던지면 그런 일이 발생할 수 있다.
    • 예외 처리를 단순화 하기 위해 예외들의 내포 관계를 제거하고 싶다면 Flatten을 호출하면 된다. 이 메서든느 내부 예외들이 그냥 단순하게 나열된 형태의 새 AggregateException 객체를 돌려준다.
catch (AggregateException aex)
{
  foreach(Exception ex in aex.Flatten().InnerExceptions)
    Console.WriteLine(ex.Message);
}

Handle

  • 특정 형식의 예외들만 잡고 그 외의 예외들은 다시 던져야 할 때가 종종 있는데, AggregateException의 Handle 메서드를 이용하면 그런 처리를 간단히 수행할 수 있다. 이 메서드는 주어진 예외 술어(predicate)를 모든 내부 예외에 적용한다.
public void Handle(Func<Exception, bool> predicate)
  • 만일 술어가 true를 돌려주면 이 메서드는 그 예외를 ‘처리된’ 것으로 간주한다. 모든 내부 예외에 대해 술어를 적용한 후, 이 메서드는 내부 예외들을 다음과 같이 처리한다.
    • ‘처리된’ 예외. 즉 술어 대리자가 true를 돌려준 예외들은 다시 던지지 않는다.
    • ‘처리되지 않은’ 예외. 즉 술어 대리자가 false를 돌려준 예외들을 새 AggregateException에 담아서 다시 던진다.
  • 예컨대 다음 코드는 하나의 NullReferenceException을 담은 또 다른 AggregateException 객체를 다시 던진다.
var parent = Task.Factory.StartNew(() =>
{
  // 자식 작업 세 개를 이용해서 예외 세 개를 동시에 던진다.
  int[] numbers = { 0 };

  var childFactory = new TaskFactory(TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
  childFactory.StartNew(() => 5 / numbers[0]);  // 0으로 나누기
  childFactory.StartNew(() => 5 / numbers[1]);  // 범위 밖 색인
  childFactory.StartNew(() => { throw null; });  // 널 참조
});

try { parent.Wait(); }
catch (AggregateException aex)
{
  aex.Flatten().Handle(ex =>   // Flatten은 여전히 호출해 주어야 함을 주의할 것
  {
    if (ex is DivideByZeroException)
    {
      Console.WriteLine("0으로 나누기");
      return true;  // '처리된' 예외
    }
    if (ex is DivideByZeroException)
    {
      Console.WriteLine("범위 밖 색인");
      return true;  // '처리된' 예외
    }
    return fasle;  // 그 밖의 모든 예외는 다시 던진다.
  });
}

동시적 컬렉션

  • .NET Framework 4.0에는 System.Collections.Concurrent라는 이름공간이 추가되었는데, 여기에는 다음과 같은 새로운 컬렉션들이 들어 있다. 이들은 모두 스레드에 완전히 안전하다.
동시적 컬렉션 해당 비동시적 버전
ConcurrentStack<T> Stack<T>
ConcurrentQueue<T> Queue<T>
ConcurrentBag<T> (없음)
ConcurrentDictionary<TKey, TValue> Dictionary<TKey, TValue>

 

  • 동시적 컬렉션들은 동시성이 높은 시나리오에 최적화되어 있지만, 스레드에 안전한 컬렉션이 필요한 상황이라면 언제라도 보통의 컬렉션 주변을 잠그는 대신 이 컬렉션들을 유용하게 사용할 수 있다. 단, 다음과 같은 사항들을 조심해야 한다.
    • 동시성이 높은 상황을 제외하면 보통의 컬렉션이 동시적 컬렉션보다 성능이 우월하다.
    • 스레드에 안전한 컬렉션을 사용한다고 해서 코드가 저절로 스레드에 안전해지는 것은 아니다.
    • 동시적 컬렉션을 열거하는 도중에 다른 스레드가 그 컬렉션을 수정해도 예외가발생하지 않는다. 그냥 기존 내용과 새 내용이 뒤섞여 나올 뿐이다.
    • List<T>의 동시적 버전은 없다.
    • 동시적 스택, 대기열, 자루(bag) 클래스는 내부적으로 연결 목록(linked list)를 사용한다. 그래서 비동시적 Stack 클래스나 Queue 클래스보다 메모리 사용이 비효울적이다. 그러나 동시적 접근에는 연결 목록이 유리하다. 연결 목록은 무잠금(lock-free)이나 저잠금(low-lock) 구현에 도움이 되기 때문이다. (이는 한 노드를 연결 목록에 삽입할 때 그냥 참조 두 개만 갱신하면 되기 때문이다. 반면 List<T> 같은 자료구조에 요소를 삽입하려면 수천 개의 기존 요소들을 이동해야 할 수도 있다.)
  • 다른 말로 하면, 이 컬렉션들은 단지 보통의 컬렉션을 자물쇠로 감싸는 수준으로 간단하게 구현된 것이 아니다. 한 예로 만일 다음과 같이 동시적 컬렉션을 사용하는 코드를 하나의 스레드에서 실행하면
var d = new ConcurrentDictionary<int, int>();
for (int i = 0; i < 1000000; i++)
  d[i] = 123;
  • 다음과 같이 보통의 컬렉션을 사용하는 경우보다 세 배나 느리다.
var d = new Dictionary<int, int>();
for (int i = 0; i < 1000000; i++)
  lock (d)
    d[i] = 123;
  • 단, ConcurrentDictionary를 읽는 것은 빠르다. 읽기 연산에는 잠금이 없기 때문이다.
  • 보통의 컬렉션과의 또 다른 차이점은 동시적 컬렉션은 원자적인 ‘검사 후 작동(test-and-act)’ 연산을 수행하는 특별한 메서드들을 제공한다.
    • 예컨대 TryPop이 그러한 메서드이다. 이런 메서드들은 대부분 IProducerConsumerCollection<T> 인터페이스를 통해서 통합된다.

IProducerConsumerCollection<T> 클래스

  • 다음 두 용범을 지원하는 컬렉션이면 어떤 것이라도 생산자/소비자 컬렉션(producer/consumer collection)으로 사용할 수 있다.
    • 컬렉션에 요소를 추가한다(‘생산’)
    • 컬렉션에서 요소를 가져온다. 그 요소는 컬렉션에서 제거된다(‘소비’)
  • 이런 컬렉션의 전형적인 예가 스택과 대기열이다. 효율적인 무잠금 구현에 도움이 된다는 점에서 생산자/소비자 컬렉션은 병렬 프로그래밍에서 중요하다.
  • 인터페이스 IProducerConsumerCollection<T>는 스레드에 안전한 생산자/소비자 컬렉션을 나타낸다. .NET Framework에서 이 인터페이스를 구현하는 구체 클래스는 다음 세 가지이다.
ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
  • IProducerConsumerCollection<T>는 ICollection을 확장한다. 특히, 다음 네 메서드를 추가한다.
void CopyTo(T[] array, int index);
T[] ToArray();
bool TryAdd(T item);
bool TryTake(out T item);
  • TryAdd 메서드와 TryTake 메서드는 추가/제거 연산을 수행할 수 있는지 점검해서 할 수 있으면 수행한다. 점검과 수행이 원자적으로 일어나므로 자물쇠가 필요하지 않다. 보통의 컬렉션이라면 다음 예처럼 점검과 수행에 자물쇠를 걸어야 한다.
int result;
lock (myStack) if (myStack.Count > 0) result = myStack.Pop();
  • TryTake는 만일 컬렉션이 비어 있으면 항상 false를 돌려준다. 앞의 세 구체 클래스의 경우 TryAdd는 항상 성공하며, 따라서 항상 true를 돌려준다.
    • 예컨대 중복 요소를 금지하는 커스텀 동시적 컬렉션(이를테면 집합(set)을 나타내는 동시적 컬렉션)을 작성하는 경우에는 만일 주어진 요소가 컬렉션이 이미 존재하면 false를 돌려주도록 TryAdd를 구현해야 할 것이다.
  • TryTake가 구체적으로 어떤 요소를 제거하는지는 구체 클래스마다 다르다.
    • 스택(ConcurrentStack<T>)의 TryTake는 가장 최근에 추가된 요소를 제거한다.
    • 대기열(ConcurrentQueue<T>)의 TryTake는 가장 오래전에 추가된 요소를 제거한다.
    • 자루(ConcurrentBag<T>)의 TryTake는 자신이 가장 효율적으로 제거할 수 있는 요소를 제거한다.
  • 세 구체 클래스 중 대기열과 스택은 TryTake와 TryAdd 메서드를 명시적으로 구현해서 숨기고, 대신 해당 기능성을 각각 TryDequeue와 TryPop이라는 좀 더 구체적인 이름의 공용 메서드를 통해서 제공한다.

ConcurrentBag<T> 클래스

  • ConcurrentBag<T>는 객체들의 순서 없는 컬렉션을 저장한다(중복 요소를 허용 함). ConcurrentBag<T>는 ‘푸대자루’처럼 막 사용할 수 있는 동시적 컬렉션을 대표한다. 특히 이 컬렉션은 Take나 TryTake를 호출했을 때 구체적으로 어떤 요소가 나오는지가 중요하지 않을 때 적합하다.
  • 동시적 대기열이나 스택보다 ConcurrentBag<T>가 우월한 점은, 여러 스레드가 동시에 Add 메서드를 호출해도 경합이 거의 없다는 점이다. 반면 대기열이나 스택에 대해 Add를 병렬로 호출하면 약간의 경합이 생긴다(비동시적 컬렉션을 잠글 때보다는 경합이 훨씬 적지만).
    • Take의 호출도 아주 효율적이다. 단, 스레드가 자신이 Add로 추가한 것보다 더 많은 요소를 꺼내려 하면 효율이 떨어진다.
  • 동시적 자루에 접근하는 모든 스레드는 각자 고유한 전용 연결 목록을 사용하게 된다. 한 스레드에서 Add를 호출하면 해당 요소는 그 스레드의 전용 목록에 추가된다.
    • 자루 전체를 열거하면 열거자는 각 스레드의 적용 목록을 훑으면서 요소들을 차례로 산출한다.
  • Take를 호출하면 자루는 먼저 현재 스레드의 전용 목록을 본다. 만일 그 목록에 요소가 하나라도 있으면 그냥 그 요소를 돌려준다. 이 경우에는 어떠한 경합도 일어나지 않는다.
    • 목록이 비었으면, 다른 스레드의 전용 목록에서 요소를 ‘훔쳐와야’ 한다. 이떄는 약간의 경합이 발생할 수 있다.
  • 좀 더 구체적으로 말해서 Take를 호출하면 그 스레드의 목록에 가장 최근에 추가된 요소가 반환된다. 만일 그 스레드의 목록에 요소가 하나도 없으면 자루는 무작위로 다른 스레드를 하나 선택해서 그 스레드가 마지막으로 추가한 요소를 돌려준다.
  • 동시적 자루는 컬렉션에 대한 병렬 연산의 대부분이 요소 추가(Add 호출)일 때, 또는 Add와 Take 호출 횟수가 거의 비슷할 때 이상적이다.
    • 다음은 이전의 병렬 철자 검사 예제에서 Parallel.ForEach를 이용해서 시험용 단어 목록을 채우는 예를 동시적 자루를 이용해서 다시 구현한 것이다.
var missspellings = new ConcurrentBag<Tuple<int, string>>();

Parallel.ForEach(wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains(word))
    misspellings.Add(Tuple.Create((int)i, word));
});
  • 동시적 자루를 생산자/소비자 대기열로 사용하는 것은 바람직하지 않다. 생산자/소비자 모형에서는 요소들을 추가하는 스레드와 제거하는 스레드가 서로 다를 수 있기 때문이다.

BlockingCollection<T> 클래스

  • 앞 절에서 다음 세 생산자/소비자 컬렉션을 소개했다.
ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
  • 이런 컬렉션들이 비어 있는 상태에서 TryTake 메서드를 호출하면 메서드는 즉시 false를 돌려준다. 그런데 그보다는 컬렉션에 요소가 추가될 때까지 기다리는(즉, 메서드 호출이 차단되는) 것이 더 유용할 때도 있다.
  • PFX 설계자들은 그런 기능성을 위해 TryTake 메서드를 중복적재하는 대신 (TryTake는 이미 취소 토큰과 만료 시간을 지원하기 위해 여러 번 중복 적재되어 있다), 그런 기능성을 담은 BlockingCollection<T>라는 래퍼 클래스를 추가했다.
    • ‘차단 컬렉션(blocking collection)’을 대표하는 이 클래스는 IProducerConsumerCollection<T>를 구현하는 임의의 컬렉션을 감싸며, Take 호출 시 바탕 컬렉션에 요소가 하나도 없으면 요소가 생길 때까지 기다린다.
  • 차단 컬렉션은 만일 컬렉션이 일정 크기에 도달하면 생산자를 차단함으로써 컬렉션의 전체 크기를 제한하는 기능도 제공한다. 그런 식으로 크기가 제한된 컬렉션을 유계 차단 컬렉션(bounded blocking collection)이라고 부른다.
  • BlockingCollection<T>를 사용하는 방법은 다음과 같다.
    1. 클래스를 인스턴스화 한다. 이때 필요하면 바탕 컬렉션(IProducerConsumerCollection<T> 구현 객체)과 그 컬렉션의 최대 크기(상계)를 지정한다.
    2. Add나 TryAdd를 호출해서 바탕 컬렉션에 요소들을 추가한다(‘생산’)
    3. Take나 TryTake를 호출해서 바탕 컬렉션에서 요소들을 조회 및 제거한다(‘소비’)
  • BlockingCollection<T> 생성자 호출 시 바탕 컬렉션을 지정하지 않으면 클래스가 자동으로 ConcurrentQueue<T>를 인스턴스화 한다.
    • 생산 및 소비 메서드들에는 취소 토큰과 만료 시간을 받는 중복적재 버전들도 있다.
    • 최대 크기를 지정해서 생성한 컬렉션에 대한 Add와 TryAdd 호출은 차단될 수 있다.
    • 한편 컬렉션이 비어 있으면 Take와 TryTake 호출이 차단된다.
  • 요소들을 소비하는 또 다른 방법은 GetConsumingEnumerable을 호출하는 것이다. 이 메서드는 요소가 생길 때마다 산출하는, 잠재적으로 무한한 순차열을 돌려준다.
    • 순차열을 강제로 종료하려면 CompleteAdding을 호출하면 된다. 그러면 요소들을 더 이상 추가할 수 없게 되는 효과도 생긴다.
  • BlockingCollection은 또한 AddToAny와 TakeFromAny라는 정적 메서드도 제공한다. 이들을 이용하면 여러 차단 컬렉션에 하나의 요소를 추가하거나 여러 차단 컬렉션에서 하나의 요소를 가져올 수 있다.
    • 이 정적 메서드들은 추가 또는 제거가 가능한 첫 번째 컬렉션에 대해 해당 연산을 수행한다.

생산자/소비자 대기열 작성

  • 생산자/소비자 대기열은 병렬 프로그래밍과 일반적인 동시성 상황 모두에서 유용한 자료구조이다. 생산자/소비자 대기열읠 전형적인 용법은 다음과 같다.
    • 일거리 항목 또는 해당 일거리에 쓰이는 자료를 담는 대기열을 만든다.
    • 어떤 작업을 실행할 때가 되면 해당 목록을 대기열에 추가한다. 추가한 스레드는 그 즉시 자신의 다음 일로 넘어간다.
    • 배경에서 하나 이상의 일꾼 스레드가 대기열에서 일거리 항목을 가져가서 수행한다.
  • 생산자/소비자 대기열을 이용하면 한 번에 실행되는 일꾼 스레드의 개수를 세밀하게 제어할 수 있다. 이는 CPU 소비량 뿐만 아니라 다른 자원의 사용량을 제한하는데도 유용하다.
    • 예컨대 작업이 디스크 입출력을 많이 수행하는 경우 운영체제와 다른 응용 프로그램의 디스크 입출력 기회를 빼앗지 않으려면 동시성을 적절히 제한할 필요가 있다.
    • 생산자/소비자 대기열의 또 다른 장점은 대기열이 살아 있는 동안 동적으로 일꾼 스레드들을 추가하거나 제거할 수 있다는 점이다. CLR의 스레드 풀 자체가 일종의 생산자/소비자 대기열(짧게 실행되는 계산량 한정 작업에 최적화된)이다.
  • 일반적으로 생산자/소비자 대기열에는 작업 수행시 사용할 자료 항목들을 담는다. 그리고 자료 항목마다 동일한 작업을 수행한다. 이를테면 대기열에 파일 일므들을 담아 두고, 일꾼 스레드들이 그 파일 이름을 이용해서 해당 파일을 암호화하는 예를 생각할 수 있다.
    • 그러나 대기열에 대리자들을 담게 하면 일꾼 스레드들이 서로 다른 일을 하 ㄹ수 있는 좀 더 범용적인 생산자/소비자 대기열이 된다.
  • AutoResetEvent를 이용해서(그리고 나중에는 Monitor의 Wait와 Pulse를 이용해서) 생산자/소비자 대기열을 처음부터 직접 작성하는 방법이 http://albahari.com/threading에 나온다.
    • 그러나 .NET Framework 4.0 부터는 생산자/소비자 대기열을 독자가 처음부터 직접 만들 필요가 없다. 필요한 대부분의 기능성을 BlockingCollection<T>가 제공하기 때문이다.
    • 다음은 BlockingCollection<T>를 이용한 생산자/소비자 대기열 클래스의 예이다.
public class PCQueue : IDisposable
{
  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();

  public PCQueue(int workerCount)
  {
    // 소비자마다 개별적인 Task 객체를 만들어서 실행한다.
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew(Consume);
  }
  
  public void Enqueue(Action action) { _taskQ.Add(action); }

  void Consume()
  {
    // 다음의 순차열 열거는 가져올 요소가 없으면 차단되며, CompleteAdding이 호출되면 종료된다.
    foreach (Action action in _taskQ.GetConsumingEnumerable())
      action();
  }

  public void Dispose() { _taskQ.CompleteAdding(); }
}
  • 인수 없이 BlockingCollection 생성자를 호출했으므로 BlockingCollection은 자동으로 동시적 대기열을 인스턴스화 한다. 만일 ConcurrentStack 객체를 넘겨 주었다면 생산자/소비자 대기열이 아니라 생산자/소비자 스택이 만들어졌을 것이다.

Task 클래스 활용

  • 조금 전에 만든 생산자/소비자 대기열은 대기열에 추가한 일거리 항목들을 추적할 수 없다는 점에서 다소 유연하지 못하다. 다음과 같은 기능들을 추가한다면 좋을 것이다.
    • 일거리 항목이 완료되었는지 알나낸다(그리고 await를 적용해서 완료를 기다린다)
    • 일거리 항목을 취소한다.
    • 일거리 항목이 던진 모든 예외를 매끄럽게 처리한다.
  • 이상적인 해법은 위의 기능들을 제공하는 어떤 객체를 돌려주도록 Enqueue 메서드를 개선하는 것이다. 다행히 바로 그런 기능들을 제공하는 클래스가 이미 있다.
    • 바로 Task 클래스이다. 이 클래스의 객체는 TaskCompletionSource로 생성해도 되고, 아니면 생성자를 이용해서 직접 인스턴스화해도 된다(이 경우 시작되지 않은, 즉 차가운 작업 객체가 만들어진다)
public class PCQueue : IDisposable
{
  BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();

  public PCQueue(int workerCount)
  {
    // 소비자마다 개별적인 Task 객체를 만들어서 실행한다.
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew(Consume);
  }
  
  public Task Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken)) 
  { 
    var task = new Task(action, cancelToken);
    _taskQ.Add(action); 
    return task;
  }
  
  public Task Enqueue(Func func, CancellationToken cancelToken = default(CancellationToken)) 
  { 
    var task = new Task(func, cancelToken);
    _taskQ.Add(action); 
    return task;
  }

  void Consume()
  {
    // 다음의 순차열 열거는 가져올 요소가 없으면 차단되며, CompleteAdding이 호출되면 종료된다.
    foreach (Action action in _taskQ.GetConsumingEnumerable())
    {
      try
      {
        it (!task.IsCanceled) task.RunSynchronously();
      }
      catch (InvalidOperationException) { }  // 경쟁 조건
    }
  }

  public void Dispose() { _taskQ.CompleteAdding(); }
}
  • 이 예제에서 Enqueue 메서드는 일거리 항목을 대기열에 추가한 후, 생성만 하고 실행을 시작하지는 않은 작업 객체를 호출자에게 돌려준다.
  • 한편 Consume 메서드는 작업 객체를 소비자의 스레드에서 동기적으로 실행한다. 또한 그 작업이 취소되었는지 점검하는 지점과 그 작업을 실행하는 지점 사이에서 작업이 취소되는(그럴 확률이 낮긴 하지만) 상황을 해결하기 위해 InvalidOperationException 예외를 잡아서 처리한다.
  • 다음은 이 클래스의 사용법을 보여주는 예이다.
var pcQ = new PCQueue(2);  // 최대 동시성은 2
string result = await pcQ.Enequeue(() => "참 쉽죠?");
...

 

[ssba]

The author

지성을 추구하는 디자이너/ suyeongpark@abyne.com

댓글 남기기

This site uses Akismet to reduce spam. Learn how your comment data is processed.