Синхронизация между Threads

При стартирането на две или повече нишки те започват да работят асинхронно и независимо една от друга. Когато споделят обаче един и същи ресурс е необходимо да не се позволи на една нишка да повреди или смени данните, които в момента се обработват от друга нишка - синхронизация.

Производител- консуматор. Разпределяне на ресурсите без синхронизация

Във една релация производител - консуматор,  една нишка производител вика метод за писане които записва последователност от числа ( 1, 2, 3, ...)  в поделена зона от паметта. Нишката консуматор чете тези данни:

public class IntShare {
private int intShared = -1;
public void setIntShared( int val ){
System.out.println( Thread.currentThread().getName() +
" set intShared: " + val );
intShared = val;
}
public int getIntShared(){
System.out.println( Thread.currentThread().getName() +
" get intShared " + intShared );
return intShared;
}
}
public class SetShared extends Thread {
private IntShare pGuarde;
public SetShared( IntShare h ){
super( "Set Shared" );
pGuarde = h;
}
public void run(){
for ( int counter = 1; counter <= 10; counter++ ) {
// sleep fot random time
try {Thread.sleep( (int) ( Math.random() * 3000 ) );}
catch( InterruptedException e ) {
System.err.println( e.toString() );
}
pGuarde.setIntShared( counter );
}
System.out.println( getName() + " finished" );
}
}
public class GetShared extends Thread {
private IntShare cGuarde;
public GetShared( IntShare h ){
super( "GetShared" );
cGuarde = h;
}
public void run(){
int val, sum = 0;
do {
// sleep fot random time
try {Thread.sleep( (int) ( Math.random() * 1000 ) );}
catch( InterruptedException e ) {
System.err.println( e.toString() );
}
val = cGuarde.getIntShared();
sum += val;
} while ( val != 10 );
System.out.println(
getName() + " Total: " + sum);
}
}
public class SharedMem {
public static void main( String args[] ){
IntShare h =new IntShare();
SetShared p = new SetShared( h );
GetShared c = new GetShared( h );
p.start();
c.start();
}
}
GetShared get intShared -1
GetShared get intShared -1
Set Shared set intShared: 1
GetShared get intShared 1
GetShared get intShared 1
GetShared get intShared 1
GetShared get intShared 1
Set Shared set intShared: 2
GetShared get intShared 2
Set Shared set intShared: 3
GetShared get intShared 3
GetShared get intShared 3
GetShared get intShared 3
GetShared get intShared 3
GetShared get intShared 3
Set Shared set intShared: 4
GetShared get intShared 4
GetShared get intShared 4
Set Shared set intShared: 5
GetShared get intShared 5
Set Shared set intShared: 6
GetShared get intShared 6
GetShared get intShared 6
GetShared get intShared 6
GetShared get intShared 6
Set Shared set intShared: 7
GetShared get intShared 7
GetShared get intShared 7
Set Shared set intShared: 8
GetShared get intShared 8
GetShared get intShared 8
GetShared get intShared 8
GetShared get intShared 8
Set Shared set intShared: 9
GetShared get intShared 9
GetShared get intShared 9
GetShared get intShared 9
Set Shared set intShared: 10
Set Shared finished
GetShared get intShared 10
GetShared Total: 139

 

Синхронизация

Кода на програмата, който може да предизвика повреждане или промяна на данните ползвани от друга нишка се нарича критична секция. Те са реализирани като методи или част от методи намиращи се в общия ресурс.

 Java Използва така наречените "монитори" за синхронизация между нишките. Мониторът представлява обект, който става собственост на нишката навлязла в критична секция и който й позволява да изпълнява критичната секция. Нарича се заключване на обекта, метода, кода.

За заключване на критичната секция се използва ключовата дума  synchronized  - за метод:

public synchronized int method1{
      ...
}

инструкция (или блок) 

synchronized : System.out.println("...");

или обект

synchronized(object1){
    object1.method2();

}

В първите два случая промянатс се прави в кода на споделения ресурс - в частта реализираща критична секция. В третия случай промяната се прави в кода на нишката (която заключва обекта преди да извика негов метод).

Обектът се заключва когато някоя нишка влиза в метода и програмния код. Всичко останали нишки, които искат да използват същия метод или код трябва да чакат завършването на метода или кода от заключилата я нишка. След освобождаването нишката с най-висок приоритет се опитва да вземе ресурса.

За всеки обект в който има синхронизиран метод (или самия обект е синхронизиран)се генерира един монитор.

