piątek, 18 lipca 2014

Wielowątkowość - zmienne atomowe i nieblokujące algorytmy

Wady blokad


Synchronizacja jest czasami zbyt ciężkim rozwiązaniem np do implementowania licznika.
Dodatkowo mechanizm ten pociąga za sobą niebezpieczeństwo związane z żywotnością a także ogranicza skalowalność i wydajność - np zablokowany proces nie może nic robić w trakcie jak oczekuje na blokadę.
Część struktur danych da się zaimplementować nie korzystając z synchronizacji. Część z nich można zaimplementować za pomocą słowa kluczowego volatile - ale może ono być stosowane jedyne w prostych pojedyńczych operacjach w których poprzedni stan nie wpływa na przyszły, czyli np nie można jej zastosować do liczników które są dość prostym rodzajem stanu.
Rozwiązaniem tego problemu są zmienne atomowe i nieblokujące algorytmy które można dzięki nim zaimplementować.

Sprzętowe wspomaganie wielowątkowości


Blokowanie często można nazwać pesymistycznym a czasami wystarczy porównać czy zmienna którą chcemy ustawić została zmieniona od czasu kiedy ją odczytaliśmy i zmieniliśmy - nazywa się to blokowaniem optymistycznym (używanym także np hibernacie do updatowania danych w bazie danych). Dzisiaj prawie wszystkie procesory posiadają jakąś implementację atomowej operacji odczytu-zapisu. Najpopularniejszą z nich jest compare and swap - CAS.
CAS ma trzy argumenty - V - miejsce w pamięci do zmiany, A - oczekiwana aktualna wartość, B - nowej wartość. CAS atomowo zmieni V na B tylko jeśli aktualna wartość to A w przeciwnym razie nic zmieni w miejscu V.
W takim wypadku jeśli wiele wątków zmienia V to część z nich przegra - ale nie są one blokowane i mogą ponowić swoją operacje, dodatkowo mamy pewność że jeden na pewno wygra a to powoduje że zawsze nastąpi postęp. Właśnie z takich instrukcji korzysta JVM do implementacji zmiennych atomowych.

Zmienne atomowe


Istnieje 12 klas zmiennych atomowych:
  • skalary: AtomicLong, AtomicInteger, AtomicBoolean, AtomicReference
  • tablice: AtomicLongArray, AtomicIntegerArray, AtomicReferenceArray
  • aktualizatory pól: AtomicLongFieldUpdater, AtomicIntegerFieldUpdater...
  • złożone zmienne: AtomicStampedReference, AtomicMarkedReference
Zmienne atomowe można traktować jako takie lepsze zmienne volatile. Ich dodatkowym atrybutem jest możliwość zmiany w stosunku do poprzedniego stanu. Tą funkcjonalność na zmiennych volatile można uzyskać korzystając z aktualizatorów pól np klasy AtomicLongFieldUpdater i ich metod np compareAndSet - należy pamiętać że jest to rozwiązanie trochę słabsze bo działa jedynie kiedy wszystkie zmiany na zmiennej dokonywane są aktualizatorem - jeśli nie to rozwiązanie nie będzie działać.

W kwestii wydajności zmienne atomowe w porównaniu z rozwiązaniami bazującymi na blokadach wypadają dużo lepiej przy średnim obciążeniu zmian na polu. Jeśli zmian na polu jest bardzo dużo (co jest nierealistyczne) to zmienne atomowe mogą działać gorzej - jest to oczywiste ponieważ w takim wypadku większość wątków nie może wykonać zmiany na polu i powtarza operację wiele razy co powoduje wiele przełączeń kontekstu.

Algorytmy nieblokujące


Dzięki zmiennym atomowym i operacjami CAS można stworzyć algorytmy które, jeśli dobrze zaprojektowane, będą działać szybko w obu przypadkach - dużym i małym obciążeniu, oczywiście przy nierealistycznie dużym obciążeniu algorytm może działać słabo, należy jednak pamiętać że w takich algorytmach zawsze przynajmniej jeden wątek wykonuje postęp a inne nie są zablokowane więc zawsze ktoś wykona postęp.

Przykład stosu nieblokującego:

public class NonBlockingStack<T> {
private AtomicReference<Node<T>> top = new AtomicReference<>();
public void push(T data) {
Node<T> expextedTop;
Node<T> newTop = new Node<T>(data);
do {
expextedTop = top.get();
newTop.setNext(expextedTop);
} while (!top.compareAndSet(expextedTop, newTop));
}
public T pop() {
Node<T> expextedTop;
do {
expextedTop = top.get();
if (expextedTop == null) {
return null;
}
} while (top.compareAndSet(expextedTop, expextedTop.getNext()));
return expextedTop.getData();
}
private static class Node<T> {
private T data;
private Node<T> next;
public Node(T data) {
this.data = data;
}
public T getData() {
return data;
}
public Node<T> getNext() {
return next;
}
public void setNext(Node<T> next) {
this.next = next;
}
}
}


czwartek, 17 lipca 2014

Wielowątkowość - klasy zależne od stanu

Metody zależne od stanu klas


Wiele klas posiada logikę która pozwala wykonać daną operacje na podstawie stanu. Np nie można wyjąć danych z pustego stosu. Programy jednowątkowe powinny na takich operacjach wyrzucić błąd ale te wielowątkowe mogą poczekać aż operacja może być wykonana.
Struktura takiej operacji może wyglądać tak

1. Uzyskać blokadę na stanie
2. Dopóki (warunek nie jest spełniony)
2.1. Zwolnij blokadę
2.2. Poczekaj aż warunek może być spełniony
2.3. Opcjonalnie wyrzuć wyjątek jeśli jeśli wystąpi przerwanie albo upłynął limit czasu
2.4. Uzyskaj blokadę i wróć do punktu 2
3. Wykonaj operację
4. Zwolniej blokadę

Do takich operacji służą kolejki warunkowe (condition queues) -  metody wait, notify, notifyAll jeśli korzystamy z zwykłych blokad instrinct lock - za pomocą słowa kluczowego synchronized.
Przykład stosu z ograniczoną wielkością (dość toporny ale działający):

public class BoundedStack<N> {
    private Node<N> top;
    private int maxElements;
    private int count = 0;

    public BoundedStack(int maxElements) {
        this.maxElements = maxElements;
    }
    private static class Node<N> {
        private final N element;
        private final Node<N> next;
        public Node(N element, Node<N> next) {
            super();
            this.element = element;
            this.next = next;
        }
    }
    public synchronized void push(N element) throws InterruptedException {
        while (isFull()) {
            wait();
        }

        top = new Node<>(element, top);
        count++;
        notifyAll();
    }
    public synchronized N pop() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }

        N result = top.element;
        top = top.next;
        count--;
        notifyAll();
        return result;
    }
    private synchronized boolean isEmpty() {
        return top == null;
    }
    private synchronized boolean isFull() {
        return count == maxElements;
    }
}


