ReactiveUI Extensions

Searched in ReactiveUI the option to Select(IObservableCache.WatchValue) an Observable, then Switch that, trying to get a default value if initial value isn't found in the IObservableCache, while not producing duplicate entries using StartWith but keeping the semantics of DefaultIfEmpty? I got you covered with this little piece of beautiful engineering.

//
// Copyright(c) 2022 Jöran "AliveDevil" Malek
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//

using System.Reactive.Disposables;

namespace System.Reactive.Linq;

public static class Observables
{
    /// <summary>
    /// Switch and DefaultIfEmpty, on steroids
    /// </summary>
    public static IObservable<T> SwitchWithDefault<T>(this IObservable<IObservable<T>> observable, T value = default)
        => new SwitchWithDefaultObservable<T>(observable, value);

    private class SwitchWithDefaultObservable<TSource> : ObservableBase<TSource>
    {
        private readonly TSource defaultValue;
        private readonly IObservable<IObservable<TSource>> sources;

        public SwitchWithDefaultObservable(IObservable<IObservable<TSource>> sources, TSource defaultValue)
        {
            this.sources = sources;
            this.defaultValue = defaultValue;
        }

        protected override IDisposable SubscribeCore(IObserver<TSource> observer)
        {
            var _ = new _(defaultValue, observer);
            _.Run(sources);
            return _;
        }

        private class _ : ObserverBase<IObservable<TSource>>
        {
            private readonly TSource defaultValue;
            private readonly SerialDisposable innerSerialDisposable = new();
            private readonly IObserver<TSource> observer;
            private readonly SingleAssignmentDisposable upstream = new();

            public _(TSource defaultValue, IObserver<TSource> observer)
            {
                this.defaultValue = defaultValue;
                this.observer = observer;
            }

            public void ForwardOnCompleted()
            {
                observer.OnCompleted();
                Dispose();
            }

            public void ForwardOnError(Exception error)
            {
                observer.OnError(error);
                Dispose();
            }

            public void ForwardOnNext(TSource value) => observer.OnNext(value);

            public void Run(IObservable<IObservable<TSource>> sources)
            {
                upstream.Disposable = sources.Subscribe(this);
            }

            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    innerSerialDisposable?.Dispose();
                }

                base.Dispose(disposing);
            }

            protected override void OnCompletedCore()
            {
                observer.OnCompleted();
            }

            protected override void OnErrorCore(Exception error) => ForwardOnError(error);

            protected override void OnNextCore(IObservable<TSource> value)
            {
                var innerObserver = new InnerObserver(this, defaultValue);
                innerSerialDisposable.Disposable = innerObserver;
                innerObserver.Subscribe(value);
            }

            private class InnerObserver : ObserverBase<TSource>
            {
                private readonly TSource defaultValue;
                private readonly _ parent;
                private readonly SingleAssignmentDisposable upstream = new();

                public InnerObserver(_ parent, TSource defaultValue)
                {
                    this.parent = parent;
                    this.defaultValue = defaultValue;

                    Found = false;
                }

                public bool Found { get; set; }

                public void Subscribe(IObservable<TSource> upstream)
                {
                    this.upstream.Disposable = upstream.SubscribeSafe(this);
                    if (!Found)
                    {
                        OnNext(defaultValue);
                    }
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        upstream.Dispose();
                    }

                    base.Dispose(disposing);
                }

                protected override void OnCompletedCore()
                {
                    Dispose();
                    if (!Found)
                    {
                        OnNextCore(defaultValue);
                        parent.ForwardOnCompleted();
                    }
                }

                protected override void OnErrorCore(Exception error)
                {
                    Dispose();
                    if (!Found)
                    {
                        OnNextCore(defaultValue);
                        parent.ForwardOnError(error);
                    }
                }

                protected override void OnNextCore(TSource value)
                {
                    Found = true;
                    parent.ForwardOnNext(value);
                }
            }
        }
    }
}

Previous Post