piątek, 18 lipca 2014

Wielowątkowość - zmienne atomowe i nieblokujące algorytmy

Wady blokad


Synchronizacja jest czasami zbyt ciężkim rozwiązaniem np do implementowania licznika.
Dodatkowo mechanizm ten pociąga za sobą niebezpieczeństwo związane z żywotnością a także ogranicza skalowalność i wydajność - np zablokowany proces nie może nic robić w trakcie jak oczekuje na blokadę.
Część struktur danych da się zaimplementować nie korzystając z synchronizacji. Część z nich można zaimplementować za pomocą słowa kluczowego volatile - ale może ono być stosowane jedyne w prostych pojedyńczych operacjach w których poprzedni stan nie wpływa na przyszły, czyli np nie można jej zastosować do liczników które są dość prostym rodzajem stanu.
Rozwiązaniem tego problemu są zmienne atomowe i nieblokujące algorytmy które można dzięki nim zaimplementować.

Sprzętowe wspomaganie wielowątkowości


Blokowanie często można nazwać pesymistycznym a czasami wystarczy porównać czy zmienna którą chcemy ustawić została zmieniona od czasu kiedy ją odczytaliśmy i zmieniliśmy - nazywa się to blokowaniem optymistycznym (używanym także np hibernacie do updatowania danych w bazie danych). Dzisiaj prawie wszystkie procesory posiadają jakąś implementację atomowej operacji odczytu-zapisu. Najpopularniejszą z nich jest compare and swap - CAS.
CAS ma trzy argumenty - V - miejsce w pamięci do zmiany, A - oczekiwana aktualna wartość, B - nowej wartość. CAS atomowo zmieni V na B tylko jeśli aktualna wartość to A w przeciwnym razie nic zmieni w miejscu V.
W takim wypadku jeśli wiele wątków zmienia V to część z nich przegra - ale nie są one blokowane i mogą ponowić swoją operacje, dodatkowo mamy pewność że jeden na pewno wygra a to powoduje że zawsze nastąpi postęp. Właśnie z takich instrukcji korzysta JVM do implementacji zmiennych atomowych.

Zmienne atomowe


Istnieje 12 klas zmiennych atomowych:
  • skalary: AtomicLong, AtomicInteger, AtomicBoolean, AtomicReference
  • tablice: AtomicLongArray, AtomicIntegerArray, AtomicReferenceArray
  • aktualizatory pól: AtomicLongFieldUpdater, AtomicIntegerFieldUpdater...
  • złożone zmienne: AtomicStampedReference, AtomicMarkedReference
Zmienne atomowe można traktować jako takie lepsze zmienne volatile. Ich dodatkowym atrybutem jest możliwość zmiany w stosunku do poprzedniego stanu. Tą funkcjonalność na zmiennych volatile można uzyskać korzystając z aktualizatorów pól np klasy AtomicLongFieldUpdater i ich metod np compareAndSet - należy pamiętać że jest to rozwiązanie trochę słabsze bo działa jedynie kiedy wszystkie zmiany na zmiennej dokonywane są aktualizatorem - jeśli nie to rozwiązanie nie będzie działać.

W kwestii wydajności zmienne atomowe w porównaniu z rozwiązaniami bazującymi na blokadach wypadają dużo lepiej przy średnim obciążeniu zmian na polu. Jeśli zmian na polu jest bardzo dużo (co jest nierealistyczne) to zmienne atomowe mogą działać gorzej - jest to oczywiste ponieważ w takim wypadku większość wątków nie może wykonać zmiany na polu i powtarza operację wiele razy co powoduje wiele przełączeń kontekstu.

Algorytmy nieblokujące


Dzięki zmiennym atomowym i operacjami CAS można stworzyć algorytmy które, jeśli dobrze zaprojektowane, będą działać szybko w obu przypadkach - dużym i małym obciążeniu, oczywiście przy nierealistycznie dużym obciążeniu algorytm może działać słabo, należy jednak pamiętać że w takich algorytmach zawsze przynajmniej jeden wątek wykonuje postęp a inne nie są zablokowane więc zawsze ktoś wykona postęp.

Przykład stosu nieblokującego:

public class NonBlockingStack<T> {
private AtomicReference<Node<T>> top = new AtomicReference<>();
public void push(T data) {
Node<T> expextedTop;
Node<T> newTop = new Node<T>(data);
do {
expextedTop = top.get();
newTop.setNext(expextedTop);
} while (!top.compareAndSet(expextedTop, newTop));
}
public T pop() {
Node<T> expextedTop;
do {
expextedTop = top.get();
if (expextedTop == null) {
return null;
}
} while (top.compareAndSet(expextedTop, expextedTop.getNext()));
return expextedTop.getData();
}
private static class Node<T> {
private T data;
private Node<T> next;
public Node(T data) {
this.data = data;
}
public T getData() {
return data;
}
public Node<T> getNext() {
return next;
}
public void setNext(Node<T> next) {
this.next = next;
}
}
}