Kolejki warunkowe dla bloakad synchronized mogą być powiązane z wieloma warunami - jak w przykładowym ograniczonym stosie - mamy dwa warunki - isFull, isEmpty.
Metoda wait() zwalnia blokadę i usypia wątek po jego przebudzeniu wątek musi znowu ją uzyskać, nie jest w żaden sposób uprzywilejowany w stosunku do innych wątków próbujących uzyskać daną blokadę. Po uzyskaniu blokady należy po raz kolejny sprawdzić warunek ponieważ nie musi on być spełniony - dlatego należy metodę wait wywołać w pętli z niespełnionym warunkiem.
Należy nie zapomnieć o notyfikacji innych wątków po tym jak stan się zmieni - jeśli tego nie zrobimy wątki nigdy nie wybudzą się z metod wait. Do notyfikacji służą metody notify i notifyAll.
Można korzystać z metody notify jeśli istnieje tylko jeden warunek i każdy wątek wykonuje taką samą logikę po wyjściu z metody wait. Dodatkowo tylko jeden wątek ma prawo wykonywać pracę po wybudzeniu z notyfikacji. Jęśli nie jesteśmy tego pewni powinniśmy korzystać z metody notifyAll ponieważ jest ona bezpieczniejsza i gwarantuje że choć jeden wątek wykona progres. Jednak należy pamiętać że każdy z wybudzonych wątków będzie próbował uzyskać blokadę aby na nowo sprawdzić warunek - może to powodować dużo przełączeń kontekstu ale jeśli nie program działa dość szybko to nie należy się tym przejmować.

Metoda wait występuje w 3 odmianach -bezparametrowo i z limitem czasu jest także wrażliwa na przerwania.

Obiekty warunków - praca z obiektami Lock


Obiekty Lock umożliwiają tworzenie wielu warunków na jednej blokadzie - dzięki temu można je nazwać i nie wybudzać wszystkich wątków po wykonaniu operacji.
Na przykład nasza zmieniona klasa stosu może mieć postać:

public class BoundedStackWithLock<N> {
    private Node<N> top;
    private int maxElements;
    private int count = 0;
    private Lock stackLock = new ReentrantLock();
    private Condition isEmptyCondition = stackLock.newCondition();
    private Condition isFullCondition = stackLock.newCondition();
    public BoundedStackWithLock(int maxElements) {
        this.maxElements = maxElements;
    }
    private static class Node<N> {
        private final N element;
        private final Node<N> next;
        public Node(N element, Node<N> next) {
            super();
            this.element = element;
            this.next = next;
        }
    }
    public void push(N element) throws InterruptedException {
        stackLock.lock();
        while (isFull()) {
            isFullCondition.await();
        }
        top = new Node<>(element, top);
        count++;
        isEmptyCondition.signal();
        stackLock.unlock();
    }
    public N pop() throws InterruptedException {
        stackLock.lock();
        while (isEmpty()) {
            isEmptyCondition.await();
        }
        N result = top.element;
        top = top.next;
        count--;
        isFullCondition.signal();
        stackLock.unlock();
        return result;
    }
    private boolean isEmpty() {
        return top == null;
    }
    private boolean isFull() {
        return count == maxElements;
    }
}


Należy pamiętać aby przez pomyłkę nie wywołać metod wait, notify, notifyAll na obiektach condition. Ich odpowiedniki to await, signal, signalAll. Jest także dodatkowa metoda awaitUninterruptibly która jak sama nazwa wskazuje nie jest wrażliwa na przerwania.



poniedziałek, 14 lipca 2014

Wielowątkowość - blokady interfejsu Lock

Interfejs Lock


Interfejs lock specyfikuje kilka interesujących metod odróżniających go od blokad nienazwanych (intrinsic locks) które działają na podstawie słowa kluczowego synchornized:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}


Klasa ReentrantLock implementuje ten interfejs oferując wilowejściową blokadę podobną do blokady synchronized. Więc po co ten nowy interfejs? Ponieważ blokady synchronized mają swoje ograniczenia a zastąpieniem ich obiektem Lock pomaga je wyeliminować. Te ograniczenia to:
  • blokada synchronized jest ograniczona tylko do jednego bloku, jej bloku - jest to także jej zaleta bo blokady synchronized się nie zwalnia a blokadę stworząną przy użyciu obiektu lock już trzeba zwolnić - zwykle w bloku finally
  • wątku oczekującego na blokadzie nie można przerwać a interfejs lock ma metodę lockinterruptibly lub tryLock dzięki którym można stworzyć zadanie które mimo oczekiwania na blokadę będzie można anulować poprzez przerwanie (interrupt)
  • blokada Lock ma możliwość pytania o blokadę i nie blokowania jeśli jej nie może uzyskać dzięki metodzie tryLock (bez parametrów) dzięki czemu można uzyskać blokadę bez blokowania wątku
  • blokada Lock ma możliwość uzyskania blokady z limitem czasu dzięki czemu można pisać zadania z budżetem czasu na ich wykonanie - metoda tryLock z parametrami
  • blokada instrinct lock umożliwia wywoływanie metod wait, notify i notifyAll ale nie są one powiązane z warunkami których może być wiele, dzięki blokadzie Lock można stworzyć wiele warunków dla jeden blokady i na warunkach wywoływać metody await, signal, signalAll (interfejs Condition i metoda newCondition)
  • klasa ReentrantLock umożliwia stworzenie blokady która będzie sprawiedliwa (fair)
  • w javie 5 obiekty Lock działały dużo szybciej niż blokady synchronized jednak poprawiono to i w javie 6 nie ma już różnic w wydajności
Typowe użycie metody lock:

Lock lock = new ReentrantLock();
lock.lock();
try {
    // access the resource protected by this lock
} finally {
    lock.unlock();
}


Typowe użycie metody tryLock:

Lock lock = new ReentrantLock();
if (lock.tryLock()) {
    try {
        // manipulate protected state
    } finally {
        lock.unlock();
    }
} else {
    // perform alternative actions
}


Sprawiedliwość 

 

Tak jak wcześniej wspomniano możliwe jest stworzenie blokady która jest sprawiedliwa, domyślnie blokada lock jak i synchronized (instrinct lock) nie są sprawiedliwe. To znaczy że nie ma znaczenia który wątek w jakiej kolejności poprosił o dostęp do blokady. Statystycznie rzecz biorąc każdy w końcu otrzyma blokadę i nie ma tu zagrożenia żywotności a wybudzanie specjalnie wątku który jest pierwszy w kolejce może być - w praktyce jest - bardzo kosztowne. Także blokadę otrzymuje ten wątek który może ją otrzymać i użyć najszybciej - więc nie jest np uśpiony.
Sprawiedliwe blokady mogą się tylko przydać jeśli są trzymane bardzo długo.

Który sposób blokowania wybrać

 

Powinno się używać interfejsu lock tylko jeśli go potrzebujemy - czyli potrzebujemy jego zaawansowanych możliwości - w innym przypadku nie ma to sensu. Dużo prościej jest używać słowa kluczowego synchronized, nie trzeba także zwalniać blokad synchronized co sprawia że programowanie jest bezpieczniejsze. Blokady synchronized wcale nie są przestarzałym narzędziem cały czas są i będą używane.

Blokady do odczytu i zapisu: ReadWriteLock

 

Kolejnym sposobem na zwiększenie wydajności w dostępie do danych współdzielonych jest użycie blokad read write. Blokady te są użyteczne jeśli dominującą operacją na danych jest odczyt. W takim wypadku dostęp do czytania ma wiele wątków, a jeden wątek może pisać. Dane nie są w żaden sposób zagrożone i wątki zawsze widzą prawdziwą i ostatnią zapisaną wartość.
Implementajcją interfejsu ReadWriteLock jest klasa ReentrantReadWriteLock, ma ona możliwość określenia sprawiedliwości, jeśli blokada jest trzymana przez czytających i pojawia się wątek który chce pisać to więcej czytających nie dostaje dostępu zanim wątek zapisujący nie dostanie i zwolni blokady, możliwe jest zmiana blokady z zapisywania do czytania ale nie na odwrót.  

czwartek, 10 lipca 2014

Wielowątkowość - wydajność i skalowalność