Трябва да се отбележи, че за всеки обект съществува само по един монитор. Ако има няколко критични метода, то всички те се заключват при изпълнение на един от тях.

Ако се синхронизира статичен метод то заключването се извършва за всички обекти от класа.

Този подход  гарантира синхронизация на първо ниво - гарантира, че само една нишка може да навлезе в критичната секция. Той обаче не може да осигури някаква подредба на изпълнението на нишките в зависимост от състоянието на програмата

Подредбата на нишките се реализира чрез синхонизация на второ ниво, която се реализира чрез промяна на кода на нишките, които трябва да следят някакви условия показващи състоянието на програмата по време на изпълнението си. Ако една нишка е заключила обект и поради някаква причина не може да продължи, тя трябва да извика  wait() , да премине в състояние "блокирана" и да освободи заключването на обекта. За да бъде ре-активирана, тя трябва да получи от друга нишка съобщение  notyfy() По този начин изпълнението на нишките става управляемо от зададени условия, които позволяват да се управлява реда на изпълнение на нишките.

Ако множество нишки чакат за даден монитор Java runtime system избира една от тях без задължение или гаранция коя нишка ще бъде избрана.

 

notifyAll();

Класът Object притежава още един метод -- notifyAll() -- който събужда всички нишки, чакащи на същия монитор. В този случай нишките се състезават за монитора. След като една го получи другите отново минават в блокирано състояние.
 
Съществуват три метода
wait()
Чака безкрайно да бъде събуден с notyfy() или notifyAll().
wait(long timeout)
Чака събуждане докато изтекат timeout милисекунди.
wait(long timeout, int nanos)
Чака събуждане докато изтекат timeout милисекунди +  nanos наносекунди.
 
wait(long timeout) и wait(long timeout, int nanos) могат да бъдат използвани вместо sleep(). Разликата е, че sleep не може да бъде прекъснат преди изтичането на времето за разлика от wait.

 

Производител - консуматор със синхронизация на метод и управление на реда на изпълнение

Изисква се нишката производител да изпълни критичната секция за да произведе стойност, след това нишката консуматор да я консумира и отново да върне управлението на производителя.

public class IntShare {
private int intShared = -1;
private boolean writable= true;
public synchronized void setIntShared( int val ){
while(!writable) {
try{ wait(); }
catch(InterruptedException e){
System.err.println(e);
}
}
System.out.println( Thread.currentThread().getName() +
" set intShared: " + val );
intShared = val;
writable=false;
notify();
}
public synchronized int getIntShared(){
while(writable) {
try{ wait(); }
catch(InterruptedException e){
System.err.println(e);
}
}
System.out.println( Thread.currentThread().getName() +
" get intShared " + intShared );
writable=true;
notify();
return intShared;
}
}
Set Shared set intShared: 1
GetShared get intShared 1
Set Shared set intShared: 2
GetShared get intShared 2
Set Shared set intShared: 3
GetShared get intShared 3
Set Shared set intShared: 4
GetShared get intShared 4
Set Shared set intShared: 5
GetShared get intShared 5
Set Shared set intShared: 6
GetShared get intShared 6
Set Shared set intShared: 7
GetShared get intShared 7
Set Shared set intShared: 8
GetShared get intShared 8
Set Shared set intShared: 9
GetShared get intShared 9
Set Shared set intShared: 10
Set Shared finished
GetShared get intShared 10
GetShared Total: 55

 

 

Стартиране на няколко продуктора и консуматора

Консуматорът завършва, когато няма произведена стойност и няма активни производители - брояч на активните производители. Ecxeption за завършване.  "Timed waiting" за да не се пропусне завършването на последния производител.