czwartek, 17 lipca 2014

Wielowątkowość - klasy zależne od stanu

Metody zależne od stanu klas


Wiele klas posiada logikę która pozwala wykonać daną operacje na podstawie stanu. Np nie można wyjąć danych z pustego stosu. Programy jednowątkowe powinny na takich operacjach wyrzucić błąd ale te wielowątkowe mogą poczekać aż operacja może być wykonana.
Struktura takiej operacji może wyglądać tak

1. Uzyskać blokadę na stanie
2. Dopóki (warunek nie jest spełniony)
2.1. Zwolnij blokadę
2.2. Poczekaj aż warunek może być spełniony
2.3. Opcjonalnie wyrzuć wyjątek jeśli jeśli wystąpi przerwanie albo upłynął limit czasu
2.4. Uzyskaj blokadę i wróć do punktu 2
3. Wykonaj operację
4. Zwolniej blokadę

Do takich operacji służą kolejki warunkowe (condition queues) -  metody wait, notify, notifyAll jeśli korzystamy z zwykłych blokad instrinct lock - za pomocą słowa kluczowego synchronized.
Przykład stosu z ograniczoną wielkością (dość toporny ale działający):

public class BoundedStack<N> {
    private Node<N> top;
    private int maxElements;
    private int count = 0;

    public BoundedStack(int maxElements) {
        this.maxElements = maxElements;
    }
    private static class Node<N> {
        private final N element;
        private final Node<N> next;
        public Node(N element, Node<N> next) {
            super();
            this.element = element;
            this.next = next;
        }
    }
    public synchronized void push(N element) throws InterruptedException {
        while (isFull()) {
            wait();
        }

        top = new Node<>(element, top);
        count++;
        notifyAll();
    }
    public synchronized N pop() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }

        N result = top.element;
        top = top.next;
        count--;
        notifyAll();
        return result;
    }
    private synchronized boolean isEmpty() {
        return top == null;
    }
    private synchronized boolean isFull() {
        return count == maxElements;
    }
}


Kolejki warunkowe dla bloakad synchronized mogą być powiązane z wieloma warunami - jak w przykładowym ograniczonym stosie - mamy dwa warunki - isFull, isEmpty.
Metoda wait() zwalnia blokadę i usypia wątek po jego przebudzeniu wątek musi znowu ją uzyskać, nie jest w żaden sposób uprzywilejowany w stosunku do innych wątków próbujących uzyskać daną blokadę. Po uzyskaniu blokady należy po raz kolejny sprawdzić warunek ponieważ nie musi on być spełniony - dlatego należy metodę wait wywołać w pętli z niespełnionym warunkiem.
Należy nie zapomnieć o notyfikacji innych wątków po tym jak stan się zmieni - jeśli tego nie zrobimy wątki nigdy nie wybudzą się z metod wait. Do notyfikacji służą metody notify i notifyAll.
Można korzystać z metody notify jeśli istnieje tylko jeden warunek i każdy wątek wykonuje taką samą logikę po wyjściu z metody wait. Dodatkowo tylko jeden wątek ma prawo wykonywać pracę po wybudzeniu z notyfikacji. Jęśli nie jesteśmy tego pewni powinniśmy korzystać z metody notifyAll ponieważ jest ona bezpieczniejsza i gwarantuje że choć jeden wątek wykona progres. Jednak należy pamiętać że każdy z wybudzonych wątków będzie próbował uzyskać blokadę aby na nowo sprawdzić warunek - może to powodować dużo przełączeń kontekstu ale jeśli nie program działa dość szybko to nie należy się tym przejmować.

Metoda wait występuje w 3 odmianach -bezparametrowo i z limitem czasu jest także wrażliwa na przerwania.

Obiekty warunków - praca z obiektami Lock


Obiekty Lock umożliwiają tworzenie wielu warunków na jednej blokadzie - dzięki temu można je nazwać i nie wybudzać wszystkich wątków po wykonaniu operacji.
Na przykład nasza zmieniona klasa stosu może mieć postać:

