Data Streaming
Ignite, bir Ignite cluster’ına büyük miktarlarda sürekli veri akışı enjekte etmek için kullanılabilen bir Data Streaming API sağlar. Data Streaming API, ölçeklenebilir(scalable) ve hataya dayanıklı(fault tolerant) olacak şekilde tasarlanmıştır ve Ignite'a aktarılan veriler için at least once
semantiğini destekler, yani her giriş en az bir kez işlenir.
Veriler, cachele ilişkili bir data streamer aracılığıyla bir cache’e aktarılır. Data streamer’lar, verileri otomatik olarak buffer’a alır ve daha iyi performans için batchler halinde gruplandırır ve birden çok node’a paralel olarak gönderir.
Data Streaming API'si aşağıdaki özellikleri sağlar:
- Bir data streamera eklenen veriler otomatik olarak partitionlanır ve nodelar arasında dağıtılır.
- Verileri colocated bir şekilde aynı anda işleyebilirsiniz.
- Clientlar, akış halindeyken veriler üzerinde eşzamanlı SQL sorguları çalıştırabilir.
Data Streamers
Bir data streamer, belirli bir cachele ilişkilendirilir ve cache’e veri akışı için bir interface sağlar.
Tipik bir senaryoda, bir data streamer nesnesi oluşturulur ve cache’e veri akışı yapmak için onun methodlarından biri kullanılır ve Ignite, gereksiz veri hareketini önlemek için veri girişlerini, partitioning kurallarına göre gruplandırarak, data partitioning ve colocation ile ilgilenir.
Belirli bir cache için data streamer aşağıdaki gibi kullanılabilir;
- ⌨️ .NET Sample
using (var stmr = ignite.GetDataStreamer<int, string>("myCache"))
{
for (var i = 0; i < 1000; i++)
stmr.AddData(i, i.ToString());
}
Processing Data
Yeni veri eklemeden önce özel bir iş mantığı yürütmek gereken durumlarda bir stream reciever kullanılabilir. Bir stream reciever, verileri cache’e depolanmadan önce colocated olacak şekilde işlemek için kullanılır. Bir stream recieverda uygulanan mantık, verilerin depolanacağı node’da yürütülür.
- ⌨️ .NET Sample
private class MyStreamReceiver : IStreamReceiver<int, string>
{
public void Receive(ICache<int, string> cache, ICollection<ICacheEntry<int, string>> entries)
{
foreach (var entry in entries)
{
// do something with the entry
cache.Put(entry.Key, entry.Value);
}
}
}
public static void StreamReceiverDemo()
{
var ignite = Ignition.Start();
using (var stmr = ignite.GetDataStreamer<int, string>("myCache"))
{
stmr.AllowOverwrite = true;
stmr.Receiver = new MyStreamReceiver();
}
}
Bir stream reciever verileri cache’e otomatik olarak koymaz.
Put(…)
methodlarından birinin kullanılması gerekir.
Stream Transformer
Bir stream transformer, streamdeki verileri güncelleyen bir stream reciever’ın kullanışlı bir implementasyonudur. Stream transformerlar, colocation özelliğinden yararlanır ve depolanacağı node’daki verileri günceller.
Aşağıdaki örnekte, text streaminde bulunan her bir farklı sözcük için bir sayacı artırmak üzere bir stream transformer kullanılmakta;
- ⌨️ .NET Sample
class MyEntryProcessor : ICacheEntryProcessor<string, long, object, object>
{
public object Process(IMutableCacheEntry<string, long> e, object arg)
{
//get current count
var val = e.Value;
//increment count by 1
e.Value = val == 0 ? 1L : val + 1;
return null;
}
}
public static void StreamTransformerDemo()
{
var ignite = Ignition.Start(new IgniteConfiguration
{
DiscoverySpi = new TcpDiscoverySpi
{
LocalPort = 48500,
LocalPortRange = 20,
IpFinder = new TcpDiscoveryStaticIpFinder
{
Endpoints = new[]
{
"127.0.0.1:48500..48520"
}
}
}
});
var cfg = new CacheConfiguration("wordCountCache");
var stmCache = ignite.GetOrCreateCache<string, long>(cfg);
using (var stmr = ignite.GetDataStreamer<string, long>(stmCache.Name))
{
//Allow data updates
stmr.AllowOverwrite = true;
//Configure data transformation to count instances of the same word
stmr.Receiver = new StreamTransformer<string, long, object, object>(new MyEntryProcessor());
//stream words into the streamer cache
foreach (var word in GetWords())
{
stmr.AddData(word, 1L);
}
}
Console.WriteLine(stmCache.Get("a"));
Console.WriteLine(stmCache.Get("b"));
}
static IEnumerable<string> GetWords()
{
//populate words list somehow
return Enumerable.Repeat("a", 3).Concat(Enumerable.Repeat("b", 2));
}
Stream Visitor
Stream visitor, streamdeki her key/value pairini ziyaret eden stream reciever’ın başka bir implementasyonudur. Visitor cache’i güncellemez. Bir pair’in cache’te saklanması gerekiyorsa, put(…)
methodlarından biri kullanılmalıdır.
Configuring Data Streamer Thread Pool Size
Data streamer thread pool, data streamerlardan gelen mesajları işlemek için ayrılmıştır.
Varsayılan pool boyutu max(8, toplam çekirdek sayısı)
. Poolboyutunu değiştirmek için IgniteConfiguration.setDataStreamerThreadPoolSize(…)
kullanılır.
- ⌨️ XML Config