Správičky 59 Blogy 948 Fórum 18 713

Producer Consumer asynchronne

photo
harrison314
23. 5. 2018 14:31:07
Body: 1045
Najaktívnejší č.: 23

Producer Consumer asynchronne

Ahojte, mam programtorsku hadanku.

Dnes som narazil na starsi kod jednoducheej windows sluzby: robi FileWatch nad foldrom, pri vytvoreni suboru v danom foldri, vytvori DTO s informaciou o subore a posle ho do blokujucej Queu (klasicka Queu + Semafor). Z danej Queu vyberaju tieto DTO worker thready a dany subor spracuju (zavolaju WS  api a subor presunu). Co je dolezite pomocou poctu worker threadov ide riadit uroven paralelizmu a tym aj vytazovanie servera (aj toho na ktorom bezi tento windows servis, aj toho ktory poskytuje webove sluzby).

 

Ako rovnake spravanie implementujem pomocou asynchronneho API (povedzme, ze by som pouzil async varianty volania webovych sluzieb), hlavne je aby som vedel riadit uroven paralilizacie ako doteraz a pri tom nemucil Thread Pool?

To znamena, ze nebudem riesit vramci tych tredov nieco ako:

Task.StartNew(()=>{
....
client.CallAnyithingAsync().GetAwaiter().GetResult();
...
});

co zablokuje rovno 3 thready v ThreadPoole.

Ako by ste to riesili, alebo mi nieco unika?



Zvycajne odpovede na Stack Owerflow obsahovali daco ako:

 

[Reakcia]

photo
liero
25. 5. 2018 12:17:21
Body: 7670
Najaktívnejší č.: 5

RE: Producer Consumer asynchronne

1. Co je client
2. Preco to mas obalene v Task.StartNew()?
3. Ukaz, ako ako funguje worker thread teraz. Bude tam zrejme nejaky loop da dequeue

[Reakcia]

photo
harrison314
25. 5. 2018 20:36:37
Body: 1045
Najaktívnejší č.: 23

RE: Producer Consumer asynchronne

1. , 2. Formatovanie zblblo to co som ukazoval bolo to co som na tuto odpoved nasiel na stack owerflov ako ukazku zleho rienia, ktore ti zablokuje 3 thready v poole.

Mne skor ide o vseobecnu ukazku, alebo hint ako to iste spravit asynchornne, lebo  kazdy worker thread vetcinu casu caka na server alebo presuva bajty na disk.

 

Ukazka v psudokode, co to robi teraz:

private void Process()
{
    while(this.CanselationToken.IsCancellationRequested)
    {
        FileProcessInfo info = this.Queue.Deque(this.CanselationToken); //blokujuca fronta
        if(info==null) return;
        PdfSigner signer = new PdfSigner(info.Configuration);
        DocumentId  docId = null;
        using(FileStream fs = new FileSTream(info.SoursePath, FileMode.Open, FileAccess.Read))
        {
            docId = signer.Upload(signer.SoursePath, fs); //toto moze byt async operacia
        }

       // nejaky processing 
        DocumentId signedDoc = signer.Sign(info.PrivateKeyId, visualSignature); //toto moze byt async operacia
            
        using(FileStream fs = new FileSTream(destnation, FileMode.Create, FileAccess.Write))
        {
            using(Stream  networkStream = signer.GetDocumentContent(signedDoc)) //toto moze byt async operacia
            {
                networkStream.CopyTo(fs);//toto moze byt async operacia
            }
        }
    }
}

 

[Reakcia]

photo
liero
30. 5. 2018 10:09:54
Body: 7670
Najaktívnejší č.: 5

RE: Producer Consumer asynchronne

Stale nerozumiem v com je problem. Sprav z toho async Task Process() a zavolaj to paralelne. Nieco na sposob Paralel.Foreach().

Ak this.Queue.Deque nieje threadsafe, daj tam lock.

[Reakcia]

photo
liero
30. 5. 2018 13:31:17
Body: 7670
Najaktívnejší č.: 5

RE: Producer Consumer asynchronne

Pokial len hladas asynchronnu nahradu za Parallel.ForEach, tak skus toto:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 

    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

[Reakcia]

photo
liero
30. 5. 2018 13:55:22
Body: 7670
Najaktívnejší č.: 5

RE: Producer Consumer asynchronne

pripadne takto:

public class Program
{
	readonly static ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
	static Random rnd = new Random();
	static Stopwatch timer;
	
	static IEnumerable<string> YieldQueue()
	{
		while (_queue.TryDequeue(out string item))
		{
			//if (_cancellationToken.IsCancellationRequested) break;
			Console.WriteLine("Yielding " + item);
			yield return item;
		}
	}

	public static void Main()
	{
		Console.WriteLine("Hello World");
		for (int i = 0; i < 15; i++)
		{
			_queue.Enqueue("Item " + i);
		}
		
		ForEachAsync(YieldQueue(), 3, ProcessItem).Wait();
	}

	public static async Task ProcessItem(string item)
	{
		Console.WriteLine("Processing " + item);
		await Task.Delay(rnd.Next(0,1000));
		Console.WriteLine("Done " + item);
	}