public class BoundedStackWithLock<N> {
    private Node<N> top;
    private int maxElements;
    private int count = 0;
    private Lock stackLock = new ReentrantLock();
    private Condition isEmptyCondition = stackLock.newCondition();
    private Condition isFullCondition = stackLock.newCondition();
    public BoundedStackWithLock(int maxElements) {
        this.maxElements = maxElements;
    }
    private static class Node<N> {
        private final N element;
        private final Node<N> next;
        public Node(N element, Node<N> next) {
            super();
            this.element = element;
            this.next = next;
        }
    }
    public void push(N element) throws InterruptedException {
        stackLock.lock();
        while (isFull()) {
            isFullCondition.await();
        }
        top = new Node<>(element, top);
        count++;
        isEmptyCondition.signal();
        stackLock.unlock();
    }
    public N pop() throws InterruptedException {
        stackLock.lock();
        while (isEmpty()) {
            isEmptyCondition.await();
        }
        N result = top.element;
        top = top.next;
        count--;
        isFullCondition.signal();
        stackLock.unlock();
        return result;
    }
    private boolean isEmpty() {
        return top == null;
    }
    private boolean isFull() {
        return count == maxElements;
    }
}


Należy pamiętać aby przez pomyłkę nie wywołać metod wait, notify, notifyAll na obiektach condition. Ich odpowiedniki to await, signal, signalAll. Jest także dodatkowa metoda awaitUninterruptibly która jak sama nazwa wskazuje nie jest wrażliwa na przerwania.



poniedziałek, 14 lipca 2014

Wielowątkowość - blokady interfejsu Lock

Interfejs Lock


Interfejs lock specyfikuje kilka interesujących metod odróżniających go od blokad nienazwanych (intrinsic locks) które działają na podstawie słowa kluczowego synchornized:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}


Klasa ReentrantLock implementuje ten interfejs oferując wilowejściową blokadę podobną do blokady synchronized. Więc po co ten nowy interfejs? Ponieważ blokady synchronized mają swoje ograniczenia a zastąpieniem ich obiektem Lock pomaga je wyeliminować. Te ograniczenia to:
  • blokada synchronized jest ograniczona tylko do jednego bloku, jej bloku - jest to także jej zaleta bo blokady synchronized się nie zwalnia a blokadę stworząną przy użyciu obiektu lock już trzeba zwolnić - zwykle w bloku finally
  • wątku oczekującego na blokadzie nie można przerwać a interfejs lock ma metodę lockinterruptibly lub tryLock dzięki którym można stworzyć zadanie które mimo oczekiwania na blokadę będzie można anulować poprzez przerwanie (interrupt)
  • blokada Lock ma możliwość pytania o blokadę i nie blokowania jeśli jej nie może uzyskać dzięki metodzie tryLock (bez parametrów) dzięki czemu można uzyskać blokadę bez blokowania wątku
  • blokada Lock ma możliwość uzyskania blokady z limitem czasu dzięki czemu można pisać zadania z budżetem czasu na ich wykonanie - metoda tryLock z parametrami
  • blokada instrinct lock umożliwia wywoływanie metod wait, notify i notifyAll ale nie są one powiązane z warunkami których może być wiele, dzięki blokadzie Lock można stworzyć wiele warunków dla jeden blokady i na warunkach wywoływać metody await, signal, signalAll (interfejs Condition i metoda newCondition)
  • klasa ReentrantLock umożliwia stworzenie blokady która będzie sprawiedliwa (fair)
  • w javie 5 obiekty Lock działały dużo szybciej niż blokady synchronized jednak poprawiono to i w javie 6 nie ma już różnic w wydajności
Typowe użycie metody lock:

Lock lock = new ReentrantLock();
lock.lock();
try {
    // access the resource protected by this lock
} finally {
    lock.unlock();
}


Typowe użycie metody tryLock:

Lock lock = new ReentrantLock();
if (lock.tryLock()) {
    try {
        // manipulate protected state
    } finally {
        lock.unlock();
    }
} else {
    // perform alternative actions
}


Sprawiedliwość 

 

Tak jak wcześniej wspomniano możliwe jest stworzenie blokady która jest sprawiedliwa, domyślnie blokada lock jak i synchronized (instrinct lock) nie są sprawiedliwe. To znaczy że nie ma znaczenia który wątek w jakiej kolejności poprosił o dostęp do blokady. Statystycznie rzecz biorąc każdy w końcu otrzyma blokadę i nie ma tu zagrożenia żywotności a wybudzanie specjalnie wątku który jest pierwszy w kolejce może być - w praktyce jest - bardzo kosztowne. Także blokadę otrzymuje ten wątek który może ją otrzymać i użyć najszybciej - więc nie jest np uśpiony.
Sprawiedliwe blokady mogą się tylko przydać jeśli są trzymane bardzo długo.

