Fix restream #101

Merged
Kevinjil merged 5 commits from fix/restream into master 2024-05-16 19:54:49 +00:00
4 changed files with 59 additions and 42 deletions

View File

@@ -73,7 +73,7 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
ParsedName parsed = StreamService.ParseName(channel.Name); ParsedName parsed = StreamService.ParseName(channel.Name);
items.Add(new ChannelInfo() items.Add(new ChannelInfo()
{ {
Id = channel.StreamId.ToString(CultureInfo.InvariantCulture), Id = StreamService.ToGuid(StreamService.LiveTvPrefix, channel.StreamId, 0, 0).ToString(),
Number = channel.Num.ToString(CultureInfo.InvariantCulture), Number = channel.Num.ToString(CultureInfo.InvariantCulture),
ImageUrl = channel.StreamIcon, ImageUrl = channel.StreamIcon,
Name = parsed.Title, Name = parsed.Title,
@@ -168,6 +168,13 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
/// <inheritdoc /> /// <inheritdoc />
public async Task<IEnumerable<ProgramInfo>> GetProgramsAsync(string channelId, DateTime startDateUtc, DateTime endDateUtc, CancellationToken cancellationToken) public async Task<IEnumerable<ProgramInfo>> GetProgramsAsync(string channelId, DateTime startDateUtc, DateTime endDateUtc, CancellationToken cancellationToken)
{ {
Guid guid = Guid.Parse(channelId);
StreamService.FromGuid(guid, out int prefix, out int streamId, out int _, out int _);
if (prefix != StreamService.LiveTvPrefix)
{
throw new ArgumentException("Unsupported channel");
}
string key = $"xtream-epg-{channelId}"; string key = $"xtream-epg-{channelId}";
ICollection<ProgramInfo>? items = null; ICollection<ProgramInfo>? items = null;
if (memoryCache.TryGetValue(key, out ICollection<ProgramInfo>? o)) if (memoryCache.TryGetValue(key, out ICollection<ProgramInfo>? o))
@@ -180,13 +187,12 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
Plugin plugin = Plugin.Instance; Plugin plugin = Plugin.Instance;
using (XtreamClient client = new XtreamClient()) using (XtreamClient client = new XtreamClient())
{ {
int streamId = int.Parse(channelId, CultureInfo.InvariantCulture);
EpgListings epgs = await client.GetEpgInfoAsync(plugin.Creds, streamId, cancellationToken).ConfigureAwait(false); EpgListings epgs = await client.GetEpgInfoAsync(plugin.Creds, streamId, cancellationToken).ConfigureAwait(false);
foreach (EpgInfo epg in epgs.Listings) foreach (EpgInfo epg in epgs.Listings)
{ {
items.Add(new ProgramInfo() items.Add(new ProgramInfo()
{ {
Id = $"epg-{epg.Id}", Id = StreamService.ToGuid(StreamService.EpgPrefix, streamId, epg.Id, 0).ToString(),
ChannelId = channelId, ChannelId = channelId,
StartDate = epg.Start, StartDate = epg.Start,
EndDate = epg.End, EndDate = epg.End,
@@ -213,15 +219,21 @@ public class LiveTvService : ILiveTvService, ISupportsDirectStreamProvider
/// <inheritdoc /> /// <inheritdoc />
public Task<ILiveStream> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, List<ILiveStream> currentLiveStreams, CancellationToken cancellationToken) public Task<ILiveStream> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, List<ILiveStream> currentLiveStreams, CancellationToken cancellationToken)
{ {
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == channelId); Guid guid = Guid.Parse(channelId);
StreamService.FromGuid(guid, out int prefix, out int channel, out int _, out int _);
if (prefix != StreamService.LiveTvPrefix)
{
throw new ArgumentException("Unsupported channel");
}
Plugin plugin = Plugin.Instance;
MediaSourceInfo mediaSourceInfo = plugin.StreamService.GetMediaSourceInfo(StreamType.Live, channel, restream: true);
ILiveStream? stream = currentLiveStreams.Find(stream => stream.TunerHostId == Restream.TunerHost && stream.MediaSource.Id == mediaSourceInfo.Id);
if (stream != null) if (stream != null)
{ {
return Task.FromResult(stream); return Task.FromResult(stream);
} }
Plugin plugin = Plugin.Instance;
int channel = int.Parse(channelId, CultureInfo.InvariantCulture);
MediaSourceInfo mediaSourceInfo = plugin.StreamService.GetMediaSourceInfo(StreamType.Live, channel, restream: true);
stream = new Restream(appHost, httpClientFactory, logger, mediaSourceInfo); stream = new Restream(appHost, httpClientFactory, logger, mediaSourceInfo);
return Task.FromResult(stream); return Task.FromResult(stream);
} }

View File

@@ -81,6 +81,16 @@ public class StreamService
/// </summary> /// </summary>
public const int MediaSourcePrefix = 0x5d774c3d; public const int MediaSourcePrefix = 0x5d774c3d;
/// <summary>
/// The id prefix for Live TV items.
/// </summary>
public const int LiveTvPrefix = 0x5d774c3e;
/// <summary>
/// The id prefix for TV EPG items.
/// </summary>
public const int EpgPrefix = 0x5d774c3f;
private static readonly Regex TagRegex = new Regex(@"\[([^\]]+)\]|\|([^\|]+)\|"); private static readonly Regex TagRegex = new Regex(@"\[([^\]]+)\]|\|([^\|]+)\|");
private readonly ILogger logger; private readonly ILogger logger;

View File

@@ -15,6 +15,7 @@
using System; using System;
using System.IO; using System.IO;
using System.Threading;
namespace Jellyfin.Xtream.Service; namespace Jellyfin.Xtream.Service;
@@ -25,9 +26,8 @@ public class WrappedBufferReadStream : Stream
{ {
private readonly WrappedBufferStream sourceBuffer; private readonly WrappedBufferStream sourceBuffer;
private long position; private readonly long initialReadHead;
private long readHead; private long readHead;
private long totalBytesRead;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="WrappedBufferReadStream"/> class. /// Initializes a new instance of the <see cref="WrappedBufferReadStream"/> class.
@@ -37,8 +37,7 @@ public class WrappedBufferReadStream : Stream
{ {
this.sourceBuffer = sourceBuffer; this.sourceBuffer = sourceBuffer;
this.readHead = sourceBuffer.TotalBytesWritten; this.readHead = sourceBuffer.TotalBytesWritten;
this.totalBytesRead = 0; this.initialReadHead = readHead;
this.position = sourceBuffer.Position;
} }
/// <summary> /// <summary>
@@ -49,10 +48,13 @@ public class WrappedBufferReadStream : Stream
/// <summary> /// <summary>
/// Gets the number of bytes that have been written to this stream. /// Gets the number of bytes that have been written to this stream.
/// </summary> /// </summary>
public long TotalBytesRead { get => totalBytesRead; } public long TotalBytesRead { get => readHead - initialReadHead; }
/// <inheritdoc /> /// <inheritdoc />
public override long Position { get => position; set => position = value; } public override long Position
{
get => readHead % sourceBuffer.BufferSize; set { }
}
/// <inheritdoc /> /// <inheritdoc />
public override bool CanRead => true; public override bool CanRead => true;
@@ -72,6 +74,14 @@ public class WrappedBufferReadStream : Stream
public override int Read(byte[] buffer, int offset, int count) public override int Read(byte[] buffer, int offset, int count)
{ {
long gap = sourceBuffer.TotalBytesWritten - readHead; long gap = sourceBuffer.TotalBytesWritten - readHead;
// We cannot return with 0 bytes read, as that indicates the end of the stream has been reached
while (gap == 0)
{
Thread.Sleep(1);
gap = sourceBuffer.TotalBytesWritten - readHead;
}
if (gap > sourceBuffer.BufferSize) if (gap > sourceBuffer.BufferSize)
{ {
// TODO: design good handling method. // TODO: design good handling method.
@@ -81,30 +91,20 @@ public class WrappedBufferReadStream : Stream
throw new IOException("Reader cannot keep up"); throw new IOException("Reader cannot keep up");
} }
// The bytes that still need to be copied. // The number of bytes that can be copied.
long remaining = Math.Min(count, sourceBuffer.TotalBytesWritten - readHead); long canCopy = Math.Min(count, gap);
long remainingOffset = offset;
long read = 0; long read = 0;
// Copy inside a loop to simplify wrapping logic. // Copy inside a loop to simplify wrapping logic.
while (remaining > 0) while (read < canCopy)
{ {
// The amount of bytes that we can directly write from the current position without wrapping. // The amount of bytes that we can directly write from the current position without wrapping.
long readable = Math.Min(remaining, sourceBuffer.BufferSize - Position); long readable = Math.Min(canCopy - read, sourceBuffer.BufferSize - Position);
// Copy the data. // Copy the data.
Array.Copy(sourceBuffer.Buffer, Position, buffer, remainingOffset, readable); Array.Copy(sourceBuffer.Buffer, Position, buffer, offset + read, readable);
remaining -= readable;
remainingOffset += readable;
read += readable; read += readable;
Position += readable;
readHead += readable; readHead += readable;
totalBytesRead += readable;
// We might have to loop the position.
Position %= sourceBuffer.BufferSize;
} }
return (int)read; return (int)read;

View File

@@ -25,7 +25,6 @@ public class WrappedBufferStream : Stream
{ {
private readonly byte[] sourceBuffer; private readonly byte[] sourceBuffer;
private long position;
private long totalBytesWritten; private long totalBytesWritten;
/// <summary> /// <summary>
@@ -56,7 +55,10 @@ public class WrappedBufferStream : Stream
public long TotalBytesWritten { get => totalBytesWritten; } public long TotalBytesWritten { get => totalBytesWritten; }
/// <inheritdoc /> /// <inheritdoc />
public override long Position { get => position; set => position = value; } public override long Position
{
get => totalBytesWritten % BufferSize; set { }
}
/// <inheritdoc /> /// <inheritdoc />
public override bool CanRead => false; public override bool CanRead => false;
@@ -81,25 +83,18 @@ public class WrappedBufferStream : Stream
/// <inheritdoc /> /// <inheritdoc />
public override void Write(byte[] buffer, int offset, int count) public override void Write(byte[] buffer, int offset, int count)
{ {
// The bytes that still need to be copied. long written = 0;
long remaining = count;
long remainingOffset = offset;
// Copy inside a loop to simplify wrapping logic. // Copy inside a loop to simplify wrapping logic.
while (remaining > 0) while (written < count)
{ {
// The amount of bytes that we can directly write from the current position without wrapping. // The amount of bytes that we can directly write from the current position without wrapping.
long writable = Math.Min(remaining, BufferSize - Position); long writable = Math.Min(count - written, BufferSize - Position);
// Copy the data. // Copy the data.
Array.Copy(buffer, remainingOffset, sourceBuffer, Position, writable); Array.Copy(buffer, offset + written, sourceBuffer, Position, writable);
remaining -= writable; written += writable;
remainingOffset += writable;
Position += writable;
totalBytesWritten += writable; totalBytesWritten += writable;
// We might have to wrap the position.
Position %= BufferSize;
} }
} }