Rexx has additional concurrency mechanisms that can add full concurrency so that more than one method of a given scope can run in an object at a time:
The SETUNGUARDED method of the Method class and the UNGUARDED option of the ::METHOD directive provide unconditional concurrency.
The GUARD OFF and GUARD ON instructions control a method's exclusive access to an object's scope.
The SETUNGUARDED method of the Method class and the UNGUARDED option of the ::METHOD directive control locking of an object's scope when a method is invoked. Both let a method run even if another method is active on the same object.
Use the SETUNGUARDED method or UNGUARDED option only for methods that do not need exclusive use of their object variable pool, that is, methods whose execution can interleave with another method's execution without affecting the object's integrity. Otherwise, concurrent methods can produce unexpected results.
To use the SETUNGUARDED method for a method you have created with the NEW method of the Method class, you specify:
methodname~SETUNGUARDED
(See the description of the SETUNGUARDED method of the Method class for details.)
Alternatively, you can define a method with the ::METHOD directive, specifying the UNGUARDED option:
::METHOD methodname UNGUARDED
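As a minimal sketch of both forms, the following program defines one method with the UNGUARDED option and builds another with the NEW method of the Method class. The class names Counter and Greeter and all method names are hypothetical, chosen only for illustration.

```rexx
/* Sketch only: Counter, Greeter, and the method names are hypothetical. */
c = .Counter~new
c~increment
say c~describe                  /* DESCRIBE does not wait for the guard   */

/* Build a method object at run time and mark it unguarded.               */
hello = .method~new("HELLO", "return 'Hello from an unguarded method'")
hello~setUnguarded
greeter = .object~subclass("Greeter")   /* hypothetical class             */
greeter~define("HELLO", hello)          /* attach the method to the class */
say greeter~new~hello

::class Counter
::method init
  expose count
  count = 0

::method increment              /* Guarded by default                     */
  expose count
  count = count + 1

::method describe unguarded     /* Uses no object variables, so it can    */
  return "A simple counter"     /* safely interleave with INCREMENT       */
```

DESCRIBE is a reasonable candidate for UNGUARDED because it never reads or changes the object variable pool; INCREMENT is left guarded because it updates count.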
You might not be able to use the SETUNGUARDED method or UNGUARDED option in all cases. A method might need exclusive use of its object variables, then allow methods on other activities to run, and perhaps later need exclusive use again. You can use GUARD ON and GUARD OFF to alternate between exclusive use of an object's scope and allowing other activities to use the scope.
By default, a method must wait until a currently running method of the same scope is finished before it begins. GUARD OFF lets another method (running on a different activity) that needs exclusive use of the same object variables become active on the same object. See the description of the GUARD instruction for more information.
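The alternation between exclusive and shared use can be sketched as follows. The Job class and its PROCESS and SUMMARY methods are hypothetical; the point is only where GUARD OFF and GUARD ON are placed relative to the updates of the object variables.

```rexx
/* Sketch only: the Job class and its methods are hypothetical.           */
job = .Job~new
job~process("first")
say job~summary

::class Job
::method init
  expose started finished
  started = 0
  finished = 0

::method process
  expose started finished
  use arg label
  started = started + 1         /* Guarded: exclusive use of the scope    */
  guard off                     /* Let methods on other activities run    */
  say "Working on" label        /* Work that needs no object variables    */
  guard on                      /* Wait for exclusive use again           */
  finished = finished + 1       /* Safe to update object variables        */

::method summary
  expose started finished
  return started "started," finished "finished"
```

Between GUARD OFF and GUARD ON, another activity may change started or finished, so the method should not rely on values it read before releasing the guard.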
Concurrency requires the activities of concurrently running methods to be synchronized. Critical data must be safeguarded so diverse methods on other activities do not perform concurrent updates. Guarded methods satisfy both these needs.
A guarded method combines the UNGUARDED option of the ::METHOD directive or the SETUNGUARDED method of the Method class with the GUARD instruction.
The UNGUARDED option and the SETUNGUARDED method both provide unconditional concurrency. Including a GUARD instruction in a method makes concurrency conditional:
GUARD ON WHEN expression
If the expression on the GUARD instruction evaluates to 1 (true), the method continues to run. If the expression evaluates to 0 (false), the method waits. GUARD reevaluates the expression whenever the value of an exposed object variable changes. When the expression evaluates to 1, the method resumes running. You can use GUARD to block any method from running when proceeding is not safe. (See the description of the GUARD instruction for details.)
Note: It is important to ensure that you use an expression that can be fulfilled. If the condition expression cannot be met, GUARD ON WHEN puts the program in a continuous wait condition. This can occur in particular when several activities run concurrently. In this case, a second activity can make the condition expression invalid before GUARD ON WHEN can use it.
To avoid this, ensure that the GUARD ON WHEN statement is executed before the condition is set to true. Keep in mind that the sequence of running activities is not determined by the calling sequence, so it is important to use a logic that is independent of the activity sequence.
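A conditional guard can be sketched with a simple gate object. The Gate class, its isOpen variable, and the method names are hypothetical. The waiting method is started on its own activity with START, and the logic is written so that it does not matter whether the waiter or the opener runs first.

```rexx
/* Sketch only: the Gate class and its methods are hypothetical.          */
gate = .Gate~new
msg = gate~start("waitUntilOpen")   /* Waiter runs on another activity    */
gate~open                           /* Makes the guard expression true    */
say msg~result                      /* Blocks until the waiter returns    */

::class Gate
::method init
  expose isOpen
  isOpen = 0

::method waitUntilOpen unguarded
  expose isOpen
  guard on when isOpen              /* Wait until isOpen is 1, then       */
  return "Passed the gate"          /* continue holding the guard         */

::method open
  expose isOpen
  isOpen = 1                        /* Changing an exposed variable makes */
                                    /* GUARD reevaluate its expression    */
```

Because isOpen stays 1 once it is set, the condition can always be fulfilled: if OPEN happens to run before the waiter reaches GUARD ON WHEN, the expression is already true and the waiter continues immediately.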
The following example uses REPLY in a method for a write-back cache.
/* Method Write_Back */
use arg data                  /* Save data to be written    */
reply 0                       /* Tell the sender all was OK */
self~disk_write(data)         /* Now write the data         */
The REPLY instruction returns control to the point at which method Write_Back was called, returning the result 0. The caller of method Write_Back continues processing from this point; meanwhile, method Write_Back also continues processing.
The following example uses a message object. It reads a line asynchronously into the variable nextline:
mymsg = infile~start("READLINE")   /* Gets message object to carry    */
                                   /* message to INFILE               */
/* do other work */
nextline=mymsg~result              /* Gets result from message object */
This creates a message object that waits for the read to finish while the sender continues with other work. When the line is read, the mymsg message object obtains the result and holds it until the sender requests it.
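The same pattern works with any object, not only streams. The following self-contained sketch assumes a hypothetical Worker class with a COMPUTE method; START passes the extra arguments along with the message, and RESULT waits for the method to finish if it has not finished yet.

```rexx
/* Sketch only: the Worker class and COMPUTE method are hypothetical.     */
worker = .Worker~new
msg = worker~start("COMPUTE", 6, 7)  /* Send COMPUTE asynchronously       */
say "Sender continues with other work"
say "Product:" msg~result            /* Waits for COMPUTE if necessary    */

::class Worker
::method compute
  use arg a, b
  return a * b
```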
Semaphores and monitors (bounded buffers) synchronize concurrent processes. Giving readers and writers concurrent access is a typical concurrency problem. The following sections show how to use guarded methods to code semaphore and monitor mechanisms and to provide concurrency for readers and writers.
A semaphore is a mechanism that controls access to resources, for example, preventing simultaneous access. Synchronization often uses semaphores. Here is an example of a semaphore class:
Figure 12-5. Example of a Rexx Semaphore Class
/******************************************************************************
 * A Rexx Semaphore Class.
 *
 * This file implements a semaphore class in Rexx. The class is defined to
 * the Global Rexx Environment. The following methods are defined for
 * this class:
 *   init - Initializes a new semaphore. Accepts the following positional
 *          parameters:
 *            'name'  - global name for this semaphore
 *                      if named default to set name in
 *                      the class semDirectory
 *            noshare - do not define named semaphore
 *                      in class semDirectory
 *            Initial state (0 or 1)
 *   setInitialState - Allow for subclass to have some post-initialization,
 *                     and do setup based on initial state of semaphore
 *   Waiting  - Is the number of objects waiting on this semaphore.
 *   Shared   - Is this semaphore shared (Global).
 *   Named    - Is this semaphore named.
 *   Name     - Is the name of a named semaphore.
 *   setSem   - Sets the semaphore and returns previous state.
 *   resetSem - Sets state to unSet.
 *   querySem - Returns current state of semaphore.
 *
 * SemaphoreMeta - Is the metaclass for the semaphore classes. This class is
 *   set up so that when a namedSemaphore is shared, it maintains these
 *   named/shared semaphores as part of its state. These semaphores are
 *   maintained in a directory, and an UNKNOWN method is installed on the
 *   class to forward unknown messages to the directory. In this way the
 *   class can function as a class and "like" a directory, so [] syntax can
 *   be used to retrieve a semaphore from the class.
 *
 *
 * The following are in the subclass EventSemaphore.
 *
 *   Post  - Posts this semaphore.
 *   Query - Queries the number of posts since the last reset.
 *   Reset - Resets the semaphore.
 *   Wait  - Waits on this semaphore.
 *
 *
 * The following are in the subclass MutexSemaphore
 *
 *   requestMutex - Gets exclusive use of semaphore.
 *   releaseMutex - Releases to allow someone else to use semaphore.
 *     NOTE: Currently anyone can issue a release (need not be the owner).
 ******************************************************************************/

/* ========================================================================== */
/* ===                  Start of Semaphore class.                         === */
/* ========================================================================== */
::class SemaphoreMeta subclass class

::method init
  expose semDict
                                   /* Be sure to initialize parent        */
  .message~new(self, .array~of("INIT", super), "a", arg(1,"a"))~send
  semDict = .directory~new

::method unknown
  expose semDict
  use arg msgName, args
                                   /* Forward all unknown messages        */
                                   /* to the semaphore dictionary         */
  .message~new(semDict, msgName, "a", args)~send
  if var("RESULT") then
    return result
  else
    return

::class Semaphore subclass object metaclass SemaphoreMeta

::method init
  expose sem waits shared name
  use arg semname, shr, state

  waits = 0                        /* No one waiting                      */
  name = ""                        /* Assume unnamed                      */
  shared = 0                       /* Assume not shared                   */
  sem = 0                          /* Default to not posted               */

  if state = 1 Then                /* Should initial state be set?        */
    sem = 1
                                   /* Was a name specified?               */
  if VAR("SEMNAME") & semname \= "" Then Do
    name = semname                 /* Yes, so set the name                */
    if shr \= "NOSHARE" Then Do    /* Do we want to share this sem?       */
      shared = 1                   /* Yes, mark it shared                 */
                                   /* Shared add to semDict               */
      self~class[name] = self
    End
  End
  self~setInitialState(sem)        /* Initialize initial state            */

::method setInitialState           /* This method intended to be          */
  nop                              /* overridden by subclasses            */

::method setSem
  expose sem
  oldState = sem
  sem = 1                          /* Set new state to 1                  */
  return oldState

::method resetSem
  expose sem
  sem = 0
  return 0

::method querySem
  expose sem
  return sem

::method shared
  expose shared
  return shared                    /* Return true 1 or false 0            */

::method named
  expose name                      /* Does semaphore have a name?         */
  if name = "" Then return 0       /* No, not named                       */
  Else return 1                    /* Yes, it is named                    */

::method name
  expose name
  return name                      /* Return name or ""                   */

::method incWaits
  expose waits
  waits = waits + 1                /* One more object waiting             */

::method decWaits
  expose Waits
  waits = waits - 1                /* One object less waiting             */

::method Waiting
  expose Waits
  return waits                     /* Return number of objects waiting    */

/* ========================================================================== */
/* ===                  Start of EventSemaphore class.                    === */
/* ========================================================================== */
::class EventSemaphore subclass Semaphore public

::method setInitialState
  expose posted posts
  use arg posted
  if posted then posts = 1
  else posts = 0

::method post
  expose posts posted
  self~setSem                      /* Set semaphore state                 */
  posted = 1                       /* Mark as posted                      */
  reply
  posts = posts + 1                /* Increase the number of posts        */

::method wait
  expose posted
  self~incWaits                    /* Increment number waiting            */
  guard off
  guard on when posted             /* Now wait until posted               */
  reply                            /* Return to caller                    */
  self~decWaits                    /* Cleanup, 1 less waiting             */

::method reset
  expose posts posted
  posted = self~resetSem           /* Reset semaphore                     */
  reply                            /* Do an early reply                   */
  posts = 0                        /* Reset number of posts               */

::method query
  expose posts                     /* Return number of times              */
  return posts                     /* Semaphore has been posted           */

/* ========================================================================== */
/* ===                  Start of MutexSemaphore class.                    === */
/* ========================================================================== */
::class MutexSemaphore subclass Semaphore public

::method setInitialState
  expose owned
  use arg owned

::method requestMutex
  expose Owned
  Do forever                       /* Do until we get the semaphore       */
    owned = self~setSem
    if Owned = 0                   /* Was semaphore already set?          */
    Then leave                     /* Wasn't owned; we now have it        */
    else Do
      self~incWaits
      guard off                    /* Turn off guard status to let        */
                                   /* others come in                      */
      guard on when \Owned         /* Wait until not owned and get        */
                                   /* guard                               */
      self~decWaits                /* One less waiting for MUTEX          */
    End
                                   /* Go up and see if we can get it      */
  End

::method releaseMutex
  expose owned
  owned = self~resetSem            /* Reset semaphore                     */
Note: There are functions available that use system semaphores. See SysCreateEventSem and SysCreateMutexSem.
A monitor object consists of a number of client methods, WAIT and SIGNAL methods for client methods to use, and one or more condition variables. Guarded methods provide the functionality of monitors. Do not confuse this with the Monitor class (see The Monitor Class).
::method init                      /* Initialize the bounded buffer       */
  expose size in out n
  use arg size
  in = 1
  out = 1
  n = 0

::method append unguarded          /* Add to the bounded buffer if not    */
                                   /* full                                */
  expose n size b. in
  guard on when n < size
  use arg b.in
  in = in//size+1
  n = n+1

::method take                      /* Remove from the bounded buffer if   */
                                   /* not empty                           */
  expose n b. out size
  guard on when n > 0
  reply b.out
  out = out//size+1
  n = n-1
The concurrency problem of the readers and writers requires that writers exclude writers and readers, whereas readers exclude only writers. The UNGUARDED option is required to allow several concurrent readers.
::method init
  expose readers writers
  readers = 0
  writers = 0

::method read unguarded            /* Read if no one is writing           */
  expose writers readers
  guard on when writers = 0
  readers = readers + 1
  guard off
                                   /* Read the data                       */
  say "Reading (writers:" writers", readers:" readers")."
  guard on
  readers = readers - 1

::method write unguarded           /* Write if no-one is writing or       */
                                   /* reading                             */
  expose writers readers
  guard on when writers + readers = 0
  writers = writers + 1
                                   /* Write the data                      */
  say "Writing (writers:" writers", readers:" readers")."
  writers = writers - 1