Który sposób blokowania wybrać

 

Powinno się używać interfejsu lock tylko jeśli go potrzebujemy - czyli potrzebujemy jego zaawansowanych możliwości - w innym przypadku nie ma to sensu. Dużo prościej jest używać słowa kluczowego synchronized, nie trzeba także zwalniać blokad synchronized co sprawia że programowanie jest bezpieczniejsze. Blokady synchronized wcale nie są przestarzałym narzędziem cały czas są i będą używane.

Blokady do odczytu i zapisu: ReadWriteLock

 

Kolejnym sposobem na zwiększenie wydajności w dostępie do danych współdzielonych jest użycie blokad read write. Blokady te są użyteczne jeśli dominującą operacją na danych jest odczyt. W takim wypadku dostęp do czytania ma wiele wątków, a jeden wątek może pisać. Dane nie są w żaden sposób zagrożone i wątki zawsze widzą prawdziwą i ostatnią zapisaną wartość.
Implementajcją interfejsu ReadWriteLock jest klasa ReentrantReadWriteLock, ma ona możliwość określenia sprawiedliwości, jeśli blokada jest trzymana przez czytających i pojawia się wątek który chce pisać to więcej czytających nie dostaje dostępu zanim wątek zapisujący nie dostanie i zwolni blokady, możliwe jest zmiana blokady z zapisywania do czytania ale nie na odwrót.  

czwartek, 10 lipca 2014

Wielowątkowość - wydajność i skalowalność

Skalowalność  i wydajność



Skalowalność określa możliwość aby zwiększyć przepustowość lub wielkość wykonywanej pracy kiedy dostępne są dodatkowe zasoby takie jak procesor, pamięć, przepustowość I/O.

Należy pamiętać aby najpierw robić aplikacje która działa prawidłowo a następnie wykonywać optymalizacje i tylko jeśli jest ona potrzebna czyli aplikacja nie spełnia oczekiwań.

Prawo Amdahl'a

 

Cześć pracy można wykonać szybciej kiedy dostępnych jest więcej zasobów np zbieranie truskawek, natomiast część nie dlatego że jakaś cześć zadania może być wykonana tylko przez jeden wątek np rodzenie dzieci (9 kobiet nie urodzi 1 dziecka w miesiąc).
Większość wielowątkowych programów ma właśnie takie części które może wykonać wiele wątków i takie które mogą być wykonane tylko przez jeden wątek. Prawo Amdahla określa maksymalne przyspieszenie przy znanej części serializowalnej (jednowątkowej) i liczbie procesorów w stosunku do początkowej liczby procesorów.

Speedup <= 1/ (F + (1-F)/N)

F - ułamek pracy wykonywanej jednowątkowo
N - liczba procesorów

Przykładowo jeśli program nie ma części jednowątkowych (co jest niemożliwe) to przyspieszenie wyniesie 1/(0 + 1/N) = N czyli przy 5 procesorach będzie działać 5 razy szybciej.
Jeśli połowa programu jest wykonana jednowątkowo (F=0.5) to przy 5 procesorach przyspieszenie nie będzie większe niż = 1/ (0.5 + 0,5/5) = 1.67 czyli nie zwięszy się nawet dwukrotnie, tak naprawdę przyspieszenie będzie dążyć tutaj do 2.
Jeśli zwiększymy liczbę procesorów do nieskończoności to wartość przyspieszenia nie będzie większa niż 1/F.

To sprawia jak wielkie znaczenie ma cześć programu która może być wykonana przez jeden wątek dla skalowalności.
Nawet jeśli w naszej aplikacji mamy perfekcyjnie niezależne zadania to muszą one być pobierane np z kolejki i wyniki też najpewniej muszą być zapisywane więc nawet w takim idealnym przypadku część pracy jest wykonywana tylko przez jeden wątek.

Koszty związane z pracą z wieloma wątkami

 

  • Context switching - przeładowanie kontekstu
Jeśli liczba wątków jest większa niż liczba procesorów to muszą one działać naprzemiennie. Podczas zawieszania wątku i zaczynania innego następuje przeładowanie kontekstu. Nie jest to darmowa operacja ponieważ wymaga manipulowania strukturami danych w systemie operacyjnym i JVM. Po starcie nowego wątku jego dane nie będą w cachu procesora więc bedzię on działał na początku wolniej. To powoduje że każdy wątek bez względu ile czeka innych wątków dostaje jakiś minimalny czas pracy. Jeśli wątek zostaje zablokowany poprzez blokadę lub IO to może on być przełączony szybciej niż jego minimalny czas. Programy z dużą liczbą blokad i operacji IO mają dużo operacji przełączania kontekstu.
Przełączanie kontekstu to około 5000 do 10000 cykli procesora.

  • Synchronizacja danych