	public static Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
	{
		return Task.WhenAll(
			from partition in Partitioner.Create(source).GetPartitions(dop)select Task.Run(async delegate
			{
				using (partition)
					while (partition.MoveNext())
						await body(partition.Current);
			}

			));
	}
}
Hello World
Yielding Item 0
Processing Item 0
Yielding Item 1
Processing Item 1
Yielding Item 2
Processing Item 2
Done Item 2
Yielding Item 3
Processing Item 3
Done Item 1
Yielding Item 4
Processing Item 4
Done Item 3
Yielding Item 5
Processing Item 5
Done Item 0
Yielding Item 6
Processing Item 6
Done Item 6
Yielding Item 7
Processing Item 7
Done Item 5
Yielding Item 8
Yielding Item 9
Processing Item 8
Done Item 4
Yielding Item 10
Processing Item 10
Done Item 8
Processing Item 9
Done Item 7
Yielding Item 11
Yielding Item 12
Processing Item 11
Done Item 10
Yielding Item 13
Yielding Item 14
Processing Item 13
Done Item 9
Done Item 11
Processing Item 12
Done Item 13
Processing Item 14
Done Item 14
Done Item 12



no v skutocnosti Queue a tym padom ani YieldQueue vobec nepotrebujes a mozes v pohode pouzit akukolvek kolekciu

 

[Reakcia]

photo
harrison314
31. 5. 2018 11:03:01
Body: 1045
Najaktívnejší č.: 23

RE: Producer Consumer asynchronne

To co si napisal nerobi to co som chcel... podarilo sa mi spravit pseudopriklad:

class AsyncQueue<T>
    {
        private readonly SemaphoreSlim semaphore;
        private readonly Queue<T> queu;

        public AsyncQueue()
        {
            this.semaphore = new SemaphoreSlim(0, int.MaxValue);
            this.queu = new Queue<T>();
        }

        public void Enqueue(T element)
        {

            lock (this.queu)
            {
                this.queu.Enqueue(element);
            }

            this.semaphore.Release();
        }

        public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            await this.semaphore.WaitAsync(cancellationToken);
            if (cancellationToken.IsCancellationRequested)
            {
                return default(T);
            }

            lock (this.queu)
            {
                if (this.queu.Count > 0)
                {
                    return this.queu.Dequeue();
                }
                else
                {
                    return default(T); //queue empty
                }
            }
        }
    }

    public class RealWork
    {
        public static async Task WorkAsync(int item, int workerId)
        {
            Console.WriteLine("Start processing {0} on worker {1}", item, workerId);
            await Task.Delay(Math.Abs(Guid.NewGuid().GetHashCode()) % 1000);
            Console.WriteLine("Stop processing {0}", item);
        }
    }
    class AsyncTesting
    {
        private readonly AsyncQueue<int> queue;
        private readonly CancellationTokenSource tokenSource;
       

        public AsyncTesting()
        {
            this.queue = new AsyncQueue<int>();
            this.tokenSource = new CancellationTokenSource();
        }

        public void InsertItem(int item)
        {
            this.queue.Enqueue(item);
        }

        public void Start(int dop)
        {
            for (int i = 0; i < dop; i++)
            {
                this.WorkerBody(i);
            }
        }

        public void Stop()
        {
            this.tokenSource.Cancel();
        }

        private async Task WorkerBody(int workerId)
        {
            await Task.Yield();

            Console.WriteLine("Start worker {0}", workerId);
            CancellationToken token = this.tokenSource.Token;

            while (!token.IsCancellationRequested)
            {
                int item = await this.queue.DequeueAsync(token);
                if (token.IsCancellationRequested)
                {
                    break;
                }

                await RealWork.WorkAsync(item, workerId);
            }

            Console.WriteLine("Stop worker {0}", workerId);
        }
    }


static void Main(string[] args)
        {
            AsyncTesting asyncTesting = new AsyncTesting();

            Thread t = new Thread(() =>
            {
                int counter = 1;
                Random random = new Random(DateTime.Now.Millisecond);

                for (int i = 0; i < 500; i++)
                {
                    Thread.Sleep(random.Next(50, 1000));
                    asyncTesting.InsertItem(counter++);
                }
            });

            t.Start();

            asyncTesting.Start(3);

            Console.ReadLine();
            asyncTesting.Stop();
}

 

[Reakcia]

photo
vlko
25. 6. 2018 9:24:47
Body: 10815
Najaktívnejší č.: 3

RE: Producer Consumer asynchronne

Nebolo by to rychlejsie napisane v RX?  nahadzes to do nejakeho subjectu, ktory bude vytvarat async operacie a cez .Merge(pocet threadov) budes tie async operacie firovat tak paralelne ako budes chciet.

[Reakcia]



Najaktívnejší užívatelia
1. 17250 b. photo T
2. 15450 b. photo Anonymous
3. 10815 b. photo vlko
4. 9275 b. photo spigi
5. 7670 b. photo Liero
6. 6230 b. photo siro
7. 4395 b. photo duracellko
8. 4180 b. photo xxxmatko
9. 3780 b. photo dudok
10. 3750 b. photo lubolacko