31 0 578KB
Ministerul Educaţiei al Republicii Moldova Universitatea Tehnică a Moldovei Facultatea Calculatoare, Informatică şi Microelectronică
Raport Lucrarea de laborator nr. 1 Disciplina PAD Tema: Agent de mesagerie – Message Broker
A efectuat:
st. gr. Ti-141 Braga Eugen
A verificat:
Ciorba Dumitru
Chişinău 2017
Scopul lucrarii: Integrarea bazată pe agenți de mesaje care ar permite o comunicare asincronă dintre componentele distribuite ale unui sistem.
1) Realizind acest agent de mesagerie am folosit protocolul TCP. Protocolul de control al transmisiei este un protocol folosit de obicei de aplicații care au nevoie de confirmare de primire a datelor. Efectuează o conectare virtuală full duplex între două puncte terminale, fiecare punct fiind definit de către o adresă IP și de către un port TCP.Transmission Control Protocol (TCP) este unul dintre protocoalele de bază ale suitei de protocoale Internet. TCP este unul dintre cele două componente originale ale suitei (celalalt fiind Protocolul Internet, sau IP), astfel încât întreaga suita este frecvent menționată ca stiva TCP/IP. În special, TCP oferă încredere, asigura livrarea ordonata a unui flux de octeți de la un program de pe un computer la alt program de pe un alt computer aflat în rețea. Pe lângă sarcinile sale de gestionare a traficului, TCP controlează mărimea segmentului de date, debitul de informație, rata la care se face schimbul de date.
Am folosit acest protocol fiindca: -
Este bazat pe conexiuni
-
Asigura ca datele vor ajunge la destinatie in ordinea care trebuie.
-
In caz ca mesajul a ajuns corupt, acesta va retransmite inca odata mesajul.
2) Descrierea componentelor care participa la schimbul de mesaje: -
Broker: este o componenta fizica care gestioneaza comunicarea dintre componentele unei aplicatii distribuite. Avantajul utilizarii acestei tehnici consta in deculplarea receptorului de transmitatorul mesajelor. Prin urmare o aplicatie participanta transmite mesaje doar agentului, indicind un nume logic al receptorului. o Este capabil sa primeasca si sa transmita asincron mesaje. o Sa serializeze datele care le transmite o Sa cunoasca cui trebuie sa transmita mesaje.
-
Sender: este o componenta fizica care se conecteaza la agentul de mesaje si livreaza mesajele care vor fi accesibile de receivers. Acestia se conecteaza la un anumit IP si portul care agentul de mesaje asculta (port deschis), transmitind mesaje incontinuu tuturor receiverilor.
-
Receiver: o componenta fizica care se conecteaza la agentul de mesaje si face ”subscribe” ceea ce semnifica ca ei sunt gata sa primeasca toate mesajele de la senderi. Atunci cind mesajele se transmit de la broker la receiver ele se serializeaza si desirealizeaza.
3) Serializarea si desirealizarea obiectelor. Sender transmite mesaje de tip String catre Broker. Acesta in timpul procesarii creeaza din acel String un Obiect JSON care se transmite serializat. jsonObject = new JsonObject(); jsonObject.addProperty("mesaj:", message); jsonObject.addProperty("Sender:",(socket.getRemoteSocketAddress()).toString()); jsonQueu.add(jsonObject);
Acest obiect este compus din perechea “nume-valoare”, unde numele este String si valoarea este orice alt tip a JsonElement Serializarea se realizeaza prin tranformarea cu ajutorul clasei ObjectOutputStream care transforma un obiect in bytes si este scris intr-un stream care se transmite la alt host (receiver). for(Map.Entry entry : receiveri.entrySet()) { for(JsonObject strObj : jsonQueu) { try { entry.getValue().writeObject(strObj.toString()); System.out.println("Trimit la: " + entry.getKey().getRemoteSocketAddress()); } catch (IOException e) { e.printStackTrace(); } } }
Toti receiver’ii sunt pastrati intr-o mapa concurenta. Aceasta este compusa din Socket-ul conexiunii si ObjectOtputStream-ul acestui socket. Entry.getValue este ObjectOutputStream-ul prin care se realizeaza serializarea si se transmite la receiver. Aceasta casa are metoda writeObject() care realizeaza acest lucru cu un obiect. Desirealizarea – este procesul prin care un stream de bytes este convertit intr-un obiect complex cum ar fi JSON. Obiectele ObjectOutputStream si ObjectInputStream pot sa realizeze serializarea daca obiectul transmis poate fi serializat, de aceea la transmitere obiectul JSON se transmite ca un String. La desirealizare acesta este primit ca un String si apoi din String este format Obiectul JSON propriu zis. gson = new Gson(); jsonElement = gson.fromJson(objectInputStream.readObject().toString(), JsonElement.class); jsonObject = jsonElement.getAsJsonObject(); Apoi din acest JsonObiect se pot manipula elementele acestuia in modul urmator: future.get().get("Sender:") + future.get().get("mesaj:")) 4) Multi – Threading receivers and senders Ficare receiver si sender care se conecteaza la Broker i se ofera un fir de executie aparte, ceea ce semnifica ca ei sunt independent, pina la momentul cind trebuie sa proceseze vrio resursa critica. while(true){ Socket socket = this.serverSocket.accept(); new Thread( new Broker(socket)).start(); } Broker-ul mereu asculta si este legat de un anuit PORT, atunci cind cineva intentioneaza sa se conecteze la portul deschis de Broker, acesta isi creeaza o instanta noua pe un fir de executie diferit. Chiar daca pentru fiecare sender si receiver se creeaza defapt cite o instanta noua, aceasta are resursele sale statice ceea ce inseamna ca sunt resurse ale clasei insa nu ale instantei. private static BlockingQueue jsonQueu = new ArrayBlockingQueue(40); private static Map receiveri = new ConcurrentHashMap();
5) Implimentarea asincronismului Java ofera java.util.concurent.ExecutorService interfata care reprezinta un mecanizm de executie asincron care este capabil sa execute anumite Task-uri in background.
Figura 1 – Un thread delegheaza un task catre ExecutorService pentru o executie asincrona. Exista mai multe cai de a delega task-uri pentru executii printre care: -
execute(Runnable)
-
submit(Runnable)
-
submit(Callable)
ExecutorService service = Executors.newSingleThreadExecutor(); Future future = service.submit(new Callable() { @Override public JsonObject call() throws Exception { return desirealizeStringObject(); } }); service.shutdown(); service.awaitTermination(1000, TimeUnit.MILLISECONDS); In codul de mai sus este un exemplu de creare a unui task care executa un task pe un thread aparte si totodata acesta datorita intefetei Callable, va fi nevoita sa astepta un raspuns de tip JsonObject.
Atunci cind cream acest serviciu putem sa instantiem cite fire de executie dorim sa execute aceasta functionalitate. Totodata cind apelam service.shutdown() – se intelege ca nici un alt thread nu mai poate intra ca sa execute. Metoda awaitTermination() – blockeaza pina cind nu sa realizat taskul sau pina cind sa finisat timpul prescris. 6) Pastra mesajelor si rutarea lor. Mesajele care apar de la sender sunt stocate intr-o colectie concurenta care este sincronizata automat de Java Virtual Machine fara implicarea programatorului ( se refera si la scriere si la citire). private static BlockingQueue jsonQueu = new ArrayBlockingQueue(40); Atunci cind are loc transmiterea mesajelor serializate ele mai intii de toate sunt scoase din aceasta coada. Apoi obiectul este convertit in bytes si tranmis catre receiver.
class Class Model Runnable Broker -
bufferedReader: BufferedReader jsonObject: JsonObject jsonQueu: BlockingQueue = new ArrayBlocki... receiveri: Map = new ConcurrentH... socket: Socket toti: Map = new ConcurrentH...
+ + + + + +
Broker(Socket) checkForEmptyAndSendAsync(String): void createJSON(String): void ifSubscribe(String): void run(): void sendAndSerialize(Map): void Receiv er
Sender -
bufferedReaderFromRemote: BufferedReader = null printWriter: PrintWriter = null socket: Socket = null
+ +
main(String[]): void sendMessages(): void 1..*
1..*
-
gson: Gson jsonElement: JsonElement jsonObject: JsonObject keybordScanner: Scanner objectInputStream: ObjectInputStream printWriter: PrintWriter socket: Socket
+ + +
desirealizeStringObject(): JsonObject getMessages(): void main(String[]): void
brokerServ ice ~
PORT: int = 4444 {readOnly} serverSocket: ServerSocket
+ +
connection(): void main(String[]): void
Figura 2 – Diagrama de clase a componentelor participante la schimbul de mesaje.
In figura nr.2 este aratata structura fizica cit si comportamentul prin Diagrama de clase. Deci se observa careva relatii intre aceste componente. Relatia de agregare intre componenta Broker si BrokerService semnifica ca daca nu va exista BrokerService nu va putea exista Broker, deci acesta este dependent de BrokerService. Dupa multiplicitatea componentelor Sender si Reveiver se poate intelege ca pot exista mai multe instante independente la sistem. Concluzie Realizind acest laborator am insusit de ce este necesar ca schimbul de mesaje dintre careva componente sa fie procesate de un agent (de mesagerie). Acesta avind mai multe benefii asupra intregii platforme precum: -
reduce cuplarea: transmitatorii comunica doar cu brokerul, astfel o potential grupare a mai multor receptori sub un nume logic comun poate deveni transparenta transmitatorilor.
-
Mareste integrabilitatea: aplicatia care comunica cu brokerul nu trebuie sa aiba aceeasi interfata, astfel brokerul poate deveni o punte dintre aplicatii cu diferite nivele de securitate si calitate.
-
Mareste evolutivitatea: brokerul protejeaza componentele de modificarile individuale ale aplicatiilor integrate.
Totodata am implimentat ca fiecare componenta sa se execute pe un fir de executie aparte ceea ce este destul de important pe o platforma. Realizarea asincronizmului este o parte necesara pentru a nu crea blocari in momentul de transmitere si procesare. Bibliografie 1. http://tutorials.jenkov.com/java-util-concurrent/executorservice.html/ Jenkov/ 02.10.2017 2. http://stackoverflow.com/questions/4110664/gson-directly-convert-string-to-jsonobject-no-pojo 08.10.2017