Synchronizowane dane poprzez blokady lub słowo kluczowe violatile powoduje to że nie mogą być wykonywane pewne optymalizacje ponieważ należy zagwarantować widzialność danych po ich zapisaniu. To pociąga też za sobą używanie specjalnych instrukcji nazywanych memory barriers.
Mogą one inwalidować cache co powoduje dodatkowe zwiększenie kosztów.
W przypadku synchronizacji należy rozróżnić blokady obciążone (contended) od nieobciążonych (uncontended). Np synchronizacja violatile jest nieobciążona i nie jest bardzo kosztowna (20-250 cykli procesora). Kompilator może także optymalizować wielokrotne blokady w jedną a także określić że blokada jest niepotrzebna bo dana jest niedostępna dla innych wątków np. vector który używany jest w metodzie (czyli dostępny jest na stosie nie stercie).

  • Bloki synchronizowane
Blokowanie które jest obciążone (contended) powoduje że wątek jest albo wstrzymywany i czeka na to aż inny wątek zwolni blokadę i go wybudzi, albo powtarza próbę dostępu do blokady (spin-waiting).
Pierwsze rozwiązanie jest dużo bardziej efektywne ale jeśli blokady są trzymane bardzo któtko to JVM może się przełączyć na pewnych z nich na spin-waiting. 
Wstrzymywanie wątku pociąga za sobą 2 przełączenia wątku.


Redukowanie obciążenia blokad


Jak widać wyżej - blokady powodują że cześć programu jest wykonywana tylko przez jeden wątek.
Obciążenie blokad może znacząco ogranicza skalowalność systemu.
Aby więc zredukować obciążenie należy:
  • zredukować czas trzymania blokady
  • zredukować częstotliwość używania blokad
  • zastąpić blokady innymi mechanizmami które zwiększą współbieżność
Należy pamiętać aby blokada była trzymana tak krótko jak tylko potrzeba - wszystkie niepotrzebne operacje powinny być wyjęte z bloku synchronizowanego.

Innym przykładem zredukowania obciążenia blokady jest jej podzielenie (lock striping) co jest np wykorzystywane w klasie ConcurentHashMap która ma 16 blokad każda obsługuje N%16 kubełek z danymi - to powoduje znaczną poprawę współbieżności.
Należy pamiętać aby ograniczyć używanie takzwanych gorących pól - np liczników itp które muszą być zmieniane przy każdej operacji - lub używać do nich zmiennych atomowych (AtomicInteger, AtomicReference itp).
Alternatywą mogą być jak już wspomniano zmienne atomowe lub blokady ReadWrite które mogę zwiększyć współbieżność jeśli większość operacji jest tylko do odczytu.


wtorek, 8 lipca 2014

Wielowątkowość - żywotność

Problemy żywotności aplikacji wielowątkowych


  • Deadlock
  • Livelock
  • Starvation

Deadlock - zakleszczenie

 

Najczęstrzą i najpoważniejszym problemem żywotności programów są zakleszczenia. Klasyczny problem zakleszczenia to wątek A posiadający blokadę M i czekający na N i wątek B posiadający blokadę N i czekający na M. Takie wątki nigdy nie wykonają postępu. Problem ten da się rozszerzyć na wiele wątków.
Oczekiwanie na blokadę można zilustrować jako graf oczekiwania na odblokowanie, jeśli graf ten posiada cykl to wystąpiło zakleszczenie.

Klasycznym przykładem tego problemu jest problem transferowania pieniędzy między kontami:

public class AccountTransfer {
    private static class Account {
        private String number;
        private Integer balance;
        public Account(String number, Integer balance) {
            this.number = number;
            this.balance = balance;
        }
        public void setNewBalance(Integer amount) {
            balance += amount;
        }
        public String getNumber() {
            return number;
        }
        public Integer getBalance() {
            return balance;
        }
    }
    private static class TransferTask implements Runnable {
        private Account from;
        private Account to;  
        public TransferTask(Account from, Account to) {
            this.from = from;
            this.to = to;
        }
        public void transfer(Account from, Account to, Integer amount) {
            synchronized (from) {
                synchronized (to) {
                    if (from.getBalance() > amount) {
                        from.setNewBalance(-1 * amount);
                        to.setNewBalance(amount);
                    }
                }
            }
        }
        @Override
        public void run() {
            Random r = new Random();
            while (true) {
                int nextInt = r.nextInt(100);
                System.out.println("Transfering " + nextInt);
                transfer(from, to, nextInt);
            }
        }
    }
    public static void main(String[] args) {
        Account account1 = new Account("1", 10000);
        Account account2 = new Account("2", 20000);
        new Thread(new TransferTask(account1, account2)).start();
        new Thread(new TransferTask(account2, account1)).start();
    }
}