public class IntShare {
    private int intShared = -1;
    private int numP =0;
    private boolean writable= true;
    public synchronized void incP(){        numP++;    }
    public synchronized void decP(){        numP--;    }
    public synchronized void setShared( int val ){
        while(!writable) {
            System.out.println("\t"+Thread.currentThread().getName()+" waiting");
            try{     wait();   }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.println( Thread.currentThread().getName() +
                " set intShared: " + val );
        intShared = val;
        writable = false;
        notifyAll();
    }
    public synchronized int getShared() throws NoP{
        while(writable) {
            if(numP==0)  throw new NoP()  ;
            System.out.println("\t"+Thread.currentThread().getName()+" waiting");
            try{     wait(1000);  }
            catch(InterruptedException e){
                System.err.println(e);
            }
        }
        System.out.println( Thread.currentThread().getName() +
                " get intShared " + intShared );
        writable = true;
        notifyAll();
        return intShared;
    }
}
-----------------------------------------------------------------------------
public class SetShared extends Thread {
    private IntShare pGarde;
    public SetShared( IntShare h ){
        super( "SetShared " +(int)(Math.random()*1000));
        pGarde = h;
    }
    public void run(){
        pGarde.incP();
        System.out.println( getName() + " starting"  );
        for ( int counter = 1; counter <= 4; counter++ ) {
            try {Thread.sleep( (int) ( Math.random() * 300 ) );}
            catch( InterruptedException e ) { System.err.println( e.toString() );   }
            pGarde.setShared( counter );
        }
        pGarde.decP();
        System.out.println( getName() + " finished"  );  
    }
}-----------------------------------------------------------------------------
public class GetShared extends Thread {
    private IntShare cGarde;
    public GetShared( IntShare h ){
        super( "GetShared "+(int)(Math.random()*1000) );
        cGarde = h;
    }
    public void run(){
        System.out.println( getName() + " starting"  );
        int val;
        do {
            try {Thread.sleep( (int) ( Math.random() * 100 ) );}
            catch( InterruptedException e ) {
                System.err.println( e.toString() );
            }
            try{
                val = cGarde.getShared();
            }catch (NoP exc){
                break;
            }
            System.out.println("\t\t"+this.getName()+" get value "+val);
        } while ( true );
        System.out.println( getName() + " finished");
    }
}
----------------------------------------------------------------------------
public class NoP extends Exception{
    NoP(){
        System.out.println("Exception NoP thrown");
    }
}
---------------------------------------------------------------------------
public class SharedMem {
    public static void main( String args[] ){
        IntShare h =new IntShare();
        for(int i=0;i<3;i++){
            (new SetShared( h )).start();
        }
        try {Thread.sleep( (int) ( Math.random() * 100 ) );}
            catch( InterruptedException e ) {
                System.err.println( e.toString() );
        }
        for(int i=0;i<2;i++){
            (new GetShared(h)).start();
        }
    }
}
SetShared 3 starting
SetShared 362 starting
SetShared 227 starting
GetShared 582 starting
GetShared 190 starting
    GetShared 582 waiting
    GetShared 190 waiting
SetShared 362 set intShared: 1
GetShared 190 get intShared 1
        GetShared 190 get value 1
    GetShared 582 waiting
SetShared 227 set intShared: 1
GetShared 582 get intShared 1
        GetShared 582 get value 1
    GetShared 190 waiting
    GetShared 582 waiting
SetShared 3 set intShared: 1
GetShared 582 get intShared 1
        GetShared 582 get value 1
    GetShared 190 waiting
SetShared 3 set intShared: 2
GetShared 190 get intShared 2
        GetShared 190 get value 2
    GetShared 582 waiting
SetShared 3 set intShared: 3
GetShared 582 get intShared 3
        GetShared 582 get value 3
    GetShared 190 waiting
SetShared 3 set intShared: 4
SetShared 3 finished
GetShared 190 get intShared 4
        GetShared 190 get value 4
    GetShared 582 waiting
    GetShared 190 waiting
    GetShared 582 waiting
SetShared 362 set intShared: 2
GetShared 582 get intShared 2
        GetShared 582 get value 2
    GetShared 190 waiting
    GetShared 582 waiting
SetShared 227 set intShared: 2
GetShared 582 get intShared 2
        GetShared 582 get value 2
    GetShared 190 waiting
    GetShared 582 waiting
    GetShared 190 waiting
    GetShared 582 waiting
SetShared 362 set intShared: 3
GetShared 582 get intShared 3
        GetShared 582 get value 3
    GetShared 190 waiting
SetShared 227 set intShared: 3
GetShared 190 get intShared 3
        GetShared 190 get value 3
    GetShared 582 waiting
    GetShared 190 waiting
    GetShared 582 waiting
    GetShared 190 waiting
SetShared 227 set intShared: 4
SetShared 227 finished
GetShared 190 get intShared 4
        GetShared 190 get value 4
    GetShared 582 waiting
SetShared 362 set intShared: 4
SetShared 362 finished
GetShared 582 get intShared 4
        GetShared 582 get value 4
Exception NoP thrown
GetShared 582 finished
Exception NoP thrown
GetShared 190 finished