Do you guys have any pointers on multithreading? I’m making a bunch of network calls with a getHttp block. Each time the block is triggered, it creates a new thread and makes a network call, parses the data and updates specific slots based on the data (see image for reference).
The issue is that I have a ton of these and they trigger once every 10 seconds. That’s a lot of overhead in creating and deleting threads.
I was hoping to use a thread pool to lower this overhead, but I’m not sure how to make each instance of the program use the same thread pool.
If there is another approach that might work better, I am open to ideas.
I also tried running everything on the main engine thread, but that just causes severe lag in updating slots.
The code is a bit messy as I’m still developing it, but here it is. This is in a program block, but I plan on moving to an IDE in the future. I’m simply using the code from the thread example in the program palette.
public void onExecute() throws Exception
{
  // Create a new thread, with the starting point set as the current program
  // object.
  Thread thread = new Thread(this, getComponent().getName());
  // Start the thread
  thread.start();
}
public void run()
{
  // This task will take a long time
  //System.out.println("started task ["+Thread.currentThread().getName()+"]");
  BStatusString out = getOut();
  BStatusString status = getStatus();
  BStatusString url = getUrl();
  BStatusString user = getUser();
  BStatusString password = getPassword();
  BStatusString serNo = getSerNo();
  BStatusString values = getValues();
  String deviceType = getDeviceType();
  String output = "URL HERE";
  try
  {
    // Inputs
    String xml = getURL(new URL(output));
    out.setValue(xml);
    // Handle a connection failure
    if (xml.contains("Request Failed"))
    {
      status.setValue("0");
    }
    else // Parse the data
    {
      XParser xp = XParser.make(xml);
      XElem rootElement = xp.parse();
      // Validate a successful result
      String value = getValueForKey("success", rootElement);
      status.setValue(value);
      if (status.getValue().equals("1"))
      {
        for (XElem element : rootElement.elem(getDeviceType()).elems())
        {
          String name = element.name();
          value = getValueForKey(name, rootElement.elem(getDeviceType()));
          if (this.getComponent().getSlot(name) == null) // Slot does not exist yet
          {
            if (value != null) // Create the slot with the returned value
            {
              this.getComponent().add(name, new BStatusString(value), Flags.SUMMARY);
            }
            else // No value returned, create the slot with a default value
            {
              this.getComponent().add(name, new BStatusString(), Flags.SUMMARY);
            }
          }
          // Slot exists but the API return value is null or 0
          else if (value == null || value.equals("0"))
          {
            Slot slot = this.getComponent().getSlot(name);
            Property property = slot.asProperty();
            BStatusString currentValue = (BStatusString) this.getComponent().get(property);
            currentValue.setValue(""); // Clear the value (empty string)
          }
          else // Otherwise just update the value
          {
            Slot slot = this.getComponent().getSlot(name);
            Property property = slot.asProperty();
            BStatusString currentValue = (BStatusString) this.getComponent().get(property);
            currentValue.setValue(value);
          }
        }
      }
    }
  }
  catch (Exception ie)
  {
    out.setValue(ie.toString());
  }
  //System.out.println("ended task ["+Thread.currentThread().getName()+"]");
}
public String getValueForKey(String key, XElem rootElement)
{
  if (rootElement == null || key == null)
  {
    return "XML has not been parsed yet";
  }
  String[] keyParts = key.split("\\.");
  XElem currentElement = rootElement;
  for (String part : keyParts)
  {
    boolean found = false;
    for (XElem childElement : currentElement.elems())
    {
      if (childElement.name().equals(part))
      {
        currentElement = childElement;
        found = true;
        break;
      }
    }
    if (!found)
    {
      return null; // Key not found
    }
  }
  if (currentElement.text() == null)
  {
    return null;
  }
  return currentElement.text().toString();
}
//import java.io.BufferedReader;
//import java.io.InputStreamReader;
//import java.net.HttpURLConnection;
//import java.net.URL;
//import java....IOException
public static String getURL(final URL destination) throws IOException, MalformedURLException
{
  // Handle MalformedURLException
  try
  {
    final HttpURLConnection connection = (HttpURLConnection) destination.openConnection();
    connection.setRequestMethod("GET");
    connection.setRequestProperty("Host", destination.getHost());
    connection.setDoOutput(true);
    connection.connect();
    try
    {
      int responseCode = connection.getResponseCode();
      if (responseCode != HttpURLConnection.HTTP_OK)
      {
        return "Request Failed: " + connection.getResponseCode();
      }
      StringBuilder inString = new StringBuilder();
      try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream())))
      {
        String inBuffer;
        while ((inBuffer = in.readLine()) != null)
        {
          inString.append(inBuffer).append("\n");
        }
      }
      return inString.toString();
    }
    finally
    {
      connection.disconnect();
    }
  }
  catch (MalformedURLException e)
  {
    return "Invalid URL";
  }
}
I was thinking of making a block that manages all the threads and having all the getHttp blocks reference that block’s thread pool to borrow a thread. I’m just not sure if it’s feasible, though it certainly sounds possible.
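The "one manager, many borrowers" idea can be sketched in plain Java with `java.util.concurrent`. This is only a minimal sketch, not Niagara's own worker API: `SharedHttpPool`, `POOL_SIZE`, and the `http-pool-N` thread names are hypothetical, and it assumes the class is loaded once (for example from a module jar rather than compiled separately into every program block), since a static field is only shared within one class load.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder: one bounded pool shared by every caller of this class. */
final class SharedHttpPool
{
  // Assumed size; tune it to the host and to what the remote API tolerates.
  static final int POOL_SIZE = 8;

  private static final AtomicInteger THREAD_IDS = new AtomicInteger();

  // A static field is created once per class load, so all callers share it.
  private static final ExecutorService POOL =
    Executors.newFixedThreadPool(POOL_SIZE, runnable -> {
      Thread t = new Thread(runnable, "http-pool-" + THREAD_IDS.incrementAndGet());
      t.setDaemon(true); // daemon threads do not block JVM shutdown
      return t;
    });

  private SharedHttpPool() {}

  /** Queue a task; it runs on one of the POOL_SIZE reusable threads. */
  static Future<?> submit(Runnable task)
  {
    return POOL.submit(task);
  }

  /** Demo helper: submit dummy tasks and count the distinct threads that ran them. */
  static int demoDistinctThreads(int taskCount)
  {
    Set<String> names = ConcurrentHashMap.newKeySet();
    List<Future<?>> pending = new ArrayList<>();
    for (int i = 0; i < taskCount; i++)
    {
      pending.add(submit(() -> names.add(Thread.currentThread().getName())));
    }
    for (Future<?> f : pending)
    {
      try
      {
        f.get(); // wait for the task to finish
      }
      catch (InterruptedException | ExecutionException e)
      {
        throw new IllegalStateException(e);
      }
    }
    return names.size();
  }

  public static void main(String[] args)
  {
    // 200 tasks never use more than POOL_SIZE threads.
    System.out.println("threads used for 200 tasks: " + demoDistinctThreads(200));
  }
}
```

Each block would then call `SharedHttpPool.submit(this)` from `onExecute()` instead of `new Thread(this, ...).start()`, so threads are reused rather than created and destroyed on every trigger.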
How many of these blocks are you going to have? It might be overkill to run a new thread for each http request. How expensive are these requests on the main thread? It may be better just to use an “Action” to run the task.
Have a read through this document, it will give some “best practices” for multithreading.
My recommendation would be to build this into a module. Program objects, when used liberally, can be very taxing on the station’s resources, especially on station startup.
I currently have 200+ of these. A single request isn’t too bad by itself, but having 200+ requests does slow down the main thread. It even causes an Engine watchdog timeout, which makes the server terminate and reboot (along with many other issues). I’ll likely have to add more blocks as we get more projects; this is running on a central server with a handful of sites on it. I can’t use an “Action” since the data needs to be constantly refreshed. I can change the trigger time, currently 10 seconds, to something longer, but that isn’t a long-term solution.
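One detail that may matter with this many requests: `HttpURLConnection` has no connect or read timeout by default (a value of 0 means wait indefinitely), so a single unresponsive device can hold a thread for a long time. A minimal sketch of setting both, assuming a helper like the `getURL` method posted earlier; `TimedHttp` and the 3 s / 5 s values are hypothetical and would need tuning.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

/** Hypothetical helper: open a GET connection with explicit timeouts. */
final class TimedHttp
{
  // Assumed values; pick limits well below the polling interval.
  static final int CONNECT_TIMEOUT_MS = 3000;
  static final int READ_TIMEOUT_MS = 5000;

  static HttpURLConnection open(URL destination) throws IOException
  {
    // openConnection() only creates the object; no network I/O happens yet.
    HttpURLConnection connection = (HttpURLConnection) destination.openConnection();
    connection.setRequestMethod("GET");
    // Fail with SocketTimeoutException instead of blocking forever.
    connection.setConnectTimeout(CONNECT_TIMEOUT_MS);
    connection.setReadTimeout(READ_TIMEOUT_MS);
    return connection;
  }

  public static void main(String[] args) throws IOException
  {
    HttpURLConnection c = open(new URL("http://localhost/")); // not connected
    System.out.println(c.getConnectTimeout() + " / " + c.getReadTimeout()); // prints 3000 / 5000
  }
}
```

A timed-out request throws `java.net.SocketTimeoutException`, which is an `IOException`, so the existing `catch (Exception ie)` in `run()` would still report it through the `out` slot.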
I just went through the documentation. I haven’t encountered this specific one before, but I did read something similar. Between that and what I learned in school, I seem to be doing pretty well at following best practices when it comes to multithreading. On a side note, I did take a class where we had to build an OS from scratch using C, so I’m pretty familiar with multithreading and the issues it can cause if not done properly, like deadlocks, race conditions, etc.
Hey Boba… This is a direct copy/paste from the dev course material.
Dunno if it’s useful, but it’s what I got. Haven’t used multithreading very much.
This is BDemoWorker.java
package com.tridiumuniversity.devTrafficLights;
import javax.baja.sys.*;
import javax.baja.util.*;
import javax.baja.nre.annotations.*;
@NiagaraType
@NiagaraProperty(
  name = "queueSize",
  type = "int",
  defaultValue = "10"
)
public class BDemoWorker extends BWorker
{
  @Override
  public Worker getWorker()
  {
    if (worker == null)
    {
      queue = new CoalesceQueue(getQueueSize());
      worker = new Worker(queue);
    }
    return worker;
  }
  @Override
  public void changed(Property p, Context cx)
  {
    if (queueSize.equals(p))
    {
      // Force instantiation of new worker with new queue size
      worker = null;
    }
  }
  /**
   * Post an action to be run asynchronously.
   */
  public void postAsync(Runnable r)
  {
    if (!isRunning() || queue == null)
    {
      throw new NotRunningException();
    }
    queue.enqueue(r);
  }
  private CoalesceQueue queue;
  private Worker worker;
}
And this is BFastComponentWithBWorker.java
package com.tridiumuniversity.devTrafficLights;
import javax.baja.nre.annotations.NiagaraAction;
import javax.baja.nre.annotations.NiagaraProperty;
import javax.baja.nre.annotations.NiagaraType;
import javax.baja.sys.Action;
import javax.baja.sys.BComponent;
import javax.baja.sys.BValue;
import javax.baja.sys.Context;
import javax.baja.sys.Flags;
import javax.baja.sys.Property;
import javax.baja.sys.Sys;
import javax.baja.sys.Type;
import javax.baja.util.IFuture;
import javax.baja.util.Invocation;
@NiagaraType
@NiagaraProperty(
  name = "state",
  type = "String",
  flags = Flags.TRANSIENT | Flags.READONLY | Flags.SUMMARY,
  defaultValue = "\"\""
)
@NiagaraProperty(
  name = "worker",
  type = "BDemoWorker",
  defaultValue = "new BDemoWorker()"
)
@NiagaraAction(
  name = "execute",
  flags = Flags.ASYNC
)
public class BFastComponentWithBWorker
  extends BComponent
{
  public void doExecute()
  {
    setState("Executing");
    try
    {
      // Simulate a slow task by sleeping for 10 seconds
      Thread.sleep(10000L);
    }
    catch (InterruptedException ex)
    {
      setState("Interrupted");
    }
    setState("Complete");
  }
  @Override
  public IFuture post(Action action, BValue argument, Context cx)
  {
    if (execute.equals(action))
    {
      getWorker().postAsync(new Invocation(this, action, argument, cx));
    }
    return null;
  }
}
Oh sorry @Boba, I thought you were using program blocks for some reason. I usually use Worker as @Rosenthaler described.
I’m curious though, what are you interfacing to? This seems like an interesting project, looking at your screenshot. It looks like you’re pulling data via API for some schools? Is there an intermediate system where the API resides or do these controllers have a built in API?
Many schools/universities have Pelican Wireless tstats, especially for the mobile units. Pelican has a built-in API that I can interface with.
I am currently testing ThreadPoolWorker to implement a thread pool for the getHttp requests to run on, instead of creating and deleting threads on every execution. Funnily enough, ThreadPoolWorker is a subclass of Worker. It seems Worker only runs on a single thread (separate from the main thread), so I think it’s best to allocate a thread pool for scalability.
Sounds cool! I’ve never heard of these Pelican controllers.
I’m wondering, are you using these for control or just for monitoring and display? Maybe you can have it just fetch on demand, for example when someone views the graphic?
The “fetch on demand” idea did cross my mind. I’m comfortable with Java, but I get stuck on JS, so detecting when a page is active isn’t something I’m able to integrate into Niagara. If you have any ideas on how to go about it, feel free to point them out.
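Independent of how the page-active detection would be wired up, the core of "fetch on demand" can be sketched in plain Java as a value that is only re-fetched when someone asks for it and the cached copy is older than a limit. This is a minimal sketch under stated assumptions: `OnDemandValue`, its constructor parameters, and the 10-second maximum age are all hypothetical, and nothing here uses Niagara APIs.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical cache: refetch only when read and the cached copy is stale. */
final class OnDemandValue<T>
{
  private final Supplier<T> fetcher;   // e.g. the HTTP call plus parsing
  private final long maxAgeMillis;     // how long a cached value stays fresh
  private final LongSupplier clock;    // injectable so the logic can be tested

  private T cached;
  private long fetchedAt;
  private boolean hasValue;

  OnDemandValue(Supplier<T> fetcher, long maxAgeMillis, LongSupplier clock)
  {
    this.fetcher = fetcher;
    this.maxAgeMillis = maxAgeMillis;
    this.clock = clock;
  }

  /** Return the cached value, fetching first if there is none or it is too old. */
  synchronized T get()
  {
    long now = clock.getAsLong();
    if (!hasValue || now - fetchedAt >= maxAgeMillis)
    {
      cached = fetcher.get();
      fetchedAt = now;
      hasValue = true;
    }
    return cached;
  }

  public static void main(String[] args)
  {
    OnDemandValue<String> value = new OnDemandValue<>(
      () -> "fetched at " + System.currentTimeMillis(),
      10_000L,
      System::currentTimeMillis);
    System.out.println(value.get()); // triggers the first fetch
    System.out.println(value.get()); // served from cache, no second fetch
  }
}
```

With this shape, devices that nobody is looking at generate no requests at all. The trade-off is that `get()` holds the lock while fetching, so the first reader after expiry waits for the network call.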
I wouldn’t think you’d have to code in any “fetch-on-demand” stuff. (I also assumed you were using a ProgramObject.) Always first ask the question: “What can the Framework do for me?” For instance, if you’re extending basicDriver or ndriver, that should be handled for you if you follow the rules for implementing proxy points, etc. I have seen many excellent developers “reinvent the wheel,” writing awesome code in massive jar files where they could have simply written a few lines of code that “asked” the Framework to do the work for them. The post above by @Rosenthaler is great in that it points out a few of the ways to get your code off the main engine thread by handing it to the Framework, which will then safely handle any multithreading. It sounds like you are heading in the right direction by creating a custom thread pool object from BThreadPoolWorker. Please post your progress here because I’d love to learn from your experience on this.
For this project, I’ve mostly used the built-in functionality. I did build my own (very basic) XML parser until I found out that Niagara had a built-in one. It’s not public and has no documentation, but that’s another story.
Regarding the thread pool issue, I did some reading on it. I’m currently torn between keeping my current code (which seems to be working well enough) and optimizing it to make it more scalable.
Ultimately, I think it’s best to hold off on any optimization until we get more Pelican projects.
@Boba don’t forget you can also add external libraries to your code and include them in your jar. You’ll probably find that there are a bunch of pre-built XML parser libraries you could include to save you reinventing the wheel.
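Besides third-party libraries, the JDK itself ships a DOM parser in `javax.xml.parsers`. The following is a minimal sketch of the same dotted-key lookup as `getValueForKey`, written against that standard API instead of `XElem`. `XmlKeyLookup` and the sample payload are hypothetical (the real API response format is not shown in this thread), and whether this parser is usable inside a given station's security setup is an assumption to verify.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

/** Hypothetical helper: dotted-key lookup using the JDK's built-in DOM parser. */
final class XmlKeyLookup
{
  /** Parse an XML string and return its root element. */
  static Element parse(String xml)
  {
    try
    {
      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
      // Ask the parser to apply its secure-processing limits.
      factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
      return factory.newDocumentBuilder()
        .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
        .getDocumentElement();
    }
    catch (Exception e)
    {
      throw new IllegalArgumentException("Could not parse XML", e);
    }
  }

  /** Walk child elements by name, one key part per level; null if any part is missing. */
  static String valueForKey(String key, Element root)
  {
    if (key == null || root == null)
    {
      return null;
    }
    Element current = root;
    for (String part : key.split("\\."))
    {
      Element next = null;
      for (Node n = current.getFirstChild(); n != null; n = n.getNextSibling())
      {
        if (n instanceof Element && n.getNodeName().equals(part))
        {
          next = (Element) n;
          break;
        }
      }
      if (next == null)
      {
        return null; // key not found
      }
      current = next;
    }
    return current.getTextContent();
  }

  public static void main(String[] args)
  {
    // Made-up payload for illustration only.
    String xml = "<result><success>1</success><Thermostat><name>Room 1</name></Thermostat></result>";
    Element root = parse(xml);
    System.out.println(valueForKey("success", root));         // prints 1
    System.out.println(valueForKey("Thermostat.name", root)); // prints Room 1
  }
}
```

Unlike the original helper, this version returns `null` rather than a message string when the inputs are missing, so callers can tell "no value" apart from real data.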