Prev: Using GtkAda in Ubuntu
Next: gnat on opensuse 11.1
From: Maciej Sobczak on 19 Sep 2008 08:21 Hi all, Imagine a server with fixed number of worker tasks. There is no queue of jobs and jobs are immediately passed to one of the tasks that is currently idle. There is a separate task (or just the main one) that provides jobs for worker tasks. I am concerned with the proper structure of objects - I mean in the sense of recommended Ada practice. Obviously there is a need for some shared resource where the requesting task will put the job and from where the worker task will pick it up. This is more or less what I came up with, where the "channel" is a single processing pipeline: type Worker_State is (Idle, Ready, Working); protected type Channel_State is procedure Post (J : in Job_Type); entry Get_Job (J : out Job_Type); function Busy return Boolean; private State : Worker_State := Idle; Job_To_Do : Job_Type; end Channel_State; protected body Channel_State is procedure Post (J : in Job_Type) is begin if State /= Idle then raise Program_Error; end if; Job_To_Do := J; State := Ready; end Post; entry Get_Job (J : out Job_Type) when State = Ready is begin J := Job_To_Do; State := Working; end Get_Job; function Busy return Boolean is begin return State /= Idle; end Busy; end Channel_State; type Channel; task type Worker_Task (Ch : access Channel); type Channel is record State : Channel_State; Worker : Worker_Task (Channel'Access); end record; task body Worker_Task is Job : Job_Type; begin loop Ch.all.Get_Job (Job); -- do the job ... end loop; end Worker_Task; Max_Channels : constant := 5; Channels : array (1 .. Max_Channels) of Channel; My question is whether this is what a seasoned Ada programmer would do. Initially I tried to have two separate arrays, one for jobs and one for worker tasks, but I found it difficult to link workers with their respective jobs. After bundling them together in a single record that is referenced from the task it worked and I actually find it structured better. The main task after constructing a job object finds some channel where the worker task is not busy and posts the job to its shared state component: loop Job := ... Found_Worker := False; for I in Channels'Range loop if not Channels (I).State.Busy then Channels (I).State.Post (Job); Found_Worker := True; exit; end if; end loop; if not Found_Worker then -- all pipelines are busy, -- but the overflow handling is not shown... end if; end loop; All this works fine, but my question considers the choice of language constructs and idioms. -- Maciej Sobczak * www.msobczak.com * www.inspirel.com Database Access Library for Ada: www.inspirel.com/soci-ada
From: Jean-Pierre Rosen on 19 Sep 2008 09:34 Maciej Sobczak a �crit : > Hi all, > > Imagine a server with fixed number of worker tasks. There is no queue > of jobs and jobs are immediately passed to one of the tasks that is > currently idle. There is a separate task (or just the main one) that > provides jobs for worker tasks. [...] Why not simply use a rendezvous? Each worker has an entry Get_Job: task body Worker_Task is Job : Job_Type; begin loop Get_Job (Job); -- do the job ... end loop; end Worker_Task; and the server is simply (assuming Servers is an array of Worker_Task): loop Job := ... Found_Worker := False; for I in Servers'Range loop select Server (I).Get_Job (Job); Found_Worker := True; exit; else -- This server is busy null; end select; end loop; if not Found_Worker then -- all pipelines are busy, -- but the overflow handling is not shown... end if; end loop; -- --------------------------------------------------------- J-P. Rosen (rosen(a)adalog.fr) Visit Adalog's web site at http://www.adalog.fr
From: Dmitry A. Kazakov on 19 Sep 2008 13:02 On Fri, 19 Sep 2008 15:34:00 +0200, Jean-Pierre Rosen wrote: > Maciej Sobczak a �crit : >> >> Imagine a server with fixed number of worker tasks. There is no queue >> of jobs and jobs are immediately passed to one of the tasks that is >> currently idle. There is a separate task (or just the main one) that >> provides jobs for worker tasks. > [...] > > Why not simply use a rendezvous? > Each worker has an entry Get_Job: > > task body Worker_Task is > Job : Job_Type; > begin > loop > Get_Job (Job); You mean: accept Get_Job (Requested : Job_Type) do Job := Requested; end Get_Job; > end loop; > end Worker_Task; > > and the server is simply (assuming Servers is an array of Worker_Task): > > loop > Job := ... > > Found_Worker := False; > for I in Servers'Range loop > select > Server (I).Get_Job (Job); > Found_Worker := True; > exit; > else > -- This server is busy > null; > end select; > end loop; > > if not Found_Worker then > -- all pipelines are busy, > -- but the overflow handling is not shown... > end if; > end loop; This scheme requires some additional efforts in order to maintain the list of idle workers. I would use an inverse one, which is much simpler: with Ada.Text_IO; use Ada.Text_IO; type Job_Type is (Quit, Run); type Job (Kind : Job_Type := Quit) is null record; task type Worker; task Server is entry Get (Work : out Job); end Server; task body Server is Request : Job; Workers : array (1..5) of Worker; begin loop -- Get a job to do Request := (Kind => Run); -- Wait for a worker to come accept Get (Work : out Job) do Work := Request; end Get; end loop; -- Terminating workers for Index in Workers'Range loop Request := (Kind => Quit); accept Get (Work : out Job) do Work := Request; end Get; end loop; end Server; task body Worker is Work : Job; begin loop Server.Get (Work); case Work.Kind is when Quit => exit; when Run => Put_Line ("Doing things"); end case; end loop; Put_Line ("I am done"); end Worker; Note that task termination is usually a difficult problem in Ada. You should pay an attention to this early. (The "terminate" alternative is unusable in 80% cases.) In the solution above a special job type is used to kill the worker. -- Regards, Dmitry A. Kazakov http://www.dmitry-kazakov.de
From: anon on 19 Sep 2008 19:01 Since you are using Channels, I am assuming that your talking about TCP/IP servers. In this case you should look into using the Check_Selector with the use of Signalling_Fds and Socket_Sets instead of using arrays. Now the way GNAT has written the Selector function and the way the C_Select is written the maximum number of server per Check_Selector is 27. That is, each C_Select function can only monitor 32 sockets, but GNAT.Sockets.Create_Selector uses two socket and the TCP/IP sub-system uses or predefines 3, so you are left with 27 user define servers. Found this clent/server on the net. The following client/server shows how to use the Check_Selector routine. They were both were written in all caps and use Ada-95 spec so, you may need to adjust the Signalling_Fds routine calls for Ada-2005 (GNAT-2008), but they do compile and excute under GNAT GPL 2007 using Linux. Note: 1) I alter for spacing and cap format. To make them more readable. 2) The procedure "Set_Socket_Option" may need to be commented out. Some TCP/IP system do not like GNAT version of that routine. 3) As for Windows, I did not test! -------------------------------------------------------------- -- -- Pool_Server -- with GNAT.Sockets ; use GNAT.Sockets ; with Ada.Text_IO ; procedure Pool_Server is MaxTasks : constant Positive := 5 ; -- buffer size type Index is mod MaxTasks ; function Rev ( S : String ) return String is Res : String ( S'Range ) ; J : Integer := S'First ; begin for I in reverse S'Range loop Res ( J ) := S ( I ) ; J := J + 1 ; end loop ; return Res ; end Rev ; protected Aborted is procedure Set ; function Check return Boolean ; private Done : Boolean := False ; end Aborted ; protected body Aborted is procedure Set is begin Done := True ; end Set ; function Check return Boolean is begin return Done ; end Check ; end Aborted ; type Echo ; type Echo_Access is access Echo ; task type Echo is entry Start ( N_Sock : IN Socket_Type ; Self : IN Echo_Access ) ; entry ReStart ( N_Sock : IN Socket_Type ) ; end Echo ; type Task_Array is array ( Index ) of Echo_Access ; protected Buffer is entry Deposit ( X : in Echo_Access ) ; entry Extract ( X : out Echo_Access ) ; function NumWaiting return Natural ; private Buf : Task_Array ; I, J : Index := 0 ; Count : Natural range 0 .. MaxTasks := 0 ; end Buffer ; task body Echo is Sock : Socket_Type ; S : Stream_Access ; Me : Echo_Access ; Input_Selector : Selector_Type ; Input_Set : Socket_Set_Type ; WSet : Socket_Set_Type ; Input_Status : Selector_Status ; begin --set up selector Create_Selector ( Input_Selector ) ; --Initialise socket sets --WSet is always empty as we are not interested in output events -- RSet only ever contains one socket namely Sock Empty ( Input_Set ) ; Empty ( WSet ) ; ACCEPT Start ( N_Sock : IN Socket_Type ; Self : IN Echo_Access ) DO Sock := N_Sock ; Me := Self ; end Start ; loop begin -- block for exception handling S := Stream ( Sock ) ; -- set up stream on socket Boolean'Write ( S, True ) ; -- acknowledge connection loop -- check for input on Sock socket Set ( Input_Set, Sock ) ; -- time-out on check if no input within 0.5 second Check_Selector ( Input_Selector, Input_Set, WSet, Input_Status, 0.5 ) ; if Input_Status = Completed then -- we have input, so process it declare Str : String := String'Input ( S ) ; begin exit when Str = "quit" ; String'Output ( S, Rev ( Str ) ) ; end ; end if ; if Aborted.Check then String'Output ( S, "Server aborted" ) ; exit ; end if ; end loop ; Ada.Text_IO.New_Line ; Ada.Text_IO.Put_Line ( "Slave Closing Connection" ) ; ShutDown_Socket ( Sock, Shut_Read_Write ) ; Buffer.Deposit ( Me ) ; exception -- The mostly likely exception is if client quits unexpectedly -- close the socket and deposit ourselves in the buffer when others => Ada.Text_IO.New_Line ; Ada.Text_IO.Put_Line ( "Connection closed unexpectedly" ) ; Close_Socket ( Sock ) ; Buffer.Deposit ( Me ) ; end ; select ACCEPT ReStart ( N_Sock : IN Socket_Type ) DO Sock := N_Sock ; end ReStart ; or -- terminate if all slaves are queued here and -- if the main server task has finished terminate ; end select ; end loop ; end Echo ; protected body Buffer is entry Deposit ( X : IN Echo_Access ) when Count < MaxTasks is begin Buf ( I ) := X ; I := I + 1 ; Count := Count + 1 ; end Deposit ; entry Extract ( X : OUT Echo_Access ) when Count > 0 is begin X := Buf ( J ) ; J := J + 1 ; Count := Count - 1 ; end Extract ; function NumWaiting return Natural is begin return Count ; end NumWaiting ; end Buffer ; Server : Socket_Type ; New_Sock : Socket_Type ; Slave : Echo_Access ; Addr : Sock_Addr_Type ( Family_Inet ) ; Peer_Addr : Sock_Addr_Type ( Family_Inet ) ; Avail : Boolean := False ; Ch : Character ; TotalTasks : Natural := 0 ; Accept_Selector : Selector_Type ; Accept_Set : Socket_Set_Type ; WSet : Socket_Set_Type ; Accept_Status : Selector_Status ; begin -- main server task Ada.Text_IO.Put_Line ( "WARNING server loops for ever." ) ; Ada.Text_IO.Put ( "Press A to terminate server and all " ) ; Ada.Text_IO.Put_Line ( "tasks immediately or press Q to ") ; Ada.Text_IO.Put ( "accept no further connections and " ) ; Ada.Text_IO.Put ( "terminate gracefully when all clients " ) ; Ada.Text_IO.Put ( "are fully when all clients are through." ) ; Ada.Text_IO.New_Line ; Initialize ; Create_Socket ( Server) ; Addr := ( Family_Inet, Addresses ( Get_Host_By_Name ( Host_Name ), 1 ), 50000 ) ; -- allow server address to be reused for multiple connections Set_Socket_Option ( Server, Socket_Level, ( Reuse_Address, True ) ) ; Bind_Socket ( Server, Addr ) ; Listen_Socket ( Server, 4 ) ; -- set up selector Create_Selector ( Accept_Selector ) ; -- Initialise socket sets -- WSet is always empty as we are not interested in output -- events Accept_Set only ever contains one socket namely -- Server Empty ( Accept_Set ) ; Empty ( WSet ) ; loop Ada.Text_IO.Get_Immediate ( Ch, Avail ) ; if Avail and then ( Ch = 'q' or Ch = 'Q' or Ch = 'a' or Ch = 'A' ) then exit ; end if ; -- check for input (connection requests) on Server socket Set ( Accept_Set, Server ) ; -- time-out on check if no request within 1 second Check_Selector ( Accept_Selector, Accept_Set, WSet, Accept_Status, 1.0 ) ; if Accept_Status = Completed then -- must be an event on Server socket as it is the only -- one that we are checking. -- Hence the Accept_Socket call should not block. Accept_Socket ( Server, New_Sock, Peer_Addr ) ; Ada.Text_IO.New_Line ; Ada.Text_IO.Put_Line ( "Connection accepted -- allocating slave" ) ; if Buffer.NumWaiting = 0 and TotalTasks < MaxTasks then Slave := NEW Echo ; -- start new task TotalTasks := TotalTasks + 1 ; Ada.Text_IO.Put_Line ( "New slave task started" ) ; -- call entry Start to activate task Slave.Start ( New_Sock, Slave ) ; else Ada.Text_IO.Put_Line ( "Waiting for an idle slave task" ) ; Buffer.Extract ( Slave ) ; -- call entry Start to re-activate task Slave.ReStart ( New_Sock ) ; Ada.Text_IO.Put_Line ( " Idle slave task reactivated" ) ; end if ; end if ; end loop ; if Ch = 'a' or Ch = 'A' then -- signal slave tasks to terminate Aborted.Set ; end if ; -- tidy up Close_Selector ( Accept_Selector ) ; Empty ( Accept_Set ) ; Close_Socket ( Server ) ; Ada.Text_IO.New_Line ; Ada.Text_IO.Put_Line ( "Main server task exiting ..." ) ; Finalize ; end Pool_Server ; -------------------------------------------------------------- -- -- Pool_Client -- with Gnat.Sockets ; use Gnat.Sockets ; with Ada.Command_Line ; use Ada.Command_Line ; with Ada.Text_IO ; use Ada.Text_IO ; procedure Pool_Client is Sock : Socket_Type ; S : Stream_Access ; Addr : Sock_Addr_Type ( Family_Inet ) ; Msg : String ( 1 .. 80 ) ; Last : Natural ; B : Boolean ; Read_Selector : Selector_Type ; Read_Set, WSet : Socket_Set_Type ; Read_Status : Selector_Status ; begin Initialize ; Create_Socket ( Sock ) ; Addr := ( Family_Inet, Addresses ( Get_Host_By_Name ( Argument ( 1 ) ), 1 ), 50000 ) ; Create_Selector ( Read_Selector ) ; Empty ( Read_Set ) ; Empty ( WSet ) ; Connect_Socket ( Sock, Addr ) ; S := Stream ( Sock ) ; Boolean'Read ( S, B ) ; -- wait for connection to be accepted loop Set ( Read_Set, Sock ) ; -- check for input on socket (server may be aborting) -- time-out immediately if no input pending -- We seem to need a small delay here (using zero seems to block -- forever) -- Is this a GNAT bug or AB misreading Check_Selector docs? Check_Selector ( Read_Selector, Read_Set, WSet, Read_Status, 0.005 ) ; if Read_Status = Expired then Ada.Text_IO.Put ( "Message> " ) ; -- prompt user for message Ada.Text_IO.Get_Line ( Msg, Last ) ; -- send message to socket unless server is aborting String'Output ( S, Msg ( 1 .. Last ) ) ; exit when Msg ( 1 .. Last ) = "quit" ; end if ; declare -- receive message Str : String := String'Input ( S ) ; begin Ada.Text_IO.Put_Line ( Str ) ; exit when Str = "Server aborted" ; end ; end loop ; Ada.Text_IO.Put_Line ( "Client quitting ..." ) ; ShutDown_Socket ( Sock ) ; Close_Selector ( Read_Selector ) ; Finalize ; exception when others => Ada.Text_IO.Put_Line ("Exception: Client quitting ..." ) ; Close_Socket ( Sock ) ; Close_Selector( Read_Selector ) ; Finalize ; end Pool_Client ; In <8b4d1170-22e6-40d3-8ed1-096dc0163491(a)m36g2000hse.googlegroups.com>, Maciej Sobczak <see.my.homepage(a)gmail.com> writes: >Hi all, > >Imagine a server with fixed number of worker tasks. There is no queue >of jobs and jobs are immediately passed to one of the tasks that is >currently idle. There is a separate task (or just the main one) that >provides jobs for worker tasks. > >I am concerned with the proper structure of objects - I mean in the >sense of recommended Ada practice. >Obviously there is a need for some shared resource where the >requesting task will put the job and from where the worker task will >pick it up. > >This is more or less what I came up with, where the "channel" is a >single processing pipeline: > > type Worker_State is (Idle, Ready, Working); > > protected type Channel_State is > procedure Post (J : in Job_Type); > entry Get_Job (J : out Job_Type); > function Busy return Boolean; > private > State : Worker_State := Idle; > Job_To_Do : Job_Type; > end Channel_State; > > protected body Channel_State is > > procedure Post (J : in Job_Type) is > begin > if State /= Idle then > raise Program_Error; > end if; > > Job_To_Do := J; > State := Ready; > end Post; > > entry Get_Job (J : out Job_Type) when State = Ready is > begin > J := Job_To_Do; > State := Working; > end Get_Job; > > function Busy return Boolean is > begin > return State /= Idle; > end Busy; > > end Channel_State; > > type Channel; > task type Worker_Task (Ch : access Channel); > > type Channel is record > State : Channel_State; > Worker : Worker_Task (Channel'Access); > end record; > > task body Worker_Task is > Job : Job_Type; > begin > loop > Ch.all.Get_Job (Job); > > -- do the job ... > > end loop; > end Worker_Task; > > Max_Channels : constant := 5; > > Channels : array (1 .. Max_Channels) of Channel; > >My question is whether this is what a seasoned Ada programmer would >do. >Initially I tried to have two separate arrays, one for jobs and one >for worker tasks, but I found it difficult to link workers with their >respective jobs. After bundling them together in a single record that >is referenced from the task it worked and I actually find it >structured better. > >The main task after constructing a job object finds some channel where >the worker task is not busy and posts the job to its shared state >component: > > loop > Job := ... > > Found_Worker := False; > for I in Channels'Range loop > if not Channels (I).State.Busy then > Channels (I).State.Post (Job); > Found_Worker := True; > exit; > end if; > end loop; > > if not Found_Worker then > -- all pipelines are busy, > -- but the overflow handling is not shown... > end if; > end loop; > >All this works fine, but my question considers the choice of language >constructs and idioms. > >-- >Maciej Sobczak * www.msobczak.com * www.inspirel.com > >Database Access Library for Ada: www.inspirel.com/soci-ada
From: Maciej Sobczak on 21 Sep 2008 13:23
On 19 Wrz, 15:34, Jean-Pierre Rosen <ro...(a)adalog.fr> wrote: > Why not simply use a rendezvous? This is something that I deliberately wanted to avoid. I don't like the idea of tasks interacting with each other directly and I prefer to have the communication part extracted away (Ravenscar got that right for a reason, I think?). In theory, I could also change the queuing strategy without modifying the worker task. -- Maciej Sobczak * www.msobczak.com * www.inspirel.com Database Access Library for Ada: www.inspirel.com/soci-ada |