Problemem przy korzystaniu z wielu blokad jest kolejność ich zakładania - ta musi być określona i jednakowa - do określienia kolejności można użyć kodu hash code, albo innego identyfikatora obiektu który jest unikalny. Czyli najpierw wykonujemy porówanie hash code obiektów i ten który ma mniejszy hascode ma zakładaną blokadę szybciej, w przypadku kolizji można użyć dodatkowej blokady.

Metoda transfer bez deadlocku (przy unikalności numerów):

public void transfer(Account from, Account to, Integer amount) {
            int compareTo = from.getNumber().compareTo(to.getNumber());
            if (compareTo > 0) {
                synchronized (from) {
                    synchronized (to) {
                        trasferInternal(from, to, amount);
                    }
                }
            } else {
                synchronized (to) {
                    synchronized (from) {
                        trasferInternal(from, to, amount);
                    }
                }
            }
        }
        private void trasferInternal(Account from, Account to, Integer amount) {
            if (from.getBalance() > amount) {
                from.setNewBalance(-1 * amount);
                to.setNewBalance(amount);
            }
        }



Największym problemem przy wykrywaniu/unikaniu zakleszczeń są obiekty które ze sobą kooperują, jeśli obiekt A w metodzie synchronizowanej (czyli trzymając blokadę) wywołuje metodę obiektu B to jest to tak zwana alien method - potencjalnie niebezpieczny kod - ponieważ metoda w obiekcie B jest enkapsulowana i nie wiadomo jakie blokady zakłada - czyli nie można zdeterminować ich kolejności.
W tym wypadku należy - o ile to możliwe ograniczyć blok synchroniczny do absolutnego minimum i wykonywać metodę z obiektu B poza nim - nazywa się to open call.

Zapobieganie zakleszczeniom

 

Najważniejszym przy zapobieganiu zakleszczeniom jest poprawne - jednakowe uszeregowanie blokad. Należy unikać wykonywaniu metod z obiektów obcych podczas trzymania blokady.
Można też użyć czasowych metod tryLock obiektu Lock - nie da się tego niestety zrobić z blokiem synchronized.
Pomocnym może być też analiza blokad za pomocą zrzutu wątków (thread dump).
Program jvisualvm wykrywa zakleszczenia i pokazuje komunikat. Można także użyć programu z konsoli: jstack [PID].

Przykładowy zrzut dla programu na wyżej:

Found one Java-level deadlock:
=============================
"Thread-1":
  waiting to lock monitor 0x00000000026356a0 (object 0x00000000d96a4938, a com.java.ro.concurency.chapter10.AccountTransfer$Account),
  which is held by "Thread-0"
"Thread-0":
  waiting to lock monitor 0x00000000026355f8 (object 0x00000000d96a4950, a com.java.ro.concurency.chapter10.AccountTransfer$Account),
  which is held by "Thread-1"

Java stack information for the threads listed above:
===================================================
"Thread-1":
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.transfer(AccountTransfer.java:40)
    - waiting to lock <0x00000000d96a4938> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    - locked <0x00000000d96a4950> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.run(AccountTransfer.java:55)
    at java.lang.Thread.run(Thread.java:722)
"Thread-0":
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.transfer(AccountTransfer.java:40)
    - waiting to lock <0x00000000d96a4950> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    - locked <0x00000000d96a4938> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.run(AccountTransfer.java:55)
    at java.lang.Thread.run(Thread.java:722)

Found 1 deadlock.



Livelock

 

Livelock występuje jeśli pomimo że wątek nie czeka na blokadzie nie może wykonać postępu ponieważ jego operacja jest wycofywana. Typowym przykładem livelocku jest przetwarzanie wiadomości z kolejki - jeśli wykonuje to jeden wątek i nie może sobie poradzić z wiadomością - np przez błąd programistyczny to wiadomość zostaje cofnięta na początek kolejki i znów jest pobierana i zwracana, itd.
Innym przykładem są dwa wątki które są zbyt "miłe" i wycofują swoją operacje ponieważ widzą że inny ją wykonuje. Jeśli czas do powtórzenia jest taki sam dla obu wątków to operacje będą wykonywane bez końca. Pomocnym może być tutaj randomizowanie czasu oczekiwania przed ponowieniem operacji.

Starvation - zagłodzenie

 