Skalowalność  i wydajność



Skalowalność określa możliwość aby zwiększyć przepustowość lub wielkość wykonywanej pracy kiedy dostępne są dodatkowe zasoby takie jak procesor, pamięć, przepustowość I/O.

Należy pamiętać aby najpierw robić aplikacje która działa prawidłowo a następnie wykonywać optymalizacje i tylko jeśli jest ona potrzebna czyli aplikacja nie spełnia oczekiwań.

Prawo Amdahl'a

 

Cześć pracy można wykonać szybciej kiedy dostępnych jest więcej zasobów np zbieranie truskawek, natomiast część nie dlatego że jakaś cześć zadania może być wykonana tylko przez jeden wątek np rodzenie dzieci (9 kobiet nie urodzi 1 dziecka w miesiąc).
Większość wielowątkowych programów ma właśnie takie części które może wykonać wiele wątków i takie które mogą być wykonane tylko przez jeden wątek. Prawo Amdahla określa maksymalne przyspieszenie przy znanej części serializowalnej (jednowątkowej) i liczbie procesorów w stosunku do początkowej liczby procesorów.

Speedup <= 1/ (F + (1-F)/N)

F - ułamek pracy wykonywanej jednowątkowo
N - liczba procesorów

Przykładowo jeśli program nie ma części jednowątkowych (co jest niemożliwe) to przyspieszenie wyniesie 1/(0 + 1/N) = N czyli przy 5 procesorach będzie działać 5 razy szybciej.
Jeśli połowa programu jest wykonana jednowątkowo (F=0.5) to przy 5 procesorach przyspieszenie nie będzie większe niż = 1/ (0.5 + 0,5/5) = 1.67 czyli nie zwięszy się nawet dwukrotnie, tak naprawdę przyspieszenie będzie dążyć tutaj do 2.
Jeśli zwiększymy liczbę procesorów do nieskończoności to wartość przyspieszenia nie będzie większa niż 1/F.

To sprawia jak wielkie znaczenie ma cześć programu która może być wykonana przez jeden wątek dla skalowalności.
Nawet jeśli w naszej aplikacji mamy perfekcyjnie niezależne zadania to muszą one być pobierane np z kolejki i wyniki też najpewniej muszą być zapisywane więc nawet w takim idealnym przypadku część pracy jest wykonywana tylko przez jeden wątek.

Koszty związane z pracą z wieloma wątkami

 

  • Context switching - przeładowanie kontekstu
Jeśli liczba wątków jest większa niż liczba procesorów to muszą one działać naprzemiennie. Podczas zawieszania wątku i zaczynania innego następuje przeładowanie kontekstu. Nie jest to darmowa operacja ponieważ wymaga manipulowania strukturami danych w systemie operacyjnym i JVM. Po starcie nowego wątku jego dane nie będą w cachu procesora więc bedzię on działał na początku wolniej. To powoduje że każdy wątek bez względu ile czeka innych wątków dostaje jakiś minimalny czas pracy. Jeśli wątek zostaje zablokowany poprzez blokadę lub IO to może on być przełączony szybciej niż jego minimalny czas. Programy z dużą liczbą blokad i operacji IO mają dużo operacji przełączania kontekstu.
Przełączanie kontekstu to około 5000 do 10000 cykli procesora.

  • Synchronizacja danych
Synchronizowane dane poprzez blokady lub słowo kluczowe violatile powoduje to że nie mogą być wykonywane pewne optymalizacje ponieważ należy zagwarantować widzialność danych po ich zapisaniu. To pociąga też za sobą używanie specjalnych instrukcji nazywanych memory barriers.
Mogą one inwalidować cache co powoduje dodatkowe zwiększenie kosztów.
W przypadku synchronizacji należy rozróżnić blokady obciążone (contended) od nieobciążonych (uncontended). Np synchronizacja violatile jest nieobciążona i nie jest bardzo kosztowna (20-250 cykli procesora). Kompilator może także optymalizować wielokrotne blokady w jedną a także określić że blokada jest niepotrzebna bo dana jest niedostępna dla innych wątków np. vector który używany jest w metodzie (czyli dostępny jest na stosie nie stercie).

  • Bloki synchronizowane
Blokowanie które jest obciążone (contended) powoduje że wątek jest albo wstrzymywany i czeka na to aż inny wątek zwolni blokadę i go wybudzi, albo powtarza próbę dostępu do blokady (spin-waiting).
Pierwsze rozwiązanie jest dużo bardziej efektywne ale jeśli blokady są trzymane bardzo któtko to JVM może się przełączyć na pewnych z nich na spin-waiting. 
Wstrzymywanie wątku pociąga za sobą 2 przełączenia wątku.


Redukowanie obciążenia blokad


Jak widać wyżej - blokady powodują że cześć programu jest wykonywana tylko przez jeden wątek.
Obciążenie blokad może znacząco ogranicza skalowalność systemu.
Aby więc zredukować obciążenie należy:
  • zredukować czas trzymania blokady
  • zredukować częstotliwość używania blokad
  • zastąpić blokady innymi mechanizmami które zwiększą współbieżność
Należy pamiętać aby blokada była trzymana tak krótko jak tylko potrzeba - wszystkie niepotrzebne operacje powinny być wyjęte z bloku synchronizowanego.

Innym przykładem zredukowania obciążenia blokady jest jej podzielenie (lock striping) co jest np wykorzystywane w klasie ConcurentHashMap która ma 16 blokad każda obsługuje N%16 kubełek z danymi - to powoduje znaczną poprawę współbieżności.
Należy pamiętać aby ograniczyć używanie takzwanych gorących pól - np liczników itp które muszą być zmieniane przy każdej operacji - lub używać do nich zmiennych atomowych (AtomicInteger, AtomicReference itp).
Alternatywą mogą być jak już wspomniano zmienne atomowe lub blokady ReadWrite które mogę zwiększyć współbieżność jeśli większość operacji jest tylko do odczytu.


wtorek, 8 lipca 2014

Wielowątkowość - żywotność

Problemy żywotności aplikacji wielowątkowych


  • Deadlock
  • Livelock
  • Starvation

Deadlock - zakleszczenie

 

Najczęstrzą i najpoważniejszym problemem żywotności programów są zakleszczenia. Klasyczny problem zakleszczenia to wątek A posiadający blokadę M i czekający na N i wątek B posiadający blokadę N i czekający na M. Takie wątki nigdy nie wykonają postępu. Problem ten da się rozszerzyć na wiele wątków.
Oczekiwanie na blokadę można zilustrować jako graf oczekiwania na odblokowanie, jeśli graf ten posiada cykl to wystąpiło zakleszczenie.

Klasycznym przykładem tego problemu jest problem transferowania pieniędzy między kontami:

public class AccountTransfer {
    private static class Account {
        private String number;
        private Integer balance;
        public Account(String number, Integer balance) {
            this.number = number;
            this.balance = balance;
        }
        public void setNewBalance(Integer amount) {
            balance += amount;
        }
        public String getNumber() {
            return number;
        }
        public Integer getBalance() {
            return balance;
        }
    }
    private static class TransferTask implements Runnable {
        private Account from;
        private Account to;  
        public TransferTask(Account from, Account to) {
            this.from = from;
            this.to = to;
        }
        public void transfer(Account from, Account to, Integer amount) {
            synchronized (from) {
                synchronized (to) {
                    if (from.getBalance() > amount) {
                        from.setNewBalance(-1 * amount);
                        to.setNewBalance(amount);
                    }
                }
            }
        }
        @Override
        public void run() {
            Random r = new Random();
            while (true) {
                int nextInt = r.nextInt(100);
                System.out.println("Transfering " + nextInt);
                transfer(from, to, nextInt);
            }
        }
    }
    public static void main(String[] args) {
        Account account1 = new Account("1", 10000);
        Account account2 = new Account("2", 20000);
        new Thread(new TransferTask(account1, account2)).start();
        new Thread(new TransferTask(account2, account1)).start();
    }
}


