Super-fast batch processing using batch parallelism and grouping of related records
Have you ever come across a situation where you needed to process a voluminous staging record set by grouping similar records together?
For example, you have a huge journal staging table, and you need to process the journal records from it and then post the journals. Or you have a big staging table containing purchase lines, where you need to calculate the individual purchase price and update the lines, and then, once each purchase order is complete, carry out some additional activity at the header level. This means you have to split up the entire process by grouping on Journal Id (or Purch Id, in the second example), feed each group into one thread, and then keep an eye on each thread: as it completes, you carry out the additional activity.
The figure above illustrates this process: the record set is divided into groups, each group is assigned to a thread executing in parallel, and the process waits for the threads to finish. It then concludes by executing the last leg (posting individual journals, etc.).
The steps below explain the overall process. The first three steps are quite common, as they occur in every SysOperationFramework pattern, so you can skip them if you already know them.
Step 1. Create the AOT query:
Okay, first things first. We need to design the query and select the group-by field, which should identify each group of records uniquely across the table. Going by the example above, we can take PurchId as the group-by field. In our example, we are calling the query DMBatchStagingTblQuery.
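If you would rather see the shape of the query in code than in the AOT designer, here is a minimal sketch of a comparable query built at run time. It assumes the staging table is DMPurchPriceStagingtable with a PurchId field, as used in the later steps; the class name DMBatchQuerySketch and the method name buildGroupedQuery are hypothetical and only there for illustration.

```xpp
// Sketch only: DMBatchQuerySketch and buildGroupedQuery are hypothetical names.
public final class DMBatchQuerySketch
{
    public static Query buildGroupedQuery()
    {
        Query query = new Query();

        // Single data source on the staging table
        QueryBuildDataSource qbds = query.addDataSource(tableNum(DMPurchPriceStagingtable));

        // Group by PurchId so the query returns one row per purchase order
        qbds.addGroupByField(fieldNum(DMPurchPriceStagingtable, PurchId));

        return query;
    }
}
```

Keep in mind that a grouped query populates only the group-by (and aggregated) fields on the returned buffer, so fields such as RecId should not be relied on downstream.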
Step 2. Create the Contract class:
The name of the contract class is DMBatchContract, and its structure is very simple:
/// <summary>
/// Data contract exposing a getter and setter for the query
/// </summary>
[DataContract]
public class DMBatchContract
{
    str packedQuery;

    [DataMember, AifQueryTypeAttribute('_packedQuery', queryStr(DMBatchStagingTblQuery))]
    public str parmQuery(str _packedQuery = packedQuery)
    {
        packedQuery = _packedQuery;
        return packedQuery;
    }

    // Unpacks the base64-encoded string back into a Query object
    public Query getQuery()
    {
        return new Query(SysOperationHelper::base64Decode(packedQuery));
    }

    // Packs the Query object into a base64-encoded string
    public void setQuery(Query _query)
    {
        packedQuery = SysOperationHelper::base64Encode(_query.pack());
    }
}
This packs the query into a string so that it can be shown in the dialog prompted by the controller class.
Step 3. Create the controller class:
The name of the controller is DMBatchController, and it looks like this:
class DMBatchController extends SysOperationServiceController
{
    public static void main(Args _args)
    {
        DMBatchController controller = new DMBatchController(
            classStr(DMBatchService),
            methodStr(DMBatchService, process),
            SysOperationExecutionMode::Synchronous);

        controller.parmDialogCaption("Processing the multithreaded batch for purchase line update");
        controller.startOperation();
    }
}
Step 4. Create the service class:
The name of the service class is DMBatchService. It has one method, called process, which looks as follows:
public void process(DMBatchContract _contract)
{
    // parmQuery() returns the base64-encoded string, so unpack it with getQuery()
    QueryRun qryRun = new QueryRun(_contract.getQuery());

    while (qryRun.next())
    {
        DMPurchPriceStagingtable purchPriceStagingTable = qryRun.get(tableNum(DMPurchPriceStagingtable));

        // Test the group-by field: a grouped query does not populate RecId
        if (purchPriceStagingTable.PurchId)
        {
            // One batch header per purchase Id
            BatchHeader batchHeader = BatchHeader::construct();
            batchHeader.parmCaption(strFmt("Processing purchase Id %1", purchPriceStagingTable.PurchId));

            SysOperationServiceController sysOperationServiceController = new SysOperationServiceController(
                classStr(DMIndividiualLineService),
                methodStr(DMIndividiualLineService, processLine),
                SysOperationExecutionMode::Synchronous);

            // Hand the current purchase Id to the inner service through its contract
            DMIndividiualLineContract dataContract = sysOperationServiceController.getDataContractObject() as DMIndividiualLineContract;
            dataContract.parmPurchId(purchPriceStagingTable.PurchId);

            batchHeader.addTask(sysOperationServiceController);

            // Save here, while batchHeader is still in scope
            batchHeader.save();
        }
    }
}
As you have rightly guessed, the above class picks up each PurchId. It creates one batch header per purchase Id and, inside the loop, adds another service class/controller as a task to that header. [We have to create 3 more classes: service class - DMIndividiualLineService, controller - sysOperationServiceController, contract - DMIndividiualLineContract.]
Please note: keep the execution mode as Synchronous; otherwise the batch will not retain the values set through the contract class's parm methods.
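The inner contract is not listed in this post, so here is a minimal sketch of what it might look like. The only thing the later code relies on is a parmPurchId method; the field name and the use of the PurchId extended data type are assumptions.

```xpp
/// <summary>
/// Sketch of the inner contract: carries the purchase Id for one task.
/// </summary>
[DataContract]
public class DMIndividiualLineContract
{
    // Assumed to use the standard PurchId extended data type
    PurchId purchId;

    [DataMember]
    public PurchId parmPurchId(PurchId _purchId = purchId)
    {
        purchId = _purchId;
        return purchId;
    }
}
```

Because the contract holds only a single Id, each task stays cheap to serialize, and the inner service can select its own lines when it runs.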
Step 5. Inside the inner service class: DMIndividiualLineService
Depending on the number of lines, we can either directly process the individual lines like this:
public void processLine(DMIndividiualLineContract _contract)
{
    PurchLine purchLine2Process;

    // Updates to records selected forupdate must happen inside a transaction
    ttsbegin;

    while select forupdate purchLine2Process
        where purchLine2Process.PurchId == _contract.parmPurchId()
    {
        // Process purchase lines
    }

    ttscommit;
}
Or you can create another set of contract, service, and controller classes to split the original thread into smaller chunks.
Step 6. Wait for the thread execution to be over:
Go back to the process method of DMBatchService. One way to do this is to record each task in a container and then loop through the container, putting each thread into 'join' mode:
Task task = conpeek(containerTasks, i);
task.join();
And once this task is finished, you can call the necessary classes to post/process the header records.
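As an alternative to joining threads by hand, the batch framework can express the same "wait, then finish" ordering through task dependencies. The following is a minimal sketch, not the method used above: it assumes all line tasks are placed under a single batch header (rather than one header per purchase Id), and that a hypothetical DMHeaderPostService class with a postHeaders method does the header-level work. The method name scheduleWithDependency and the _purchIds container are also made up for illustration.

```xpp
// Sketch only: DMHeaderPostService, postHeaders and scheduleWithDependency are hypothetical names.
public void scheduleWithDependency(container _purchIds)
{
    BatchHeader batchHeader = BatchHeader::construct();
    batchHeader.parmCaption("Purchase line update with final header step");

    // Final task: should start only after every line task has finished
    SysOperationServiceController headerTask = new SysOperationServiceController(
        classStr(DMHeaderPostService),
        methodStr(DMHeaderPostService, postHeaders),
        SysOperationExecutionMode::Synchronous);
    batchHeader.addTask(headerTask);

    for (int i = 1; i <= conLen(_purchIds); i++)
    {
        SysOperationServiceController lineTask = new SysOperationServiceController(
            classStr(DMIndividiualLineService),
            methodStr(DMIndividiualLineService, processLine),
            SysOperationExecutionMode::Synchronous);

        DMIndividiualLineContract lineContract = lineTask.getDataContractObject() as DMIndividiualLineContract;
        lineContract.parmPurchId(conPeek(_purchIds, i));

        batchHeader.addTask(lineTask);

        // headerTask waits until this line task reaches the Finished status
        batchHeader.addDependency(headerTask, lineTask, BatchDependencyStatus::Finished);
    }

    batchHeader.save();
}
```

With this layout the batch server decides when the final task may start, so no code has to sit and wait on the individual threads.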