Ana içeriğe geç
Version: 1.0.1

Data Streaming

Ignite, bir Ignite cluster’ına büyük miktarlarda sürekli veri akışı enjekte etmek için kullanılabilen bir Data Streaming API sağlar. Data Streaming API, ölçeklenebilir(scalable) ve hataya dayanıklı(fault tolerant) olacak şekilde tasarlanmıştır ve Ignite'a aktarılan veriler için at least once semantiğini destekler, yani her giriş en az bir kez işlenir.

Veriler, cachele ilişkili bir data streamer aracılığıyla bir cache’e aktarılır. Data streamer’lar, verileri otomatik olarak buffer’a alır ve daha iyi performans için batchler halinde gruplandırır ve birden çok node’a paralel olarak gönderir.

Data Streaming API'si aşağıdaki özellikleri sağlar:

  • Bir data streamera eklenen veriler otomatik olarak partitionlanır ve nodelar arasında dağıtılır.
  • Verileri colocated bir şekilde aynı anda işleyebilirsiniz.
  • Clientlar, akış halindeyken veriler üzerinde eşzamanlı SQL sorguları çalıştırabilir.

data_streaming.png

Data Streamers

Bir data streamer, belirli bir cachele ilişkilendirilir ve cache’e veri akışı için bir interface sağlar.

Tipik bir senaryoda, bir data streamer nesnesi oluşturulur ve cache’e veri akışı yapmak için onun methodlarından biri kullanılır ve Ignite, gereksiz veri hareketini önlemek için veri girişlerini, partitioning kurallarına göre gruplandırarak, data partitioning ve colocation ile ilgilenir.

Belirli bir cache için data streamer aşağıdaki gibi kullanılabilir;

  • ⌨️ .NET Sample
    using (var stmr = ignite.GetDataStreamer<int, string>("myCache"))
    {
    for (var i = 0; i < 1000; i++)
    stmr.AddData(i, i.ToString());
    }

Processing Data

Yeni veri eklemeden önce özel bir iş mantığı yürütmek gereken durumlarda bir stream reciever kullanılabilir. Bir stream reciever, verileri cache’e depolanmadan önce colocated olacak şekilde işlemek için kullanılır. Bir stream recieverda uygulanan mantık, verilerin depolanacağı node’da yürütülür.

  • ⌨️ .NET Sample
    private class MyStreamReceiver : IStreamReceiver<int, string>
    {
    public void Receive(ICache<int, string> cache, ICollection<ICacheEntry<int, string>> entries)
    {
    foreach (var entry in entries)
    {
    // do something with the entry

    cache.Put(entry.Key, entry.Value);
    }
    }
    }

    public static void StreamReceiverDemo()
    {
    var ignite = Ignition.Start();

    using (var stmr = ignite.GetDataStreamer<int, string>("myCache"))
    {
    stmr.AllowOverwrite = true;
    stmr.Receiver = new MyStreamReceiver();
    }
    }

Bir stream reciever verileri cache’e otomatik olarak koymaz. Put(…) methodlarından birinin kullanılması gerekir.

Stream Transformer

Bir stream transformer, streamdeki verileri güncelleyen bir stream reciever’ın kullanışlı bir implementasyonudur. Stream transformerlar, colocation özelliğinden yararlanır ve depolanacağı node’daki verileri günceller.

Aşağıdaki örnekte, text streaminde bulunan her bir farklı sözcük için bir sayacı artırmak üzere bir stream transformer kullanılmakta;

  • ⌨️ .NET Sample
    class MyEntryProcessor : ICacheEntryProcessor<string, long, object, object>
    {
    public object Process(IMutableCacheEntry<string, long> e, object arg)
    {
    //get current count
    var val = e.Value;

    //increment count by 1
    e.Value = val == 0 ? 1L : val + 1;

    return null;
    }
    }

    public static void StreamTransformerDemo()
    {
    var ignite = Ignition.Start(new IgniteConfiguration
    {
    DiscoverySpi = new TcpDiscoverySpi
    {
    LocalPort = 48500,
    LocalPortRange = 20,
    IpFinder = new TcpDiscoveryStaticIpFinder
    {
    Endpoints = new[]
    {
    "127.0.0.1:48500..48520"
    }
    }
    }
    });
    var cfg = new CacheConfiguration("wordCountCache");
    var stmCache = ignite.GetOrCreateCache<string, long>(cfg);

    using (var stmr = ignite.GetDataStreamer<string, long>(stmCache.Name))
    {
    //Allow data updates
    stmr.AllowOverwrite = true;

    //Configure data transformation to count instances of the same word
    stmr.Receiver = new StreamTransformer<string, long, object, object>(new MyEntryProcessor());

    //stream words into the streamer cache
    foreach (var word in GetWords())
    {
    stmr.AddData(word, 1L);
    }
    }

    Console.WriteLine(stmCache.Get("a"));
    Console.WriteLine(stmCache.Get("b"));
    }

    static IEnumerable<string> GetWords()
    {
    //populate words list somehow
    return Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
    }

Stream Visitor

Stream visitor, streamdeki her key/value pairini ziyaret eden stream reciever’ın başka bir implementasyonudur. Visitor cache’i güncellemez. Bir pair’in cache’te saklanması gerekiyorsa, put(…) methodlarından biri kullanılmalıdır.

Detaylı bilgi için…

Configuring Data Streamer Thread Pool Size

Data streamer thread pool, data streamerlardan gelen mesajları işlemek için ayrılmıştır.

Varsayılan pool boyutu max(8, toplam çekirdek sayısı). Poolboyutunu değiştirmek için IgniteConfiguration.setDataStreamerThreadPoolSize(…) kullanılır.

  • ⌨️ XML Config