Problemem przy korzystaniu z wielu blokad jest kolejność ich zakładania - ta musi być określona i jednakowa - do określienia kolejności można użyć kodu hash code, albo innego identyfikatora obiektu który jest unikalny. Czyli najpierw wykonujemy porówanie hash code obiektów i ten który ma mniejszy hascode ma zakładaną blokadę szybciej, w przypadku kolizji można użyć dodatkowej blokady.

Metoda transfer bez deadlocku (przy unikalności numerów):

public void transfer(Account from, Account to, Integer amount) {
            int compareTo = from.getNumber().compareTo(to.getNumber());
            if (compareTo > 0) {
                synchronized (from) {
                    synchronized (to) {
                        trasferInternal(from, to, amount);
                    }
                }
            } else {
                synchronized (to) {
                    synchronized (from) {
                        trasferInternal(from, to, amount);
                    }
                }
            }
        }
        private void trasferInternal(Account from, Account to, Integer amount) {
            if (from.getBalance() > amount) {
                from.setNewBalance(-1 * amount);
                to.setNewBalance(amount);
            }
        }



Największym problemem przy wykrywaniu/unikaniu zakleszczeń są obiekty które ze sobą kooperują, jeśli obiekt A w metodzie synchronizowanej (czyli trzymając blokadę) wywołuje metodę obiektu B to jest to tak zwana alien method - potencjalnie niebezpieczny kod - ponieważ metoda w obiekcie B jest enkapsulowana i nie wiadomo jakie blokady zakłada - czyli nie można zdeterminować ich kolejności.
W tym wypadku należy - o ile to możliwe ograniczyć blok synchroniczny do absolutnego minimum i wykonywać metodę z obiektu B poza nim - nazywa się to open call.

Zapobieganie zakleszczeniom

 

Najważniejszym przy zapobieganiu zakleszczeniom jest poprawne - jednakowe uszeregowanie blokad. Należy unikać wykonywaniu metod z obiektów obcych podczas trzymania blokady.
Można też użyć czasowych metod tryLock obiektu Lock - nie da się tego niestety zrobić z blokiem synchronized.
Pomocnym może być też analiza blokad za pomocą zrzutu wątków (thread dump).
Program jvisualvm wykrywa zakleszczenia i pokazuje komunikat. Można także użyć programu z konsoli: jstack [PID].

Przykładowy zrzut dla programu na wyżej:

Found one Java-level deadlock:
=============================
"Thread-1":
  waiting to lock monitor 0x00000000026356a0 (object 0x00000000d96a4938, a com.java.ro.concurency.chapter10.AccountTransfer$Account),
  which is held by "Thread-0"
"Thread-0":
  waiting to lock monitor 0x00000000026355f8 (object 0x00000000d96a4950, a com.java.ro.concurency.chapter10.AccountTransfer$Account),
  which is held by "Thread-1"

Java stack information for the threads listed above:
===================================================
"Thread-1":
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.transfer(AccountTransfer.java:40)
    - waiting to lock <0x00000000d96a4938> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    - locked <0x00000000d96a4950> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.run(AccountTransfer.java:55)
    at java.lang.Thread.run(Thread.java:722)
"Thread-0":
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.transfer(AccountTransfer.java:40)
    - waiting to lock <0x00000000d96a4950> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    - locked <0x00000000d96a4938> (a com.java.ro.concurency.chapter10.AccountTransfer$Account)
    at com.java.ro.concurency.chapter10.AccountTransfer$TransferTask.run(AccountTransfer.java:55)
    at java.lang.Thread.run(Thread.java:722)

Found 1 deadlock.



Livelock

 

Livelock występuje jeśli pomimo że wątek nie czeka na blokadzie nie może wykonać postępu ponieważ jego operacja jest wycofywana. Typowym przykładem livelocku jest przetwarzanie wiadomości z kolejki - jeśli wykonuje to jeden wątek i nie może sobie poradzić z wiadomością - np przez błąd programistyczny to wiadomość zostaje cofnięta na początek kolejki i znów jest pobierana i zwracana, itd.
Innym przykładem są dwa wątki które są zbyt "miłe" i wycofują swoją operacje ponieważ widzą że inny ją wykonuje. Jeśli czas do powtórzenia jest taki sam dla obu wątków to operacje będą wykonywane bez końca. Pomocnym może być tutaj randomizowanie czasu oczekiwania przed ponowieniem operacji.

Starvation - zagłodzenie

 

Zagłodzenie występuje wtedy kiedy wątek jest permanentnie pozbawiony zasobów których potrzebuje aby wykonać postęp. Zwykle tym zasobem jest procesor.
Jest to dość rzadki przypadek i może wystąpić jedynie wtedy gdy nada się priorytety wątkom inne niż normalne - nie jest to zalecane. Ogólnie nie powinno się tego robić bo wtedy program może się inaczej zachowywać na innych systemach operacyjnych - np tylko na jednym może wystąpić zagłodzenie.




niedziela, 29 czerwca 2014

Wielowątkowość - używanie ThreadPoolExecutora

Powiązanie między zadaniami a ich polityka wykonywania

 

Można powiedzieć że ExecutorService rozdziela tworzenie zadań od ich wykonywania - jednak jest to prawdą tylko wtedy gdy zadania są niezależne.
Nie wszystkie zadania można łatwo wywoływać używając ExecutorService np:
  • zadania zależne - jeśli jedno zadanie zależy od innego to w przypadku Executora z ograniczoną liczbą wątków może dojść do zakleszczenia: thread starvation deadlock, najprostszym przykładem jest wykonywanie zadania na Executorze ograniczonym do jednego wątku które wywołuje inne zadanie na tej samej "puli" i czekanie na wynik. Taki wątek będzie czekał w nieskończoność bo wątek na którego czeka nigdy się nie zacznie - należy pamiętać że najlepiej w tym wypadku stosować executor z nieograniczoną liczbą wątków
  • zadania które wykorzystują "przywiązanie do wątku" (thread confinement) - takie zadania powinny być wykonywane na executorze z jednym wątkiem - zmiana liczby wątków może spowodować duże problemy
  • zadania które wykorzystują zmienne ThreadLocal - należy pamiętać że dla każdego zadania nie jest tworzony nowy wątek - jest on wykorzystywany ponownie, dodawane są też nowe i usuwane nieużywane. To powoduje że jest sens korzystania z tych zmiennych tylko jeśli zadanie zawsze wyczyści daną zmienną przed zakończeniem działania
  • zadania które powinny się skończyć szybko - niektóre zadania powinny się skończyć szybciej od innych np zadanie związane z GUI - jeśli Executor jest zapełniony długimi zadaniami tą zadanie GUI może na tym ucierpieć
Najlepszymi zadaniami do wykonywania w jednym Executorze są zadania o podobnej wielkości i niezależne.

Zrównolegalnie algorytmów rekursywnych

 

Jeśli wykonujemy pętlę i jej iteracje nie zależą od siebie a praca w nich wykonywana jest na tyle duża że jej wykonanie równoważy pracę związaną z tworzeniem nowego zadania można ją wykonać równolegle:

