This is my code to read the CSV file asynchronously using the ReadLineAsync() method of the StreamReader class, but it only reads the first line of the CSV file:
private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken)
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    string filePath = @"/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";
    using (StreamReader oStreamReader = new StreamReader(File.OpenRead(filePath)))
    {
        string sFileLine = await oStreamReader.ReadLineAsync();
        string[] jointDataArray = sFileLine.Split(',');

        // Assuming the joint data is processed in parallel
        var tasks = new List<Task>();

        // Process joint pose
        tasks.Add(Task.Run(async () => {
            var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
            var jointPoseJson = JsonSerializer.Serialize(jointPose);
            await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
        }));

        // Process joint velocity
        tasks.Add(Task.Run(async () => {
            var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
            var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
            await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
        }));

        // Process joint acceleration
        tasks.Add(Task.Run(async () => {
            var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
            var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
            await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
        }));

        // Process external wrench
        tasks.Add(Task.Run(async () => {
            var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
            var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
            await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
        }));

        await Task.WhenAll(tasks);
    }
    sw.Stop();
    _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
}
Basically, the CSV file has 10128 lines. I want to read the latest line that gets added to the CSV file.
How do I do it?
Using File.ReadLines(filePath).Last() (as in the code below) throws this exception:
Unhandled exception. System.IO.PathTooLongException: The path '/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/-2.27625e-06,-0.78542,-3.79241e-06,-2.35622,5.66111e-06,3.14159,0.785408,0.00173646,-0.0015847,0.000962475,-0.00044469,-0.000247682,-0.000270337,0.000704195,0.000477503,0.000466693,-6.50664e-05,0.00112044,-2.47425e-06,0.000445592,-0.000685786,1.21642,-0.853085,-0.586162,-0.357496,-0.688677,0.230229' is too long, or a component of the specified path is too long.
private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken)
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    string filePath = @"/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";
    using (StreamReader oStreamReader = new StreamReader(File.ReadLines(filePath).Last()))
    {
        string sFileLine = await oStreamReader.ReadLineAsync();
        string[] jointDataArray = sFileLine.Split(',');

        // Assuming the joint data is processed in parallel
        var tasks = new List<Task>();

        // Process joint pose
        tasks.Add(Task.Run(async () => {
            var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
            var jointPoseJson = JsonSerializer.Serialize(jointPose);
            await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
        }));

        // Process joint velocity
        tasks.Add(Task.Run(async () => {
            var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
            var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
            await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
        }));

        // Process joint acceleration
        tasks.Add(Task.Run(async () => {
            var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
            var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
            await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
        }));

        // Process external wrench
        tasks.Add(Task.Run(async () => {
            var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
            var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
            await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
        }));

        await Task.WhenAll(tasks);
    }
    sw.Stop();
    _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
}
The slow (but reliable) method is to read all of the lines in the file and return the last one. Something like:
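(A minimal sketch of that approach; the original snippet isn't included here, so the method name is illustrative.)

using System.IO;

public static string ReadLastLine(string filePath)
{
    // Read every line of the file into memory just to keep the last one.
    // Assumes the file has at least one line.
    string[] lines = File.ReadAllLines(filePath);
    return lines[lines.Length - 1];
}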
Of course this allocates a lot of strings for no good reason, and the GC will have to get rid of them all shortly afterwards. Not ideal, but that's how it goes. (The async version is simple enough to make, but this runs in a few ms per MB. Up to you.)

To get it to go faster you can try just reading the last part of the file - enough to get a couple of lines - and process that. You'll need to know the maximum line length for this, just so you can guarantee that you get at least one full line. Assuming that you're working with ASCII or UTF8 data you can probably do something like this:
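(Again, a sketch under those assumptions; the original code isn't included here, so the method name and parameter handling are illustrative.)

using System;
using System.IO;
using System.Text;

public static string ReadLastLine(string filePath, int maxLineLength)
{
    using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    {
        // Read at most 2 * maxLineLength bytes from the end of the file, so the
        // buffer is guaranteed to contain at least one complete line
        // (assumes ASCII/UTF-8 and no line longer than maxLineLength bytes).
        int bytesToRead = (int)Math.Min(stream.Length, 2 * maxLineLength);
        stream.Seek(-bytesToRead, SeekOrigin.End);

        var buffer = new byte[bytesToRead];
        int read = stream.Read(buffer, 0, bytesToRead);
        string tail = Encoding.UTF8.GetString(buffer, 0, read);

        // The last entry after splitting on newlines is the last line;
        // RemoveEmptyEntries copes with a trailing newline at the end of the file.
        string[] lines = tail.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries);
        return lines.Length > 0 ? lines[lines.Length - 1] : string.Empty;
    }
}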
(This should work in fairly constant time regardless of the size of the file, since it will always process the same amount of data - about 100 microseconds on my machine for a 1MB file using maxLineLength = 150. Again, conversion to an async version is simple.)

And finally, if you're expecting lines to be added to the file over time and you want to just read the fresh lines from the file, keep track of the file size and resume reading from there when the file size changes.
As pointed out in the comments, the above isn't particularly useful if you want to read new lines as they come in from outside your program. Not only does it always return the last line of the file, regardless of changes, but if multiple lines are added you'll only get the last one. Almost certainly not what you want.
What you probably want is a way to read new lines as they come in, without skipping any. This means tracking changes in the file's length and resuming reading one line at a time. You don't want to lock the file for too long, and since StreamReader buffers ahead you can't rely on stream.Position to line up with the end of the line you just read.

The solution is to read all the lines you can each time and save them for subsequent reads. We can pull new lines into a Queue<> and track the file's size to detect when there's more to read. Something like the sketch below: GetNextLineAsync() tries to retrieve the next available line, WaitNextLineAsync(...) spins asynchronously until a line is read, and cancellation exits instead of throwing, which is handy if you want to provide a timeout with a CancelAfter token (a usage example follows the class).
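(A minimal sketch of such a reader; the answer's original code isn't included here, so the class name TailLineReader and its internals are illustrative - only GetNextLineAsync() and WaitNextLineAsync() come from the description above.)

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class TailLineReader
{
    private readonly string _filePath;
    private readonly Queue<string> _pendingLines = new Queue<string>();
    private long _lastLength;

    public TailLineReader(string filePath)
    {
        _filePath = filePath;
        // Start at the current end of the file so only lines appended later are
        // reported; initialise to 0 instead to replay the existing contents.
        _lastLength = new FileInfo(filePath).Length;
    }

    // Returns the next available line, or null if nothing new has been appended yet.
    public async Task<string> GetNextLineAsync()
    {
        if (_pendingLines.Count == 0)
            await ReadNewLinesAsync().ConfigureAwait(false);

        return _pendingLines.Count > 0 ? _pendingLines.Dequeue() : null;
    }

    // Spins asynchronously until a line is available. Cancellation returns null
    // instead of throwing, so a CancelAfter token works as a simple timeout.
    public async Task<string> WaitNextLineAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            string line = await GetNextLineAsync().ConfigureAwait(false);
            if (line != null)
                return line;

            try
            {
                await Task.Delay(100, cancellationToken).ConfigureAwait(false);
            }
            catch (TaskCanceledException)
            {
                break;
            }
        }
        return null;
    }

    // Reads every line added since the last read and queues it for the callers above.
    private async Task ReadNewLinesAsync()
    {
        long length = new FileInfo(_filePath).Length;
        if (length <= _lastLength)
            return; // nothing new has been written (or the file was truncated)

        using (var stream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
        {
            stream.Seek(_lastLength, SeekOrigin.Begin);

            using (var reader = new StreamReader(stream))
            {
                string line;
                while ((line = await reader.ReadLineAsync().ConfigureAwait(false)) != null)
                    _pendingLines.Enqueue(line);

                // Track progress by file length rather than stream.Position, which
                // StreamReader's read-ahead buffering makes unreliable. Assumes the
                // writer appends whole lines at a time.
                _lastLength = stream.Length;
            }
        }
    }
}

And a hypothetical way to wire it into the question's method, using CancelAfter as a timeout:

// Usage sketch: wait up to 5 seconds for the next CSV line to arrive.
var tailReader = new TailLineReader(filePath);

using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
    cts.CancelAfter(TimeSpan.FromSeconds(5));
    string newLine = await tailReader.WaitNextLineAsync(cts.Token);
    if (newLine != null)
    {
        string[] jointDataArray = newLine.Split(',');
        // ... build and send the telemetry exactly as in the original method ...
    }
}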
(Yes, I could have made a much simpler version. You asked for async though.)

NOTE: this is not thread safe.