Fix restream #101
@@ -73,7 +73,7 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
|||||||
ParsedName parsed = StreamService.ParseName(channel.Name);
|
ParsedName parsed = StreamService.ParseName(channel.Name);
|
||||||
items.Add(new ChannelInfo()
|
items.Add(new ChannelInfo()
|
||||||
{
|
{
|
||||||
Id = channel.StreamId.ToString(CultureInfo.InvariantCulture),
|
Id = StreamService.ToGuid(StreamService.LiveTvPrefix, channel.StreamId, 0, 0).ToString(),
|
||||||
Number = channel.Num.ToString(CultureInfo.InvariantCulture),
|
Number = channel.Num.ToString(CultureInfo.InvariantCulture),
|
||||||
ImageUrl = channel.StreamIcon,
|
ImageUrl = channel.StreamIcon,
|
||||||
Name = parsed.Title,
|
Name = parsed.Title,
|
||||||
@@ -168,6 +168,13 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public async Task<IEnumerable<ProgramInfo>> GetProgramsAsync(string channelId, DateTime startDateUtc, DateTime endDateUtc, CancellationToken cancellationToken)
|
public async Task<IEnumerable<ProgramInfo>> GetProgramsAsync(string channelId, DateTime startDateUtc, DateTime endDateUtc, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
|
Guid guid = Guid.Parse(channelId);
|
||||||
|
StreamService.FromGuid(guid, out int prefix, out int streamId, out int _, out int _);
|
||||||
|
if (prefix != StreamService.LiveTvPrefix)
|
||||||
|
{
|
||||||
|
throw new ArgumentException("Unsupported channel");
|
||||||
|
}
|
||||||
|
|
||||||
string key = $"xtream-epg-{channelId}";
|
string key = $"xtream-epg-{channelId}";
|
||||||
ICollection<ProgramInfo>? items = null;
|
ICollection<ProgramInfo>? items = null;
|
||||||
if (memoryCache.TryGetValue(key, out ICollection<ProgramInfo>? o))
|
if (memoryCache.TryGetValue(key, out ICollection<ProgramInfo>? o))
|
||||||
@@ -180,13 +187,12 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
|||||||
Plugin plugin = Plugin.Instance;
|
Plugin plugin = Plugin.Instance;
|
||||||
using (XtreamClient client = new XtreamClient())
|
using (XtreamClient client = new XtreamClient())
|
||||||
{
|
{
|
||||||
int streamId = int.Parse(channelId, CultureInfo.InvariantCulture);
|
|
||||||
EpgListings epgs = await client.GetEpgInfoAsync(plugin.Creds, streamId, cancellationToken).ConfigureAwait(false);
|
EpgListings epgs = await client.GetEpgInfoAsync(plugin.Creds, streamId, cancellationToken).ConfigureAwait(false);
|
||||||
foreach (EpgInfo epg in epgs.Listings)
|
foreach (EpgInfo epg in epgs.Listings)
|
||||||
{
|
{
|
||||||
items.Add(new ProgramInfo()
|
items.Add(new ProgramInfo()
|
||||||
{
|
{
|
||||||
Id = $"epg-{epg.Id}",
|
Id = StreamService.ToGuid(StreamService.EpgPrefix, streamId, epg.Id, 0).ToString(),
|
||||||
ChannelId = channelId,
|
ChannelId = channelId,
|
||||||
StartDate = epg.Start,
|
StartDate = epg.Start,
|
||||||
EndDate = epg.End,
|
EndDate = epg.End,
|
||||||
@@ -213,15 +219,21 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public Task<ILiveStream> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, List<ILiveStream> currentLiveStreams, CancellationToken cancellationToken)
|
public Task<ILiveStream> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, List<ILiveStream> currentLiveStreams, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == channelId);
|
Guid guid = Guid.Parse(channelId);
|
||||||
|
StreamService.FromGuid(guid, out int prefix, out int channel, out int _, out int _);
|
||||||
|
if (prefix != StreamService.LiveTvPrefix)
|
||||||
|
{
|
||||||
|
throw new ArgumentException("Unsupported channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
Plugin plugin = Plugin.Instance;
|
||||||
|
MediaSourceInfo mediaSourceInfo = plugin.StreamService.GetMediaSourceInfo(StreamType.Live, channel, restream: true);
|
||||||
|
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == mediaSourceInfo.Id);
|
||||||
if (stream != null)
|
if (stream != null)
|
||||||
{
|
{
|
||||||
return Task.FromResult(stream);
|
return Task.FromResult(stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
Plugin plugin = Plugin.Instance;
|
|
||||||
int channel = int.Parse(channelId, CultureInfo.InvariantCulture);
|
|
||||||
MediaSourceInfo mediaSourceInfo = plugin.StreamService.GetMediaSourceInfo(StreamType.Live, channel, restream: true);
|
|
||||||
stream = new Restream(appHost, httpClientFactory, logger, mediaSourceInfo);
|
stream = new Restream(appHost, httpClientFactory, logger, mediaSourceInfo);
|
||||||
return Task.FromResult(stream);
|
return Task.FromResult(stream);
|
||||||
}
|
}
|
||||||
|
@@ -81,6 +81,16 @@ public class StreamService
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public const int MediaSourcePrefix = 0x5d774c3d;
|
public const int MediaSourcePrefix = 0x5d774c3d;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The id prefix for Live TV items.
|
||||||
|
/// </summary>
|
||||||
|
public const int LiveTvPrefix = 0x5d774c3e;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The id prefix for TV EPG items.
|
||||||
|
/// </summary>
|
||||||
|
public const int EpgPrefix = 0x5d774c3f;
|
||||||
|
|
||||||
private static readonly Regex TagRegex = new Regex(@"\[([^\]]+)\]|\|([^\|]+)\|");
|
private static readonly Regex TagRegex = new Regex(@"\[([^\]]+)\]|\|([^\|]+)\|");
|
||||||
|
|
||||||
private readonly ILogger logger;
|
private readonly ILogger logger;
|
||||||
|
@@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
using System;
|
using System;
|
||||||
using System.IO;
|
using System.IO;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
namespace Jellyfin.Xtream.Service;
|
namespace Jellyfin.Xtream.Service;
|
||||||
|
|
||||||
@@ -25,9 +26,8 @@ public class WrappedBufferReadStream : Stream
|
|||||||
{
|
{
|
||||||
private readonly WrappedBufferStream sourceBuffer;
|
private readonly WrappedBufferStream sourceBuffer;
|
||||||
|
|
||||||
private long position;
|
private readonly long initialReadHead;
|
||||||
private long readHead;
|
private long readHead;
|
||||||
private long totalBytesRead;
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Initializes a new instance of the <see cref="WrappedBufferReadStream"/> class.
|
/// Initializes a new instance of the <see cref="WrappedBufferReadStream"/> class.
|
||||||
@@ -37,8 +37,7 @@ public class WrappedBufferReadStream : Stream
|
|||||||
{
|
{
|
||||||
this.sourceBuffer = sourceBuffer;
|
this.sourceBuffer = sourceBuffer;
|
||||||
this.readHead = sourceBuffer.TotalBytesWritten;
|
this.readHead = sourceBuffer.TotalBytesWritten;
|
||||||
this.totalBytesRead = 0;
|
this.initialReadHead = readHead;
|
||||||
this.position = sourceBuffer.Position;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -49,10 +48,13 @@ public class WrappedBufferReadStream : Stream
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets the number of bytes that have been written to this stream.
|
/// Gets the number of bytes that have been written to this stream.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public long TotalBytesRead { get => totalBytesRead; }
|
public long TotalBytesRead { get => readHead - initialReadHead; }
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override long Position { get => position; set => position = value; }
|
public override long Position
|
||||||
|
{
|
||||||
|
get => readHead % sourceBuffer.BufferSize; set { }
|
||||||
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override bool CanRead => true;
|
public override bool CanRead => true;
|
||||||
@@ -72,6 +74,14 @@ public class WrappedBufferReadStream : Stream
|
|||||||
public override int Read(byte[] buffer, int offset, int count)
|
public override int Read(byte[] buffer, int offset, int count)
|
||||||
{
|
{
|
||||||
long gap = sourceBuffer.TotalBytesWritten - readHead;
|
long gap = sourceBuffer.TotalBytesWritten - readHead;
|
||||||
|
|
||||||
|
// We cannot return with 0 bytes read, as that indicates the end of the stream has been reached
|
||||||
|
while (gap == 0)
|
||||||
|
{
|
||||||
|
Thread.Sleep(1);
|
||||||
|
gap = sourceBuffer.TotalBytesWritten - readHead;
|
||||||
|
}
|
||||||
|
|
||||||
if (gap > sourceBuffer.BufferSize)
|
if (gap > sourceBuffer.BufferSize)
|
||||||
{
|
{
|
||||||
// TODO: design good handling method.
|
// TODO: design good handling method.
|
||||||
@@ -81,30 +91,20 @@ public class WrappedBufferReadStream : Stream
|
|||||||
throw new IOException("Reader cannot keep up");
|
throw new IOException("Reader cannot keep up");
|
||||||
}
|
}
|
||||||
|
|
||||||
// The bytes that still need to be copied.
|
// The number of bytes that can be copied.
|
||||||
long remaining = Math.Min(count, sourceBuffer.TotalBytesWritten - readHead);
|
long canCopy = Math.Min(count, gap);
|
||||||
long remainingOffset = offset;
|
|
||||||
|
|
||||||
long read = 0;
|
long read = 0;
|
||||||
|
|
||||||
// Copy inside a loop to simplify wrapping logic.
|
// Copy inside a loop to simplify wrapping logic.
|
||||||
while (remaining > 0)
|
while (read < canCopy)
|
||||||
{
|
{
|
||||||
// The amount of bytes that we can directly write from the current position without wrapping.
|
// The amount of bytes that we can directly write from the current position without wrapping.
|
||||||
long readable = Math.Min(remaining, sourceBuffer.BufferSize - Position);
|
long readable = Math.Min(canCopy - read, sourceBuffer.BufferSize - Position);
|
||||||
|
|
||||||
// Copy the data.
|
// Copy the data.
|
||||||
Array.Copy(sourceBuffer.Buffer, Position, buffer, remainingOffset, readable);
|
Array.Copy(sourceBuffer.Buffer, Position, buffer, offset + read, readable);
|
||||||
remaining -= readable;
|
|
||||||
remainingOffset += readable;
|
|
||||||
|
|
||||||
read += readable;
|
read += readable;
|
||||||
Position += readable;
|
|
||||||
readHead += readable;
|
readHead += readable;
|
||||||
totalBytesRead += readable;
|
|
||||||
|
|
||||||
// We might have to loop the position.
|
|
||||||
Position %= sourceBuffer.BufferSize;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return (int)read;
|
return (int)read;
|
||||||
|
@@ -25,7 +25,6 @@ public class WrappedBufferStream : Stream
|
|||||||
{
|
{
|
||||||
private readonly byte[] sourceBuffer;
|
private readonly byte[] sourceBuffer;
|
||||||
|
|
||||||
private long position;
|
|
||||||
private long totalBytesWritten;
|
private long totalBytesWritten;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -56,7 +55,10 @@ public class WrappedBufferStream : Stream
|
|||||||
public long TotalBytesWritten { get => totalBytesWritten; }
|
public long TotalBytesWritten { get => totalBytesWritten; }
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override long Position { get => position; set => position = value; }
|
public override long Position
|
||||||
|
{
|
||||||
|
get => totalBytesWritten % BufferSize; set { }
|
||||||
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override bool CanRead => false;
|
public override bool CanRead => false;
|
||||||
@@ -81,25 +83,18 @@ public class WrappedBufferStream : Stream
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
public override void Write(byte[] buffer, int offset, int count)
|
public override void Write(byte[] buffer, int offset, int count)
|
||||||
{
|
{
|
||||||
// The bytes that still need to be copied.
|
long written = 0;
|
||||||
long remaining = count;
|
|
||||||
long remainingOffset = offset;
|
|
||||||
|
|
||||||
// Copy inside a loop to simplify wrapping logic.
|
// Copy inside a loop to simplify wrapping logic.
|
||||||
while (remaining > 0)
|
while (written < count)
|
||||||
{
|
{
|
||||||
// The amount of bytes that we can directly write from the current position without wrapping.
|
// The amount of bytes that we can directly write from the current position without wrapping.
|
||||||
long writable = Math.Min(remaining, BufferSize - Position);
|
long writable = Math.Min(count - written, BufferSize - Position);
|
||||||
|
|
||||||
// Copy the data.
|
// Copy the data.
|
||||||
Array.Copy(buffer, remainingOffset, sourceBuffer, Position, writable);
|
Array.Copy(buffer, offset + written, sourceBuffer, Position, writable);
|
||||||
remaining -= writable;
|
written += writable;
|
||||||
remainingOffset += writable;
|
|
||||||
Position += writable;
|
|
||||||
totalBytesWritten += writable;
|
totalBytesWritten += writable;
|
||||||
|
|
||||||
// We might have to wrap the position.
|
|
||||||
Position %= BufferSize;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user