public void processSequencially(List<String> elements) {
        for (String element : elements) {
            processElement(element);
        }
    }
public void processInParallel(List<String> elements) {
        for (final String element : elements) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    processElement(element);
                }
            });
           
        }
    }


Tak samo jak zrównoleglenie pętli można zrównoleglić algorytm rekurencyjny - na takich samych warunkach niezależności:

    public void recursiveSequencial(List<Node> nodes, List<String> results) {
        for (Node node : nodes) {
            processNode(node, results);
            recursiveSequencial(node.getChildren(), results);
        }
    }
   
    public void recursiveParallel(List<Node> nodes, final List<String> results) {
        for (final Node node : nodes) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    processNode(node, results);
                }
            });
            processNode(node, results);
           
            recursiveParallel(node.getChildren(), results);
        }
    }


Należy pamiętać że kiedy submitujemy zadania do executora należy poczekać na skończenie zadań przez CompletionService lub wykonując zamykanie normalne - sutdown i awaitTermination wtedy wszystkie zadania zostaną zakończone.

środa, 25 czerwca 2014

Wielowątkowość - anulowanie zadań, kończenie serwisów

Anulowanie zadań

 

Najprościej anulowanie zadań można dokonać poprzez zmienną lokalną volatile np. finish. Gdzie zmienna będzie co pewien okres czasu sprawdzana i zadanie będzie można zakończyć.