Zagłodzenie występuje wtedy kiedy wątek jest permanentnie pozbawiony zasobów których potrzebuje aby wykonać postęp. Zwykle tym zasobem jest procesor.
Jest to dość rzadki przypadek i może wystąpić jedynie wtedy gdy nada się priorytety wątkom inne niż normalne - nie jest to zalecane. Ogólnie nie powinno się tego robić bo wtedy program może się inaczej zachowywać na innych systemach operacyjnych - np tylko na jednym może wystąpić zagłodzenie.




niedziela, 29 czerwca 2014

Wielowątkowość - używanie ThreadPoolExecutora

Powiązanie między zadaniami a ich polityka wykonywania

 

Można powiedzieć że ExecutorService rozdziela tworzenie zadań od ich wykonywania - jednak jest to prawdą tylko wtedy gdy zadania są niezależne.
Nie wszystkie zadania można łatwo wywoływać używając ExecutorService np:
  • zadania zależne - jeśli jedno zadanie zależy od innego to w przypadku Executora z ograniczoną liczbą wątków może dojść do zakleszczenia: thread starvation deadlock, najprostszym przykładem jest wykonywanie zadania na Executorze ograniczonym do jednego wątku które wywołuje inne zadanie na tej samej "puli" i czekanie na wynik. Taki wątek będzie czekał w nieskończoność bo wątek na którego czeka nigdy się nie zacznie - należy pamiętać że najlepiej w tym wypadku stosować executor z nieograniczoną liczbą wątków
  • zadania które wykorzystują "przywiązanie do wątku" (thread confinement) - takie zadania powinny być wykonywane na executorze z jednym wątkiem - zmiana liczby wątków może spowodować duże problemy
  • zadania które wykorzystują zmienne ThreadLocal - należy pamiętać że dla każdego zadania nie jest tworzony nowy wątek - jest on wykorzystywany ponownie, dodawane są też nowe i usuwane nieużywane. To powoduje że jest sens korzystania z tych zmiennych tylko jeśli zadanie zawsze wyczyści daną zmienną przed zakończeniem działania
  • zadania które powinny się skończyć szybko - niektóre zadania powinny się skończyć szybciej od innych np zadanie związane z GUI - jeśli Executor jest zapełniony długimi zadaniami tą zadanie GUI może na tym ucierpieć
Najlepszymi zadaniami do wykonywania w jednym Executorze są zadania o podobnej wielkości i niezależne.

Zrównolegalnie algorytmów rekursywnych

 

Jeśli wykonujemy pętlę i jej iteracje nie zależą od siebie a praca w nich wykonywana jest na tyle duża że jej wykonanie równoważy pracę związaną z tworzeniem nowego zadania można ją wykonać równolegle:

public void processSequencially(List<String> elements) {
        for (String element : elements) {
            processElement(element);
        }
    }
public void processInParallel(List<String> elements) {
        for (final String element : elements) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    processElement(element);
                }
            });
           
        }
    }


Tak samo jak zrównoleglenie pętli można zrównoleglić algorytm rekurencyjny - na takich samych warunkach niezależności:

    public void recursiveSequencial(List<Node> nodes, List<String> results) {
        for (Node node : nodes) {
            processNode(node, results);
            recursiveSequencial(node.getChildren(), results);
        }
    }
   
    public void recursiveParallel(List<Node> nodes, final List<String> results) {
        for (final Node node : nodes) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    processNode(node, results);
                }
            });
            processNode(node, results);
           
            recursiveParallel(node.getChildren(), results);
        }
    }


Należy pamiętać że kiedy submitujemy zadania do executora należy poczekać na skończenie zadań przez CompletionService lub wykonując zamykanie normalne - sutdown i awaitTermination wtedy wszystkie zadania zostaną zakończone.

środa, 25 czerwca 2014

Wielowątkowość - anulowanie zadań, kończenie serwisów

Anulowanie zadań

 

Najprościej anulowanie zadań można dokonać poprzez zmienną lokalną volatile np. finish. Gdzie zmienna będzie co pewien okres czasu sprawdzana i zadanie będzie można zakończyć.

