Linux-libre 4.19.8-gnu: drivers/md/dm-kcopyd.c (librecmc/linux-libre.git)
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
11
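/*
 * Illustrative use by a kcopyd client (a minimal sketch only, not part of
 * this file; src_bdev, dst_bdev, my_ctx and my_copy_done() are made-up
 * names).  The notify function is called from the kcopyd workqueue once the
 * whole copy has completed or failed:
 *
 *        static void my_copy_done(int read_err, unsigned long write_err,
 *                                 void *context)
 *        {
 *                ...
 *        }
 *
 *        struct dm_kcopyd_client *kc = dm_kcopyd_client_create(NULL);
 *        struct dm_io_region from = { .bdev = src_bdev, .sector = 0, .count = 128 };
 *        struct dm_io_region to   = { .bdev = dst_bdev, .sector = 0, .count = 128 };
 *
 *        dm_kcopyd_copy(kc, &from, 1, &to, 0, my_copy_done, my_ctx);
 *        ...
 *        dm_kcopyd_client_destroy(kc);
 */
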
12 #include <linux/types.h>
13 #include <linux/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/delay.h>
26 #include <linux/device-mapper.h>
27 #include <linux/dm-kcopyd.h>
28
29 #include "dm-core.h"
30
31 #define SUB_JOB_SIZE    128
32 #define SPLIT_COUNT     8
33 #define MIN_JOBS        8
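/*
 * RESERVE_PAGES is enough pages to hold the data of one SUB_JOB_SIZE sub
 * job: 128 sectors << SECTOR_SHIFT = 64 KiB, i.e. 16 pages with 4 KiB pages.
 */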
34 #define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
35
36 /*-----------------------------------------------------------------
37  * Each kcopyd client has its own little pool of preallocated
38  * pages for kcopyd io.
39  *---------------------------------------------------------------*/
40 struct dm_kcopyd_client {
41         struct page_list *pages;
42         unsigned nr_reserved_pages;
43         unsigned nr_free_pages;
44
45         struct dm_io_client *io_client;
46
47         wait_queue_head_t destroyq;
48
49         mempool_t job_pool;
50
51         struct workqueue_struct *kcopyd_wq;
52         struct work_struct kcopyd_work;
53
54         struct dm_kcopyd_throttle *throttle;
55
56         atomic_t nr_jobs;
57
58 /*
59  * We maintain three lists of jobs:
60  *
61  * i)   jobs waiting for pages
62  * ii)  jobs that have pages, and are waiting for the io to be issued.
63  * iii) jobs that have completed.
64  *
65  * All three of these are protected by job_lock.
66  */
67         spinlock_t job_lock;
68         struct list_head complete_jobs;
69         struct list_head io_jobs;
70         struct list_head pages_jobs;
71 };
72
73 static struct page_list zero_page_list;
74
75 static DEFINE_SPINLOCK(throttle_spinlock);
76
77 /*
78  * IO/IDLE accounting slowly decays after each (1 << ACCOUNT_INTERVAL_SHIFT) period.
79  * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
80  * by 2.
81  */
82 #define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ
83
84 /*
85  * Sleep this number of milliseconds.
86  *
87  * The value was decided experimentally.
88  * Smaller values seem to cause an increased copy rate above the limit.
89  * The reason for this is unknown but possibly due to jiffies rounding errors
90  * or read/write cache inside the disk.
91  */
92 #define SLEEP_MSEC                      100
93
94 /*
95  * Maximum number of sleep events. There is a theoretical livelock if
96  * multiple kcopyd clients do work simultaneously; this limit avoids it.
97  */
98 #define MAX_SLEEPS                      10
99
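/*
 * Worked example of the throttling arithmetic in io_job_start() below (for
 * illustration only): with throttle = 20 (io may run for 20% of the time),
 * total_period = 200 jiffies and io_period = 50 jiffies,
 * skew = 50 - 20 * 200 / 100 = 10.  A positive skew means io has had more
 * than its share, so io_job_start() sleeps SLEEP_MSEC and re-checks, up to
 * MAX_SLEEPS times.  Clients such as dm-raid1 declare their throttle with
 * DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM() from dm-kcopyd.h.
 */
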
100 static void io_job_start(struct dm_kcopyd_throttle *t)
101 {
102         unsigned throttle, now, difference;
103         int slept = 0, skew;
104
105         if (unlikely(!t))
106                 return;
107
108 try_again:
109         spin_lock_irq(&throttle_spinlock);
110
111         throttle = READ_ONCE(t->throttle);
112
113         if (likely(throttle >= 100))
114                 goto skip_limit;
115
116         now = jiffies;
117         difference = now - t->last_jiffies;
118         t->last_jiffies = now;
119         if (t->num_io_jobs)
120                 t->io_period += difference;
121         t->total_period += difference;
122
123         /*
124          * Maintain sane values if we got a temporary overflow.
125          */
126         if (unlikely(t->io_period > t->total_period))
127                 t->io_period = t->total_period;
128
129         if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
130                 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
131                 t->total_period >>= shift;
132                 t->io_period >>= shift;
133         }
134
135         skew = t->io_period - throttle * t->total_period / 100;
136
137         if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
138                 slept++;
139                 spin_unlock_irq(&throttle_spinlock);
140                 msleep(SLEEP_MSEC);
141                 goto try_again;
142         }
143
144 skip_limit:
145         t->num_io_jobs++;
146
147         spin_unlock_irq(&throttle_spinlock);
148 }
149
150 static void io_job_finish(struct dm_kcopyd_throttle *t)
151 {
152         unsigned long flags;
153
154         if (unlikely(!t))
155                 return;
156
157         spin_lock_irqsave(&throttle_spinlock, flags);
158
159         t->num_io_jobs--;
160
161         if (likely(READ_ONCE(t->throttle) >= 100))
162                 goto skip_limit;
163
164         if (!t->num_io_jobs) {
165                 unsigned now, difference;
166
167                 now = jiffies;
168                 difference = now - t->last_jiffies;
169                 t->last_jiffies = now;
170
171                 t->io_period += difference;
172                 t->total_period += difference;
173
174                 /*
175                  * Maintain sane values if we got a temporary overflow.
176                  */
177                 if (unlikely(t->io_period > t->total_period))
178                         t->io_period = t->total_period;
179         }
180
181 skip_limit:
182         spin_unlock_irqrestore(&throttle_spinlock, flags);
183 }
184
185
186 static void wake(struct dm_kcopyd_client *kc)
187 {
188         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
189 }
190
191 /*
192  * Obtain one page for the use of kcopyd.
193  */
194 static struct page_list *alloc_pl(gfp_t gfp)
195 {
196         struct page_list *pl;
197
198         pl = kmalloc(sizeof(*pl), gfp);
199         if (!pl)
200                 return NULL;
201
202         pl->page = alloc_page(gfp);
203         if (!pl->page) {
204                 kfree(pl);
205                 return NULL;
206         }
207
208         return pl;
209 }
210
211 static void free_pl(struct page_list *pl)
212 {
213         __free_page(pl->page);
214         kfree(pl);
215 }
216
217 /*
218  * Add the provided pages to a client's free page list, releasing
219  * back to the system any beyond the reserved_pages limit.
220  */
221 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
222 {
223         struct page_list *next;
224
225         do {
226                 next = pl->next;
227
228                 if (kc->nr_free_pages >= kc->nr_reserved_pages)
229                         free_pl(pl);
230                 else {
231                         pl->next = kc->pages;
232                         kc->pages = pl;
233                         kc->nr_free_pages++;
234                 }
235
236                 pl = next;
237         } while (pl);
238 }
239
240 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
241                             unsigned int nr, struct page_list **pages)
242 {
243         struct page_list *pl;
244
245         *pages = NULL;
246
247         do {
248                 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
249                 if (unlikely(!pl)) {
250                         /* Use reserved pages */
251                         pl = kc->pages;
252                         if (unlikely(!pl))
253                                 goto out_of_memory;
254                         kc->pages = pl->next;
255                         kc->nr_free_pages--;
256                 }
257                 pl->next = *pages;
258                 *pages = pl;
259         } while (--nr);
260
261         return 0;
262
263 out_of_memory:
264         if (*pages)
265                 kcopyd_put_pages(kc, *pages);
266         return -ENOMEM;
267 }
268
269 /*
270  * These three functions resize the page pool.
271  */
272 static void drop_pages(struct page_list *pl)
273 {
274         struct page_list *next;
275
276         while (pl) {
277                 next = pl->next;
278                 free_pl(pl);
279                 pl = next;
280         }
281 }
282
283 /*
284  * Allocate and reserve nr_pages for the use of a specific client.
285  */
286 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
287 {
288         unsigned i;
289         struct page_list *pl = NULL, *next;
290
291         for (i = 0; i < nr_pages; i++) {
292                 next = alloc_pl(GFP_KERNEL);
293                 if (!next) {
294                         if (pl)
295                                 drop_pages(pl);
296                         return -ENOMEM;
297                 }
298                 next->next = pl;
299                 pl = next;
300         }
301
302         kc->nr_reserved_pages += nr_pages;
303         kcopyd_put_pages(kc, pl);
304
305         return 0;
306 }
307
308 static void client_free_pages(struct dm_kcopyd_client *kc)
309 {
310         BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
311         drop_pages(kc->pages);
312         kc->pages = NULL;
313         kc->nr_free_pages = kc->nr_reserved_pages = 0;
314 }
315
316 /*-----------------------------------------------------------------
317  * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
318  * for this reason we use a mempool to prevent the client from
319  * ever having to do io (which could cause a deadlock).
320  *---------------------------------------------------------------*/
321 struct kcopyd_job {
322         struct dm_kcopyd_client *kc;
323         struct list_head list;
324         unsigned long flags;
325
326         /*
327          * Error state of the job.
328          */
329         int read_err;
330         unsigned long write_err;
331
332         /*
333          * Either READ or WRITE
334          */
335         int rw;
336         struct dm_io_region source;
337
338         /*
339          * The destinations for the transfer.
340          */
341         unsigned int num_dests;
342         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
343
344         struct page_list *pages;
345
346         /*
347          * Set this to ensure you are notified when the job has
348  * completed.  'context' is for the callback to use.
349          */
350         dm_kcopyd_notify_fn fn;
351         void *context;
352
353         /*
354          * These fields are only used if the job has been split
355          * into more manageable parts.
356          */
357         struct mutex lock;
358         atomic_t sub_jobs;
359         sector_t progress;
360         sector_t write_offset;
361
362         struct kcopyd_job *master_job;
363 };
364
365 static struct kmem_cache *_job_cache;
366
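/*
 * Each object in the job cache is big enough for a master job immediately
 * followed by its SPLIT_COUNT sub jobs, so one mempool allocation in
 * dm_kcopyd_copy() covers a whole split copy.
 */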
367 int __init dm_kcopyd_init(void)
368 {
369         _job_cache = kmem_cache_create("kcopyd_job",
370                                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
371                                 __alignof__(struct kcopyd_job), 0, NULL);
372         if (!_job_cache)
373                 return -ENOMEM;
374
375         zero_page_list.next = &zero_page_list;
376         zero_page_list.page = ZERO_PAGE(0);
377
378         return 0;
379 }
380
381 void dm_kcopyd_exit(void)
382 {
383         kmem_cache_destroy(_job_cache);
384         _job_cache = NULL;
385 }
386
387 /*
388  * Functions to push jobs onto and pop jobs off a given job
389  * list.
390  */
391 static struct kcopyd_job *pop_io_job(struct list_head *jobs,
392                                      struct dm_kcopyd_client *kc)
393 {
394         struct kcopyd_job *job;
395
396         /*
397  * For I/O jobs, pop any read, any write without a sequential write
398  * constraint, and any sequential write that is at the right position.
399          */
400         list_for_each_entry(job, jobs, list) {
401                 if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
402                         list_del(&job->list);
403                         return job;
404                 }
405
406                 if (job->write_offset == job->master_job->write_offset) {
407                         job->master_job->write_offset += job->source.count;
408                         list_del(&job->list);
409                         return job;
410                 }
411         }
412
413         return NULL;
414 }
415
416 static struct kcopyd_job *pop(struct list_head *jobs,
417                               struct dm_kcopyd_client *kc)
418 {
419         struct kcopyd_job *job = NULL;
420         unsigned long flags;
421
422         spin_lock_irqsave(&kc->job_lock, flags);
423
424         if (!list_empty(jobs)) {
425                 if (jobs == &kc->io_jobs)
426                         job = pop_io_job(jobs, kc);
427                 else {
428                         job = list_entry(jobs->next, struct kcopyd_job, list);
429                         list_del(&job->list);
430                 }
431         }
432         spin_unlock_irqrestore(&kc->job_lock, flags);
433
434         return job;
435 }
436
437 static void push(struct list_head *jobs, struct kcopyd_job *job)
438 {
439         unsigned long flags;
440         struct dm_kcopyd_client *kc = job->kc;
441
442         spin_lock_irqsave(&kc->job_lock, flags);
443         list_add_tail(&job->list, jobs);
444         spin_unlock_irqrestore(&kc->job_lock, flags);
445 }
446
447
448 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
449 {
450         unsigned long flags;
451         struct dm_kcopyd_client *kc = job->kc;
452
453         spin_lock_irqsave(&kc->job_lock, flags);
454         list_add(&job->list, jobs);
455         spin_unlock_irqrestore(&kc->job_lock, flags);
456 }
457
458 /*
459  * These three functions process 1 item from the corresponding
460  * job list.
461  *
462  * They return:
463  * < 0: error
464  *   0: success
465  * > 0: can't process yet.
466  */
467 static int run_complete_job(struct kcopyd_job *job)
468 {
469         void *context = job->context;
470         int read_err = job->read_err;
471         unsigned long write_err = job->write_err;
472         dm_kcopyd_notify_fn fn = job->fn;
473         struct dm_kcopyd_client *kc = job->kc;
474
475         if (job->pages && job->pages != &zero_page_list)
476                 kcopyd_put_pages(kc, job->pages);
477         /*
478          * If this is the master job, the sub jobs have already
479          * completed so we can free everything.
480          */
481         if (job->master_job == job) {
482                 mutex_destroy(&job->lock);
483                 mempool_free(job, &kc->job_pool);
484         }
485         fn(read_err, write_err, context);
486
487         if (atomic_dec_and_test(&kc->nr_jobs))
488                 wake_up(&kc->destroyq);
489
490         cond_resched();
491
492         return 0;
493 }
494
495 static void complete_io(unsigned long error, void *context)
496 {
497         struct kcopyd_job *job = (struct kcopyd_job *) context;
498         struct dm_kcopyd_client *kc = job->kc;
499
500         io_job_finish(kc->throttle);
501
502         if (error) {
503                 if (op_is_write(job->rw))
504                         job->write_err |= error;
505                 else
506                         job->read_err = 1;
507
508                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
509                         push(&kc->complete_jobs, job);
510                         wake(kc);
511                         return;
512                 }
513         }
514
515         if (op_is_write(job->rw))
516                 push(&kc->complete_jobs, job);
517
518         else {
519                 job->rw = WRITE;
520                 push(&kc->io_jobs, job);
521         }
522
523         wake(kc);
524 }
525
526 /*
527  * Issue the io for a particular job, reading into or writing out of
528  * its page list.
529  */
530 static int run_io_job(struct kcopyd_job *job)
531 {
532         int r;
533         struct dm_io_request io_req = {
534                 .bi_op = job->rw,
535                 .bi_op_flags = 0,
536                 .mem.type = DM_IO_PAGE_LIST,
537                 .mem.ptr.pl = job->pages,
538                 .mem.offset = 0,
539                 .notify.fn = complete_io,
540                 .notify.context = job,
541                 .client = job->kc->io_client,
542         };
543
544         /*
545          * If we need to write sequentially and some reads or writes failed,
546          * no point in continuing.
547          */
548         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
549             job->master_job->write_err)
550                 return -EIO;
551
552         io_job_start(job->kc->throttle);
553
554         if (job->rw == READ)
555                 r = dm_io(&io_req, 1, &job->source, NULL);
556         else
557                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
558
559         return r;
560 }
561
562 static int run_pages_job(struct kcopyd_job *job)
563 {
564         int r;
565         unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
566
567         r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
568         if (!r) {
569                 /* this job is ready for io */
570                 push(&job->kc->io_jobs, job);
571                 return 0;
572         }
573
574         if (r == -ENOMEM)
575                 /* can't complete now */
576                 return 1;
577
578         return r;
579 }
580
581 /*
582  * Run through a list for as long as possible.  Returns the count
583  * of successful jobs.
584  */
585 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
586                         int (*fn) (struct kcopyd_job *))
587 {
588         struct kcopyd_job *job;
589         int r, count = 0;
590
591         while ((job = pop(jobs, kc))) {
592
593                 r = fn(job);
594
595                 if (r < 0) {
596                         /* error this rogue job */
597                         if (op_is_write(job->rw))
598                                 job->write_err = (unsigned long) -1L;
599                         else
600                                 job->read_err = 1;
601                         push(&kc->complete_jobs, job);
602                         break;
603                 }
604
605                 if (r > 0) {
606                         /*
607                          * We couldn't service this job ATM, so
608                          * push this job back onto the list.
609                          */
610                         push_head(jobs, job);
611                         break;
612                 }
613
614                 count++;
615         }
616
617         return count;
618 }
619
620 /*
621  * kcopyd does this every time it's woken up.
622  */
623 static void do_work(struct work_struct *work)
624 {
625         struct dm_kcopyd_client *kc = container_of(work,
626                                         struct dm_kcopyd_client, kcopyd_work);
627         struct blk_plug plug;
628
629         /*
630          * The order that these are called is *very* important.
631          * Complete jobs can free some pages for pages jobs.
632          * Pages jobs, when successful, will jump onto the io jobs
633          * list.  IO jobs call wake when they complete and it all
634          * starts again.
635          */
636         blk_start_plug(&plug);
637         process_jobs(&kc->complete_jobs, kc, run_complete_job);
638         process_jobs(&kc->pages_jobs, kc, run_pages_job);
639         process_jobs(&kc->io_jobs, kc, run_io_job);
640         blk_finish_plug(&plug);
641 }
642
643 /*
644  * If we are copying a small region we just dispatch a single job
645  * to do the copy, otherwise the io has to be split up into many
646  * jobs.
647  */
648 static void dispatch_job(struct kcopyd_job *job)
649 {
650         struct dm_kcopyd_client *kc = job->kc;
651         atomic_inc(&kc->nr_jobs);
652         if (unlikely(!job->source.count))
653                 push(&kc->complete_jobs, job);
654         else if (job->pages == &zero_page_list)
655                 push(&kc->io_jobs, job);
656         else
657                 push(&kc->pages_jobs, job);
658         wake(kc);
659 }
660
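/*
 * Completion callback for one sub job of a split copy.  Record any error in
 * the master job, then either claim the next chunk (at most SUB_JOB_SIZE
 * sectors) of the master's source and redispatch this sub job for it, or,
 * if this was the last outstanding sub job, queue the master job's
 * completion on the kcopyd thread.
 */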
661 static void segment_complete(int read_err, unsigned long write_err,
662                              void *context)
663 {
664         /* FIXME: tidy this function */
665         sector_t progress = 0;
666         sector_t count = 0;
667         struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
668         struct kcopyd_job *job = sub_job->master_job;
669         struct dm_kcopyd_client *kc = job->kc;
670
671         mutex_lock(&job->lock);
672
673         /* update the error */
674         if (read_err)
675                 job->read_err = 1;
676
677         if (write_err)
678                 job->write_err |= write_err;
679
680         /*
681          * Only dispatch more work if there hasn't been an error.
682          */
683         if ((!job->read_err && !job->write_err) ||
684             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
685                 /* get the next chunk of work */
686                 progress = job->progress;
687                 count = job->source.count - progress;
688                 if (count) {
689                         if (count > SUB_JOB_SIZE)
690                                 count = SUB_JOB_SIZE;
691
692                         job->progress += count;
693                 }
694         }
695         mutex_unlock(&job->lock);
696
697         if (count) {
698                 int i;
699
700                 *sub_job = *job;
701                 sub_job->write_offset = progress;
702                 sub_job->source.sector += progress;
703                 sub_job->source.count = count;
704
705                 for (i = 0; i < job->num_dests; i++) {
706                         sub_job->dests[i].sector += progress;
707                         sub_job->dests[i].count = count;
708                 }
709
710                 sub_job->fn = segment_complete;
711                 sub_job->context = sub_job;
712                 dispatch_job(sub_job);
713
714         } else if (atomic_dec_and_test(&job->sub_jobs)) {
715
716                 /*
717                  * Queue the completion callback to the kcopyd thread.
718                  *
719                  * Some callers assume that all the completions are called
720                  * from a single thread and don't race with each other.
721                  *
722                  * We must not call the callback directly here because this
723                  * code may not be executing in the thread.
724                  */
725                 push(&kc->complete_jobs, job);
726                 wake(kc);
727         }
728 }
729
730 /*
731  * Create some sub jobs to share the work between them.
732  */
733 static void split_job(struct kcopyd_job *master_job)
734 {
735         int i;
736
737         atomic_inc(&master_job->kc->nr_jobs);
738
739         atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
740         for (i = 0; i < SPLIT_COUNT; i++) {
741                 master_job[i + 1].master_job = master_job;
742                 segment_complete(0, 0u, &master_job[i + 1]);
743         }
744 }
745
746 void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
747                     unsigned int num_dests, struct dm_io_region *dests,
748                     unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
749 {
750         struct kcopyd_job *job;
751         int i;
752
753         /*
754          * Allocate an array of jobs consisting of one master job
755          * followed by SPLIT_COUNT sub jobs.
756          */
757         job = mempool_alloc(&kc->job_pool, GFP_NOIO);
758         mutex_init(&job->lock);
759
760         /*
761          * set up for the read.
762          */
763         job->kc = kc;
764         job->flags = flags;
765         job->read_err = 0;
766         job->write_err = 0;
767
768         job->num_dests = num_dests;
769         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
770
771         /*
772          * If one of the destinations is a host-managed zoned block device,
773          * we need to write sequentially. If one of the destinations is a
774          * host-aware device, then leave it to the caller to choose what to do.
775          */
776         if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
777                 for (i = 0; i < job->num_dests; i++) {
778                         if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
779                                 set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
780                                 break;
781                         }
782                 }
783         }
784
785         /*
786          * If we need to write sequentially, errors cannot be ignored.
787          */
788         if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
789             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
790                 clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);
791
792         if (from) {
793                 job->source = *from;
794                 job->pages = NULL;
795                 job->rw = READ;
796         } else {
797                 memset(&job->source, 0, sizeof job->source);
798                 job->source.count = job->dests[0].count;
799                 job->pages = &zero_page_list;
800
801                 /*
802                  * Use WRITE ZEROES to optimize zeroing if all dests support it.
803                  */
804                 job->rw = REQ_OP_WRITE_ZEROES;
805                 for (i = 0; i < job->num_dests; i++)
806                         if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
807                                 job->rw = WRITE;
808                                 break;
809                         }
810         }
811
812         job->fn = fn;
813         job->context = context;
814         job->master_job = job;
815         job->write_offset = 0;
816
817         if (job->source.count <= SUB_JOB_SIZE)
818                 dispatch_job(job);
819         else {
820                 job->progress = 0;
821                 split_job(job);
822         }
823 }
824 EXPORT_SYMBOL(dm_kcopyd_copy);
825
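/*
 * Zero the destination regions.  This is dm_kcopyd_copy() with a NULL
 * source: the shared zero page is written, or WRITE ZEROES is used when
 * every destination supports it.
 */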
826 void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
827                     unsigned num_dests, struct dm_io_region *dests,
828                     unsigned flags, dm_kcopyd_notify_fn fn, void *context)
829 {
830         dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
831 }
832 EXPORT_SYMBOL(dm_kcopyd_zero);
833
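/*
 * Allocate a job whose only purpose is to deliver a completion callback
 * from the kcopyd workqueue.  The caller finishes it later by passing the
 * returned cookie to dm_kcopyd_do_callback().
 */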
834 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
835                                  dm_kcopyd_notify_fn fn, void *context)
836 {
837         struct kcopyd_job *job;
838
839         job = mempool_alloc(&kc->job_pool, GFP_NOIO);
840
841         memset(job, 0, sizeof(struct kcopyd_job));
842         job->kc = kc;
843         job->fn = fn;
844         job->context = context;
845         job->master_job = job;
846
847         atomic_inc(&kc->nr_jobs);
848
849         return job;
850 }
851 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
852
853 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
854 {
855         struct kcopyd_job *job = j;
856         struct dm_kcopyd_client *kc = job->kc;
857
858         job->read_err = read_err;
859         job->write_err = write_err;
860
861         push(&kc->complete_jobs, job);
862         wake(kc);
863 }
864 EXPORT_SYMBOL(dm_kcopyd_do_callback);
865
866 /*
867  * Cancels a kcopyd job, e.g. someone might be deactivating a
868  * mirror.
869  */
870 #if 0
871 int kcopyd_cancel(struct kcopyd_job *job, int block)
872 {
873         /* FIXME: finish */
874         return -1;
875 }
876 #endif  /*  0  */
877
878 /*-----------------------------------------------------------------
879  * Client setup
880  *---------------------------------------------------------------*/
881 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
882 {
883         int r;
884         struct dm_kcopyd_client *kc;
885
886         kc = kzalloc(sizeof(*kc), GFP_KERNEL);
887         if (!kc)
888                 return ERR_PTR(-ENOMEM);
889
890         spin_lock_init(&kc->job_lock);
891         INIT_LIST_HEAD(&kc->complete_jobs);
892         INIT_LIST_HEAD(&kc->io_jobs);
893         INIT_LIST_HEAD(&kc->pages_jobs);
894         kc->throttle = throttle;
895
896         r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
897         if (r)
898                 goto bad_slab;
899
900         INIT_WORK(&kc->kcopyd_work, do_work);
901         kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
902         if (!kc->kcopyd_wq) {
903                 r = -ENOMEM;
904                 goto bad_workqueue;
905         }
906
907         kc->pages = NULL;
908         kc->nr_reserved_pages = kc->nr_free_pages = 0;
909         r = client_reserve_pages(kc, RESERVE_PAGES);
910         if (r)
911                 goto bad_client_pages;
912
913         kc->io_client = dm_io_client_create();
914         if (IS_ERR(kc->io_client)) {
915                 r = PTR_ERR(kc->io_client);
916                 goto bad_io_client;
917         }
918
919         init_waitqueue_head(&kc->destroyq);
920         atomic_set(&kc->nr_jobs, 0);
921
922         return kc;
923
924 bad_io_client:
925         client_free_pages(kc);
926 bad_client_pages:
927         destroy_workqueue(kc->kcopyd_wq);
928 bad_workqueue:
929         mempool_exit(&kc->job_pool);
930 bad_slab:
931         kfree(kc);
932
933         return ERR_PTR(r);
934 }
935 EXPORT_SYMBOL(dm_kcopyd_client_create);
936
937 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
938 {
939         /* Wait for completion of all jobs submitted by this client. */
940         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
941
942         BUG_ON(!list_empty(&kc->complete_jobs));
943         BUG_ON(!list_empty(&kc->io_jobs));
944         BUG_ON(!list_empty(&kc->pages_jobs));
945         destroy_workqueue(kc->kcopyd_wq);
946         dm_io_client_destroy(kc->io_client);
947         client_free_pages(kc);
948         mempool_exit(&kc->job_pool);
949         kfree(kc);
950 }
951 EXPORT_SYMBOL(dm_kcopyd_client_destroy);