public class CancellingUsingLocalVariable {
    public static void main(String[] args) throws InterruptedException {
        CancelledTask task = new CancelledTask();
        Thread thread = new Thread(task);
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        task.cancel();
    }
    private static class CancelledTask implements Runnable {  
        private volatile boolean cancelled = false;
        public void run() {
            while (!cancelled) {
                try {
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        private void doWork() throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
        }
        public void cancel() {
            this.cancelled = true;
        }
    }
}


Jednak jeśli wątek czeka na metodzie blokującej to musi się ona zakończyć zanim sprawdzony będzie ponownie status, to powoduje że nie jest to idealna metoda na anulowanie zadań.

Mechanizm wątków nie ma bezpośredniego mechanizmu aby zatrzymać inny wątek. Można jednak wykorzystać mechanizm przerwania - każdy wątek posiada zmienną interrupted którą można ustawić za pomocą metody interrupt(). Można sprawdzić za pomocą metody isInterrupted(). Statyczna i źle nazwana metoda statyczna interrupted() resetuje flagę i zwraca jej poprzednią wartość.

Większość metod blokujących co jakiś czas sprawdza flagę interrupted, czyści ją i wyrzuca wyjątek InterruptedException.

Mechanizm przerwania (interruption) jest najlepszą metodą anulowania zadań.
Samo ustawianie flagi nie jest jednoznaczne z przerwaniem wątku - to wątek ustala swoją politykę obsługi przerwań.

Należy pamiętać że działając bezpośrednio na zadaniu (Runnable, Callable) nie powinno się połykań błędy Interrupted exception a powinno się ustawić ponownie flagę interrrupted w aktualnym wątku:
Thread.currentThread().interrupt(). Należy pamiętać że jeśli używamy statycznej metody interrupted() i zwraca ona true jeśli nie rzucamy wyjątku powinniśmy chociać przywrócić daną flagę bo może jest ona obsługiwana przez wątek wykonujący zadanie. Należy pamiętać że to wątek ustala swoją politykę obsługi flagi interrupted!

public class CancellingUsingInterruption {
    public static void main(String[] args) throws InterruptedException {
        CancelledTask task = new CancelledTask();
        Thread thread = new Thread(task);
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        thread.interrupt();
    }
    private static class CancelledTask implements Runnable {  
        public void run() {
            while (Thread.getCurrentThread().isInterrupted()) {
                try {
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }       
        private void doWork() throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
        }
    }
}



Najlepszą jednak metodą anulowania zadań jest korzystanie z obiektów zwracanych przez ExecutorService: Future. Reprezentują one rezultat zadania wykonywanego asynchronicznie.

public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<?> future = executor.submit(new CancelledTask());
        TimeUnit.SECONDS.sleep(3);
        future.cancel(true);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
    }







Jednak inną ważną zaletą interfejsu future jest to że przy pobieraniu rezultatów za pomocą metody get wszystkie błędy które zakończyły wykonywanie zadania są znów wyrzucane opakowane w ExecutionException.
Przykład wykonywania zadania przez 5 sekund po czym anulowanie zadania i pokazanie błędu jeśli wcześniej zakończył wykonywanie zadania:

public class CancellingAfterSomeTime {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<String> future = executor.submit(new CancelledTask()); 
        try {
            String result = future.get(5, TimeUnit.SECONDS);
            System.out.println("Task returned: " + result);
        } catch (ExecutionException e) {
            System.out.println("Task throwed exception " + e.getCause().getMessage());
        } catch (TimeoutException e) {
            System.out.println("Timeout - cancelling task");
            future.cancel(true);
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
    }
    private static class CancelledTask implements Callable<String> {
        private Random r = new Random();
        public String call() throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    if (r.nextDouble() > 0.85) {
                        throw new RuntimeException("Exception from task");
                    }
                    if (r.nextDouble() > 0.75) {
                        // found result
                        break;
                    }
                    doWork();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return "result";
        }     
        private void doWork() throws InterruptedException {
            TimeUnit.SECONDS.sleep(1);
        }
    }

.

Metody nieodpawiadające na przerwanie


Nie wszystkie metody blokujące sprawdzają falgę interrupted i wyrzucają wyjątek InterrruptedException np:
  • czekanie na blokadzie na metodzie lock obiektu Lock lub na metodzie/bloku synchronized - w przypadku obiektu Lock można zamienić wywołanie metody na lockIterruptibly
  • synchroniczne wywołania I/O z java.io - metody read i write w OutputStream i InputStream nie odpowiadają na przerwanie. Aby je zakończyć należy zamknąć powiązany obiekt Socket i metoda zostanie skończona - w takim wypadu należy przeciążyć metodę interrupt i zamknąć socket co spowoduje zakończenie metody read i write

Zatrzymywanie serwisów opartych na wątkach

 

Jeśli w aplikacji występuje serwis który sam zarządza swoimi wątkami i działa on nieustannie to przy zamykaniu aplikacji należy także zakończyć jego wątki ponieważ inaczej JVM nie zamknie się samoczynnie.
W tym wypadku należało by napisać mechanizm który będzie to robił - najlepiej wzorować się na takich właśnie serwisach - ExecutionService. Metody shutdown, awaitTermination, shutdownNow.

 

niedziela, 22 czerwca 2014

Wielowątkowość - Executor framework

Wykonywanie zadań bezpośrednio

 

Bezpośrednio tworzenie i wykonywanie zadań ma szereg wad:
  • tworzenie i kończenie pracy wątku nie jest darmową operacją i opóźnia wykonanie wątku, jeśli wątków jest wiele a zadanie do wykonania proste to uwidacznia się to jeszcze bardziej zwiększając drastycznie opóźnienie (latency)
  • każdy wątek ma swój stos - konsumuje pamięć którą jest przydzielona JVM
  • nieograniczenie tworzenie wątków może spowodować że systemowi zabraknie zasobów i przestanie działać
 

Executor serivce

 

Executor framework pozwala na oddzielenie tworzenia zadań i od ich uruchamiania. Do uruchamiania zadań służy np klasa ThreadPoolExecutor.

Aby stworzyć executora do wykonywania zadań można skorzystać z klasy Excutors i metody fabrykującej newFixedThreadPool dla executora ze stałą liczbą wątków lub newCachedThreadPool dla executora bez ograniczenia liczby wątków.

Executor nie tworzy dla każdego zadania nowego wątku - jeśli wątek zakończy pracę, bierze następne zadanie a do kolejkowania zadań służy kolekcja blokująca (BlockingQueue).

Używanie interfejsu ExecutorService pozwala nam w każdym momencie podmienić sposób wykonywania zadań, i ustalić wiele innych właściwości:
  • w którym wątku zadanie będzie wykonywane
  • w jakiej kolejności zadania będą wykonywane
  • ile zadań może działać naraz
  • jak wiele zadań może być kolejkowanych
  • specyfikacje czy task może być odrzucony z powodu przeciążenia i w jaki sposób ma to być zrobione
  • czy przed i po wykonaniu zadania wykonać inne zadanie
Wszystko to można dokonać przez użycie bezpośrednio konstruktorów np klasy ThreadPoolExecutor.

ExecutorService ma trzy stanu - running, shutting down, terminated. Metoda shutdown zaczyna bezpieczne zamykanie - wszystkie submitowane i działające taski są kończone a następnie executor jest zamykany. Możliwe jest też użycie metody shotdownNow które stara się anulować aktualnie działające zadania i nie startuje nowych.
Należy pamiętać że aby maszyna wirtualna się zamknęła nie mogą działać żadne jej wątki które nie są demonami więc należy zamknąć ExecutorService aby skończyć jego wątki. Najlepiej zrobić to wykonując metodę shutdown a następnie awaitTermination z odpowiednio dużym czasem.

Inne rodzaje ExecutorService:
  • ForkJoinPool - do wykonywania specjalnych zadań ForkJoinTask  które umożliwiają łatwą implementacje problemów które można podzielić na mniejsze
  • ScheduledThreadPoolExecutor - do tworzenia zadań które będą dostępne (wykonywane) w określonym czasie
 

CompletionService


Pakiet java.util.concurent zawiera także interfejs CompletionService i jego klasę implementującą ExecutorCompletionService. Dzieki tej klasie można w łatwy sposób pobrać wyniki wykonanych zadań jeśli są one potrzebne - nie trzeba pobierać Future dla każdego zadania i iterować po tym. Dzięki temu dostaniemy pierwszy wynik zaraz po jego zakończeniu bo wyniki zapisywane są na kolejce blokującej.
ExecutionCompletionService potrzebuje executora aby działać - więc można tworzyć wiele takich serwisów dla jednego executora, można także przekazać implementację kolejki blokującej której chcemy użyć dla skończonych zadań.

sobota, 21 czerwca 2014

Wielowątkowość - kolekcje synchronizowane i wielowątkowe, synchronizatory

Kolekcje synchronizowane


Java zawiera kolekcje synchronizowane - stare kolekcje Vector i Hashtable. Można także z każdej niesynchronizowanej kolekcji zrobić taką poprzez metodę fabrykującą Collections.synchronizedXXX np synchronizedMap. Fabryka taka tworzy obiekt który dekoruje każdą publiczną metodę opakowując ją blokiem synchronized używając instancji obiektu do blokowania.

Kolekcje synchronizowane nie mają takich metod jak putIfAbsent, ich iteratory są iteratorami fail-fast - czyli jeśli wykryją modyfikacje w trakcie iterowania wyrzucają wykątek ConcurrentModificationException. Blokowanie iteratora na czas iterowania może być bardzo kosztowne i sprawiać że program nie będzie skalowalny.


Kolekcje wielowątkowe (concurrent collections)

 

W javie 5 wprowadzono kolekcje wielowątkowe takie jak ConcurrentHashMap, CopyOnWriteArrayList, CopyOnWriteArraySet, ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDequeue

ConcurrentHashMap nie synchronizuje wszystkich metod zamiast tego wykorzystywany jest tu mechanizm podziału na wiele map każda ze swoją blokadą - lock striping dzięki czemu więcej wątków może symultanicznie wykonywać operacje na obiekcie mapy. Dodatkowo mapa implementuje interfejs ConcurentMap w którym są dodatkowe atomowe metody: putIfAbsent, remove, replace. Iterator takiej mapy nie wyrzuca wyjątku jeśli mapa jest modyfikowana jednak jest iteratorem weakly consistent - czyli nie musi pokazywać zmian po utworzeniu iteratora.

CopyOnWriteArrayList jest klasą thread-safe i zastępuje ArrayList - tworzy ona kopie podległej wewnętrznej tablicy za każdym razem kiedy wykonywana jest operacja dodawania, lub usuwania obiektu - dzięki temu iterator po stworzeniu może iterować na tablicy na której został zainicjowany więc nie wyrzuci wyjątku ConcurrentModificationException. Klasa taka ma zastosowanie tam gdzie dominującą operacją jest iterowanie a nie dodawanie np. listenery.

Kolejki blokujące mogą być użyte do rozwiązania problemu producent-konsumer. Zawierają metody blokujące które czekają jeśli kolejka jest pusta lub pełna.

Kolejki dequeue umożliwają pobieranie elementów także z początku kolejki dzięki czemu mogą być używane w alborytmach work-stealing w których każdy konsument ma jedną kolejkę i jeśli nie ma więcej pracy może wziąć pracę z początku innego konsumenta - dzięki temu wątki mniej będą ze sobą walczyły (less contention) i dzięki temu rozwiązanie jest bardziej skalowalne.


Metody blokujące które rzucają wyjątek IterruptedException


Każdy wątek może zostać zablokowany lub zapauzowany z różnych powodów - oczekiwaniu na blokadzie, czekanie na skończenie operacji I/O, czekanie na wybudzenie z operacji sleep. W takim wypadku proces przechodzi w stan BLOCKED, WAITING, TIMED_WAITING. Wątek który jest zablokowany musi być czekać na operacje na którą nie ma wpływu a która go wybudzi.

Nie da się zatrzymać wątku z innego wątku - można mu tylko ustawić flagę interrupt poprzez metodę interrupt. Mechanizm ten jest w większości wykorzystywane to anulowania długo trwających operacji.

Jak obsłużyć InterruptedException:
  • poprzez propagowanie go wyżej do metody wywołującej
  • poprzez obsługę - jeśli nie można go wyżej propagować - poprzez restorowanie flagi interrupted: Thread.currentThread().interrupt()
Można też pominąć obsługę jeśli pracujemy bezpośrednio na klasie Thread.

Synchronizatory

 

Semaphore - klasa przydatna np przy pulach obiektów. 

CountDownLatch - jednorazowy zatrzask który odlicza się on n do zera, wątki czekają na to aż wartość będzie równa 0. Może być użyty tylko raz.

CyclicBarrier - tak jak zatrzask blokują grupę wątków aż pewna aktywność się skończy - znana - stała liczba wątków dojdzie do bariery, następnie bariera jest resetowana a wątki mogą kontynuwać działanie. Jeśli wątek na barierze zostanie przerwany lub wystąpi timeout to reszta wątków zostanie wybudzona przez BrokenBarrierException a bariera zostanie w stanie broken. 

Phaser - jest bardziej wyrafinowaną barierą, umożliwia rejestracje i wyrejestrowywanie wątków oczekujących.

Exchanger pozwala na wymianę obiektu między wątkami w miejscu "spotkania".

 

 

piątek, 20 czerwca 2014

Wielowątkowość - współdzielenie obiektów

Widzialność

 

Jeśli nie zastosowana jest synchronizacja kompilator może zmienić kolejność wykonywania poleceń więc jeśli dwie zmienne są przypisywane jedna po drugiej - nie ma pewności że ich wartości będą widoczne w tym samym czasie dla innego wątku w tej samej kolejności. Możliwe jest widzenie starych wartości (stale data) a w przypadku zmiennych 64bitowych możliwe jest widzenie połowy wartości z dwóch wartości którą ta zmienna miała ponieważ ich zamiana jest robiona w dwóch krokach.

Jednym z rozwiązań jest stosowanie blokad - jeśli wątek uzyska blokadę następnie zmienni zmienną i zwolni blokadę, to inny wątek który uzyskał tą samą blokadę będzie widzieć wszystkie dane.

Innym przykładem są zmienne volatile. Jeśli jeden wątek A zapisuje do zmiennej volatile wartość a następnie inny wątek B ją czyta to zobaczy dobrą wartość zmiennej jak i innych wcześniej zmienianych zmiennych przez wątek A.

Przykład: Wątek Reader nie widzi zmiennej ready więc będzie działał w nieskończoność

public class NoVolatile {
    private static boolean ready = false;
       private static class Reader extends Thread {
        public void run() {
            int i = 0;
            while(!ready) {
                i++;
            }
        }
    }
     public static void main(String[] args) throws InterruptedException {
        new Reader().start();
        TimeUnit.SECONDS.sleep(1);
        ready = true;
    }
}


Po zastosowaniu słowa kluczowego volatile program się zakończy po sekundzie jak przewidywano:

public class VolatileExample {
    private static volatile boolean ready = false;
    private static class Reader extends Thread {
        public void run() {
            int i = 0;
            while(!ready) {
                i++;
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        new Reader().start();
        TimeUnit.SECONDS.sleep(1);
        ready = true;
    }
}


Powiązanie z wątkiem - thread confinement

 

Jeśli dane (klasa) nie jest dostępna dla wielu wątków wtedy nie trzeba stosować synchronizacji ani tworzyć klas thread-safe. Przykładem jest Swing gdzie jest jeden wątek odpowiedzialny za obsługę eventów. Mechanizm ten jest mechanizmem który może być jedynie zaznaczony w dokumentacji - nie da się wymusić aby klasa była używana tylko w jednym wątku.

Przykłady powiązań z wątkiem:
  • zmienna volatile którą czyta wiele wątków ale zmienia tylko jeden
  • zmienna lokalna w metodzie - jest powiązana ze stosem wywołania
  • zmienna ThreadLocal - dla każdego wątku inna instancja

Niezmienność - Immutability

 

Klasy immutable są zawsze thread-safe. Mogą mieć tylko jeden stan który jest ustawiany przy tworzeniu klasy.
Klasa jest niezmienna (immutable) jeśli:
  • jej stan nie może być zmieniony po konstrukcji
  • wszystkie pola są final
  • jest poprawnie stworzona - jej referencja nie "ucieknie" podczas konstrukcji
Klasa (instancja) immutable  może być bezpiecznie publikowana nawet bez użycia synchronizacji! Wystarczy że została poprawnie stworzona.

Publikacja i ucieczka 

 

Publikacja to udostępnianie zmiennych na zewnątrz.
Ucieczka this jest możliwa np:

public class Escape {
    public Escape(EventSource eventSource) throws InterruptedException {
        eventSource.registerListener(new EventListener() {
            @Override
            public void onEvent() {
                someMethod();
            }
        });
    }
}


Aby bezpiecznie opublikować obiekt obie referencja obiektu jak i jego stan muszą być widoczne dla innych wątków  w tym samym czasie. Aby tego dokonać należy:
  • zainicjować obiekt ze statycznego inicjatora - inicjowanie przy deklarowaniu statycznego pola
  • trzymać jego zmienną w polu volatile albo AtomicReference
  • trzymać jego zmienną w polu final w poprawnie skonstruowanym obiekcie
  • trzymać jego zmienną w polu który jest poprawnie chronione przez blokadę ( tyczy się także kolekcji synchronizowanych albo innych bezpiecznych kolekcji jak ConcurentHashMap, Vector, BlockingQueue itp
Część obiektów można nazwać efektywnie niezmiennymi - czyli nie są one immutable ale po stworzeniu nie są zmieniane aby ich używać należy tylko je bezpiecznie publikować.
Jeśli chodzi o obiekty zmienne - muttable - to należy je bezpiecznie publikować jak i muszą być thread-safe albo muszą być chronione przez blokadę.
Jeśli chodzi o obiekty immutable - to nie muszą być one publikowane bezpiecznie - mogą być jakkolwiek - np przez pole public.


czwartek, 19 czerwca 2014

Wielowątkowość - Thread Safety

 Thread safety


Jeśli wiele wątków dostaje się/może się dostać do zmiennej stanu bez jakiejkolwiek synchronizacji to program nie będzie działał dobrze, aby tak się stało musi być zrobiona jedna z poniższych rzeczy:
  • zmienna stanu nie powinna być dostępna
  • zmienna stanu powinna być niezmienna (immutable)
  • powinno się użyć synchronizacji przy dostępie do zmiennej

Jak zdefiniować klasę która jest thread-safe?
Klasa taka działa dobrze kiedy jest dostępna/używana przez wiele wątków bez względu na przeplatanie się i harmonogram ich działania bez dodatkowej synchronizacji po stronie kodu wywołującego. Klasa taka zawiera wszystkie mechanizmy synchronizacji w sobie.

Klasy bez stanu są zawsze thread-safe.

Atomowość 

 

Problemem mogą być operacje złożone dla których mogą wystąpić race-conditions.
Dwa typy race-conditions:
  • check-then-act
  • read-modify-write
Przykłady race-conditions:
  • "singleton" class z niesynchronizowaną metodą getInstance()
  • niesynchronizowana operacja zwiększania licznika count++
Operacje A and B są wzajemnie do siebie atomowe jeśli z perspektywy wątku wykonującego A jeśli inny wątek wykonuje B to jest ona wykonana w całości albo w ogóle.

Blokowanie 

 

Klasa która ma jedną zmienną tworzącą stan może być thread safe jeśli zmienna jest atomowa (np AtomicLong), po dodaniu drugiej już tak nie musi być jeśli zmienne nie są niezależne. Aby zmiana zmiennych była atomowa należy zastosować blokowanie poprzez użycie bloków lub metod synchronized. Ten rodzaj blokad nazywa się intrinsic lock. Blokady te są wielo-wejściowe (reentrant) więc wątek może uzyskać dostęp do blokady ponownie jeśli już go ma.

Strzeżenie zmiennych blokadami


Aby złożone akcje (check-then-act, read-modify-write) były atomowe muszą się znaleźć w bloku synchronizowanym przez blokadę.

Dla każdej zmiennej do której może mieć dostęp wiele wątków wszystkie dostępy muszą być przeprowadzone z tą samą blokadą.

Dostęp do zmiennych powiązanych musi się odbywać na tej samej blokadzie.

Żywotność i wydajność

 

Należy zmniejszać bloki synchronizowane tylko na czas zmian zmiennych, nie należy w nich wykonywać długich operacji szczególnie tych związanych z siecią i dostępem do I/O.


sobota, 10 maja 2014

Proste połączenie spring, jpa (hibernate), spring data, postgresql

Proste zestawienie aplikacji z JPA i spring data.

1. Tworzymy projekt mavenowy w eclipsie z archetypu maven.archetype.webapp - aplikacja będzie aplikacją webową - za pomocą żądania restowego przetestujemy pobieranie danych z bazy.

2. Dodajemy potrzebne zależności do poma:
zależności mvc, rest: servlet-api, jackson-mapper-asl
mamy dodatkowe zależności do jpa: hibernate-entitymanager, jta, spring-jdbc, spring-orm, spring-data-jpa i postgresql.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.java.ro</groupId>
    <artifactId>Invoices</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>Invoices Maven Webapp</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>3.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>2.5</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-entitymanager</artifactId>
            <version>4.3.4.Final</version>
        </dependency>
        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>jta</artifactId>
            <version>1.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>3.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>3.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.3-1100-jdbc41</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-jpa</artifactId>
            <version>1.4.3.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-aop</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <finalName>Invoices</finalName>
    </build>
</project>


3. Konfigurujemy web.xml: dispacher servler oraz listener - dispacher będzie korzystać z pliku kontekstu do mvc (resta) a listener wczyta plik kontekstu do jpa:

<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
    <display-name>Invoices</display-name>
   
    <!-- The definition of the Root Spring Container shared by all Servlets and Filters -->
    <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/spring/root-context.xml</param-value>
    </context-param>
   
    <!-- Creates the Spring Container shared by all Servlets and Filters -->
    <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
    </listener>
    <servlet>
        <servlet-name>appServlet</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>/WEB-INF/spring/servlet-context.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>appServlet</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
</web-app>


 4. Konfigurujemy serwis restowy (servlet-context.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd">

    <mvc:annotation-driven/>
   
    <context:component-scan base-package="com.java.ro.invoices.controler"/>
   
    <bean class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
        <property name="order" value="1"/>
    </bean>
</beans>


5. Konfigurujemy jpa (root-context.xml): do konfiguracji potrzeba 3 podstawowych cegiełek:
  •  entityManagerFactory
  • transactionManager
  • dataSource
 oraz kilku innych beanów:
  • PersistenceAnnotationBeanPostProcessor - aby umożliwić wstrzyiwanie EntityManagera
  • jpa:repositories - określenie bazowego pakietu dla repozytorium
  • tx:annotation-driven - określenie że tranzakcyjność będzie zarządzana anotacjami
 <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jpa="http://www.springframework.org/schema/data/jpa"
    xmlns:repository="http://www.springframework.org/schema/data/repository"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd
        http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa-1.3.xsd
        http://www.springframework.org/schema/data/repository http://www.springframework.org/schema/data/repository/spring-repository-1.6.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">

    <!-- config with annotation -->
    <context:annotation-config />
   
    <tx:annotation-driven transaction-manager="transactionManager" />
   
    <context:component-scan base-package="com.java.ro.invoices"/>

    <!-- spring data repositories base package -->
    <jpa:repositories base-package="com.java.ro.invoices.model.repository" />

    <!-- use persistence context annotation -->
    <bean
        class="org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor" />

    <bean id="entityManagerFactory"
        class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
        <property name="persistenceUnitName" value="invoicesPersistenceUnit" />
        <property name="dataSource" ref="dataSource" />
        <property name="jpaVendorAdapter">
            <bean class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter">
                <property name="showSql" value="true" />
            </bean>
        </property>
        <property name="jpaPropertyMap">
            <map>
                <entry key="hibernate.dialect" value="org.hibernate.dialect.PostgreSQLDialect" />
                <entry key="hibernate.hbm2ddl.auto" value="create" />
                <entry key="hibernate.format_sql" value="true" />
            </map>
        </property>
    </bean>

    <bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
        <property name="entityManagerFactory" ref="entityManagerFactory" />
    </bean>
    <bean id="dataSource"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="org.postgresql.Driver" />
        <property name="url" value="jdbc:postgresql://localhost:5432/faktury" />
        <property name="username" value="invoice" />
        <property name="password" value="123" />
    </bean>

</beans>


6. W entity manager factory został podany persitence unit, należy stworzyć więc persistence.xml z tym persistent unitem - należy go umieścić w katalogu META-INF (scr/main/resources/META-INF):

<?xml version="1.0" encoding="UTF-8" ?>
<persistence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
  version="2.0" xmlns="http://java.sun.com/xml/ns/persistence">
  <persistence-unit name="invoicesPersistenceUnit">
  </persistence-unit>
</persistence>


7. Stworzenie przykładowej encji:

package com.java.ro.invoices.model.entity;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

@Entity
public class Invoice implements Serializable {
   
    private static final long serialVersionUID = 1L;

    @Id
    @GeneratedValue
    private Long id;
   
    @Column(length = 20, nullable = false)
    private String number;
   
    @Column(nullable = false)
    private Date dateOfIssue;
   
    @Column(nullable = false)
    private Date maturity;
   
    @Column(nullable = false, precision = 10, scale = 2)
    private BigDecimal totalAmount;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getNumber() {
        return number;
    }

    public void setNumber(String number) {
        this.number = number;
    }

    public Date getDateOfIssue() {
        return dateOfIssue;
    }

    public void setDateOfIssue(Date dateOfIssue) {
        this.dateOfIssue = dateOfIssue;
    }

    public Date getMaturity() {
        return maturity;
    }

    public void setMaturity(Date maturity) {
        this.maturity = maturity;
    }

    public BigDecimal getTotalAmount() {
        return totalAmount;
    }

    public void setTotalAmount(BigDecimal totalAmount) {
        this.totalAmount = totalAmount;
    }
}


8. Tworzymy repozytorium dla encji - repozytorium będzie dziedziczyć z CRUDRepository:

package com.java.ro.invoices.model.repository;

import java.util.List;

import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

import com.java.ro.invoices.model.entity.Invoice;

@Repository
public interface InvoiceRepository extends CrudRepository<Invoice, Long> {

    List<Invoice> findAll();
}


9. Tworzymy prosty serwis - bardzo ważna jest adnotacja Transactional jeśli chcemy coś zapisywać:

package com.java.ro.invoices.model.service;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.java.ro.invoices.model.entity.Invoice;
import com.java.ro.invoices.model.repository.InvoiceRepository;

@Service
@Transactional
public class InvoiceService {
   
    @Autowired
    private InvoiceRepository repository;
   
    public List<Invoice> getInvoices() {
        return repository.findAll();
    }
}


10. Tworzymy kontroler mvc aby pobrać dane:

package com.java.ro.invoices.controler;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.java.ro.invoices.model.entity.Invoice;
import com.java.ro.invoices.model.service.InvoiceService;

@Controller
public class InvoiceControler {
   
   
    @Autowired
    private InvoiceService invoiceService;
   
    @RequestMapping(value="/getInvoices", method= RequestMethod.GET)
    public @ResponseBody List<Invoice> getInvoices() {
        return invoiceService.getInvoices();
    }
   
}


11. Instalujemy postgre sql i konfigurujemy bazę danych i użytkownika.
12. Testujemy aplikację startując ją na tomcacie: wchodzimy na stronę: http://localhost:8080/Invoices/getInvoices:

strona zwracja [] co jest zrozumiałe bo nie ma żadnych danych,
dodajemy dane ręcznie do bazy danych:

INSERT INTO invoice.invoice(
            id, dateofissue, maturity, "number", totalamount)
    VALUES (1, TIMESTAMP '2014-04-01 00:00:00', TIMESTAMP '2014-04-30 00:00:00', 'FV-2014-4', 122.32);

INSERT INTO invoice.invoice(
            id, dateofissue, maturity, "number", totalamount)
    VALUES (2, TIMESTAMP '2014-05-01 00:00:00', TIMESTAMP '2014-05-31 00:00:00', 'FV-2014-5', 445.76);


odświerzamy stronę i otrzymujemy rezultat:


Aplikacja zwraca rekordy z bazy danych.