public class CancellingUsingLocalVariable {
    public static void main(String[] args) throws InterruptedException {
        CancelledTask task = new CancelledTask();
        Thread thread = new Thread(task);
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        task.cancel();
    }
    private static class CancelledTask implements Runnable {  
        private volatile boolean cancelled = false;
        public void run() {
            while (!cancelled) {
                try {
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        private void doWork() throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
        }
        public void cancel() {
            this.cancelled = true;
        }
    }
}


Jednak jeśli wątek czeka na metodzie blokującej to musi się ona zakończyć zanim sprawdzony będzie ponownie status, to powoduje że nie jest to idealna metoda na anulowanie zadań.

Mechanizm wątków nie ma bezpośredniego mechanizmu aby zatrzymać inny wątek. Można jednak wykorzystać mechanizm przerwania - każdy wątek posiada zmienną interrupted którą można ustawić za pomocą metody interrupt(). Można sprawdzić za pomocą metody isInterrupted(). Statyczna i źle nazwana metoda statyczna interrupted() resetuje flagę i zwraca jej poprzednią wartość.

Większość metod blokujących co jakiś czas sprawdza flagę interrupted, czyści ją i wyrzuca wyjątek InterruptedException.

Mechanizm przerwania (interruption) jest najlepszą metodą anulowania zadań.
Samo ustawianie flagi nie jest jednoznaczne z przerwaniem wątku - to wątek ustala swoją politykę obsługi przerwań.

Należy pamiętać że działając bezpośrednio na zadaniu (Runnable, Callable) nie powinno się połykań błędy Interrupted exception a powinno się ustawić ponownie flagę interrrupted w aktualnym wątku:
Thread.currentThread().interrupt(). Należy pamiętać że jeśli używamy statycznej metody interrupted() i zwraca ona true jeśli nie rzucamy wyjątku powinniśmy chociać przywrócić daną flagę bo może jest ona obsługiwana przez wątek wykonujący zadanie. Należy pamiętać że to wątek ustala swoją politykę obsługi flagi interrupted!

public class CancellingUsingInterruption {
    public static void main(String[] args) throws InterruptedException {
        CancelledTask task = new CancelledTask();
        Thread thread = new Thread(task);
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        thread.interrupt();
    }
    private static class CancelledTask implements Runnable {  
        public void run() {
            while (Thread.getCurrentThread().isInterrupted()) {
                try {
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }       
        private void doWork() throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
        }
    }
}



Najlepszą jednak metodą anulowania zadań jest korzystanie z obiektów zwracanych przez ExecutorService: Future. Reprezentują one rezultat zadania wykonywanego asynchronicznie.

public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<?> future = executor.submit(new CancelledTask());
        TimeUnit.SECONDS.sleep(3);
        future.cancel(true);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
    }







Jednak inną ważną zaletą interfejsu future jest to że przy pobieraniu rezultatów za pomocą metody get wszystkie błędy które zakończyły wykonywanie zadania są znów wyrzucane opakowane w ExecutionException.
Przykład wykonywania zadania przez 5 sekund po czym anulowanie zadania i pokazanie błędu jeśli wcześniej zakończył wykonywanie zadania:

public class CancellingAfterSomeTime {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<String> future = executor.submit(new CancelledTask()); 
        try {
            String result = future.get(5, TimeUnit.SECONDS);
            System.out.println("Task returned: " + result);
        } catch (ExecutionException e) {
            System.out.println("Task throwed exception " + e.getCause().getMessage());
        } catch (TimeoutException e) {
            System.out.println("Timeout - cancelling task");
            future.cancel(true);
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
    }
    private static class CancelledTask implements Callable<String> {
        private Random r = new Random();
        public String call() throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (r.nextDouble() > 0.85) {
                        throw new RuntimeException("Exception from task");
                    }
                    if (r.nextDouble() > 0.75) {
                        // found result
                        break;
                    }
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return "result";
        }     
        private void doWork() throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
        }
    }

.

Metody nieodpawiadające na przerwanie


Nie wszystkie metody blokujące sprawdzają falgę interrupted i wyrzucają wyjątek InterrruptedException np:
  • czekanie na blokadzie na metodzie lock obiektu Lock lub na metodzie/bloku synchronized - w przypadku obiektu Lock można zamienić wywołanie metody na lockIterruptibly
  • synchroniczne wywołania I/O z java.io - metody read i write w OutputStream i InputStream nie odpowiadają na przerwanie. Aby je zakończyć należy zamknąć powiązany obiekt Socket i metoda zostanie skończona - w takim wypadu należy przeciążyć metodę interrupt i zamknąć socket co spowoduje zakończenie metody read i write

Zatrzymywanie serwisów opartych na wątkach

 

Jeśli w aplikacji występuje serwis który sam zarządza swoimi wątkami i działa on nieustannie to przy zamykaniu aplikacji należy także zakończyć jego wątki ponieważ inaczej JVM nie zamknie się samoczynnie.
W tym wypadku należało by napisać mechanizm który będzie to robił - najlepiej wzorować się na takich właśnie serwisach - ExecutionService. Metody shutdown, awaitTermination, shutdownNow.