Fix restream #101
@@ -73,7 +73,7 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
||||
ParsedName parsed = StreamService.ParseName(channel.Name);
|
||||
items.Add(new ChannelInfo()
|
||||
{
|
||||
Id = channel.StreamId.ToString(CultureInfo.InvariantCulture),
|
||||
Id = StreamService.ToGuid(StreamService.LiveTvPrefix, channel.StreamId, 0, 0).ToString(),
|
||||
Number = channel.Num.ToString(CultureInfo.InvariantCulture),
|
||||
ImageUrl = channel.StreamIcon,
|
||||
Name = parsed.Title,
|
||||
@@ -168,6 +168,13 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
||||
/// <inheritdoc />
|
||||
public async Task<IEnumerable<ProgramInfo>> GetProgramsAsync(string channelId, DateTime startDateUtc, DateTime endDateUtc, CancellationToken cancellationToken)
|
||||
{
|
||||
Guid guid = Guid.Parse(channelId);
|
||||
StreamService.FromGuid(guid, out int prefix, out int streamId, out int _, out int _);
|
||||
if (prefix != StreamService.LiveTvPrefix)
|
||||
{
|
||||
throw new ArgumentException("Unsupported channel");
|
||||
}
|
||||
|
||||
string key = $"xtream-epg-{channelId}";
|
||||
ICollection<ProgramInfo>? items = null;
|
||||
if (memoryCache.TryGetValue(key, out ICollection<ProgramInfo>? o))
|
||||
@@ -180,13 +187,12 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
||||
Plugin plugin = Plugin.Instance;
|
||||
using (XtreamClient client = new XtreamClient())
|
||||
{
|
||||
int streamId = int.Parse(channelId, CultureInfo.InvariantCulture);
|
||||
EpgListings epgs = await client.GetEpgInfoAsync(plugin.Creds, streamId, cancellationToken).ConfigureAwait(false);
|
||||
foreach (EpgInfo epg in epgs.Listings)
|
||||
{
|
||||
items.Add(new ProgramInfo()
|
||||
{
|
||||
Id = $"epg-{epg.Id}",
|
||||
Id = StreamService.ToGuid(StreamService.EpgPrefix, streamId, epg.Id, 0).ToString(),
|
||||
ChannelId = channelId,
|
||||
StartDate = epg.Start,
|
||||
EndDate = epg.End,
|
||||
@@ -213,15 +219,21 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
||||
/// <inheritdoc />
|
||||
public Task<ILiveStream> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, List<ILiveStream> currentLiveStreams, CancellationToken cancellationToken)
|
||||
{
|
||||
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == channelId);
|
||||
Guid guid = Guid.Parse(channelId);
|
||||
StreamService.FromGuid(guid, out int prefix, out int channel, out int _, out int _);
|
||||
if (prefix != StreamService.LiveTvPrefix)
|
||||
{
|
||||
throw new ArgumentException("Unsupported channel");
|
||||
}
|
||||
|
||||
Plugin plugin = Plugin.Instance;
|
||||
MediaSourceInfo mediaSourceInfo = plugin.StreamService.GetMediaSourceInfo(StreamType.Live, channel, restream: true);
|
||||
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == mediaSourceInfo.Id);
|
||||
if (stream != null)
|
||||
{
|
||||
return Task.FromResult(stream);
|
||||
}
|
||||
|
||||
Plugin plugin = Plugin.Instance;
|
||||
int channel = int.Parse(channelId, CultureInfo.InvariantCulture);
|
||||
MediaSourceInfo mediaSourceInfo = plugin.StreamService.GetMediaSourceInfo(StreamType.Live, channel, restream: true);
|
||||
stream = new Restream(appHost, httpClientFactory, logger, mediaSourceInfo);
|
||||
return Task.FromResult(stream);
|
||||
}
|
||||
|
@@ -81,6 +81,16 @@ public class StreamService
|
||||
/// </summary>
|
||||
public const int MediaSourcePrefix = 0x5d774c3d;
|
||||
|
||||
/// <summary>
|
||||
/// The id prefix for Live TV items.
|
||||
/// </summary>
|
||||
public const int LiveTvPrefix = 0x5d774c3e;
|
||||
|
||||
/// <summary>
|
||||
/// The id prefix for TV EPG items.
|
||||
/// </summary>
|
||||
public const int EpgPrefix = 0x5d774c3f;
|
||||
|
||||
private static readonly Regex TagRegex = new Regex(@"\[([^\]]+)\]|\|([^\|]+)\|");
|
||||
|
||||
private readonly ILogger logger;
|
||||
|
@@ -15,6 +15,7 @@
|
||||
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
|
||||
namespace Jellyfin.Xtream.Service;
|
||||
|
||||
@@ -25,9 +26,8 @@ public class WrappedBufferReadStream : Stream
|
||||
{
|
||||
private readonly WrappedBufferStream sourceBuffer;
|
||||
|
||||
private long position;
|
||||
private readonly long initialReadHead;
|
||||
private long readHead;
|
||||
private long totalBytesRead;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="WrappedBufferReadStream"/> class.
|
||||
@@ -37,8 +37,7 @@ public class WrappedBufferReadStream : Stream
|
||||
{
|
||||
this.sourceBuffer = sourceBuffer;
|
||||
this.readHead = sourceBuffer.TotalBytesWritten;
|
||||
this.totalBytesRead = 0;
|
||||
this.position = sourceBuffer.Position;
|
||||
this.initialReadHead = readHead;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -49,10 +48,13 @@ public class WrappedBufferReadStream : Stream
|
||||
/// <summary>
|
||||
/// Gets the number of bytes that have been written to this stream.
|
||||
/// </summary>
|
||||
public long TotalBytesRead { get => totalBytesRead; }
|
||||
public long TotalBytesRead { get => readHead - initialReadHead; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Position { get => position; set => position = value; }
|
||||
public override long Position
|
||||
{
|
||||
get => readHead % sourceBuffer.BufferSize; set { }
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanRead => true;
|
||||
@@ -72,6 +74,14 @@ public class WrappedBufferReadStream : Stream
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
long gap = sourceBuffer.TotalBytesWritten - readHead;
|
||||
|
||||
// We cannot return with 0 bytes read, as that indicates the end of the stream has been reached
|
||||
while (gap == 0)
|
||||
{
|
||||
Thread.Sleep(1);
|
||||
gap = sourceBuffer.TotalBytesWritten - readHead;
|
||||
}
|
||||
|
||||
if (gap > sourceBuffer.BufferSize)
|
||||
{
|
||||
// TODO: design good handling method.
|
||||
@@ -81,30 +91,20 @@ public class WrappedBufferReadStream : Stream
|
||||
throw new IOException("Reader cannot keep up");
|
||||
}
|
||||
|
||||
// The bytes that still need to be copied.
|
||||
long remaining = Math.Min(count, sourceBuffer.TotalBytesWritten - readHead);
|
||||
long remainingOffset = offset;
|
||||
|
||||
// The number of bytes that can be copied.
|
||||
long canCopy = Math.Min(count, gap);
|
||||
long read = 0;
|
||||
|
||||
// Copy inside a loop to simplify wrapping logic.
|
||||
while (remaining > 0)
|
||||
while (read < canCopy)
|
||||
{
|
||||
// The amount of bytes that we can directly write from the current position without wrapping.
|
||||
long readable = Math.Min(remaining, sourceBuffer.BufferSize - Position);
|
||||
long readable = Math.Min(canCopy - read, sourceBuffer.BufferSize - Position);
|
||||
|
||||
// Copy the data.
|
||||
Array.Copy(sourceBuffer.Buffer, Position, buffer, remainingOffset, readable);
|
||||
remaining -= readable;
|
||||
remainingOffset += readable;
|
||||
|
||||
Array.Copy(sourceBuffer.Buffer, Position, buffer, offset + read, readable);
|
||||
read += readable;
|
||||
Position += readable;
|
||||
readHead += readable;
|
||||
totalBytesRead += readable;
|
||||
|
||||
// We might have to loop the position.
|
||||
Position %= sourceBuffer.BufferSize;
|
||||
}
|
||||
|
||||
return (int)read;
|
||||
|
@@ -25,7 +25,6 @@ public class WrappedBufferStream : Stream
|
||||
{
|
||||
private readonly byte[] sourceBuffer;
|
||||
|
||||
private long position;
|
||||
private long totalBytesWritten;
|
||||
|
||||
/// <summary>
|
||||
@@ -56,7 +55,10 @@ public class WrappedBufferStream : Stream
|
||||
public long TotalBytesWritten { get => totalBytesWritten; }
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Position { get => position; set => position = value; }
|
||||
public override long Position
|
||||
{
|
||||
get => totalBytesWritten % BufferSize; set { }
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanRead => false;
|
||||
@@ -81,25 +83,18 @@ public class WrappedBufferStream : Stream
|
||||
/// <inheritdoc />
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
// The bytes that still need to be copied.
|
||||
long remaining = count;
|
||||
long remainingOffset = offset;
|
||||
long written = 0;
|
||||
|
||||
// Copy inside a loop to simplify wrapping logic.
|
||||
while (remaining > 0)
|
||||
while (written < count)
|
||||
{
|
||||
// The amount of bytes that we can directly write from the current position without wrapping.
|
||||
long writable = Math.Min(remaining, BufferSize - Position);
|
||||
long writable = Math.Min(count - written, BufferSize - Position);
|
||||
|
||||
// Copy the data.
|
||||
Array.Copy(buffer, remainingOffset, sourceBuffer, Position, writable);
|
||||
remaining -= writable;
|
||||
remainingOffset += writable;
|
||||
Position += writable;
|
||||
Array.Copy(buffer, offset + written, sourceBuffer, Position, writable);
|
||||
written += writable;
|
||||
totalBytesWritten += writable;
|
||||
|
||||
// We might have to wrap the position.
|
||||
Position %= BufferSize;
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user