VirtualBox

source: vbox/trunk/src/libs/liblzma-5.4.1/common/stream_encoder_mt.c@ 98879

Last change on this file since 98879 was 98737, checked in by vboxsync, 21 months ago

libs/liblzma-5.4.1: Windows build fixes, bugref:10254

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 35.2 KB
1///////////////////////////////////////////////////////////////////////////////
2//
3/// \file stream_encoder_mt.c
4/// \brief Multithreaded .xz Stream encoder
5//
6// Author: Lasse Collin
7//
8// This file has been put into the public domain.
9// You can do whatever you want with this file.
10//
11///////////////////////////////////////////////////////////////////////////////
12
13#include "filter_encoder.h"
14#include "easy_preset.h"
15#include "block_encoder.h"
16#include "block_buffer_encoder.h"
17#include "index_encoder.h"
18#include "outqueue.h"
19
20
21/// Maximum supported block size. This makes it simpler to prevent integer
22/// overflows if we are given an unusually large block size.
23#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24
25
26typedef enum {
27 /// Waiting for work.
28 THR_IDLE,
29
30 /// Encoding is in progress.
31 THR_RUN,
32
33 /// Encoding is in progress but no more input data will
34 /// be read.
35 THR_FINISH,
36
37 /// The main thread wants the thread to stop whatever it was doing
38 /// but not exit.
39 THR_STOP,
40
41 /// The main thread wants the thread to exit. We could use
42 /// cancellation, but since THR_STOP exists anyway, this is lazier.
43 THR_EXIT,
44
45} worker_state;
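// How the states above are driven (see the code below): a worker starts
// in THR_IDLE; get_thread() moves it to THR_RUN when a Block is assigned;
// stream_encode_in() sets THR_FINISH once the Block has received all of
// its input; after encoding, worker_start() puts the thread back to
// THR_IDLE. threads_stop() requests THR_STOP (the worker returns to
// THR_IDLE without exiting) and threads_end() requests THR_EXIT, after
// which the worker frees its resources and terminates.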
46
47typedef struct lzma_stream_coder_s lzma_stream_coder;
48
49typedef struct worker_thread_s worker_thread;
50struct worker_thread_s {
51 worker_state state;
52
53 /// Input buffer of coder->block_size bytes. The main thread will
54 /// put new input into this and update in_size accordingly. Once
55 /// no more input is coming, state will be set to THR_FINISH.
56 uint8_t *in;
57
58 /// Amount of data available in the input buffer. This is modified
59 /// only by the main thread.
60 size_t in_size;
61
62 /// Output buffer for this thread. This is set by the main
63 /// thread every time a new Block is started with this thread
64 /// structure.
65 lzma_outbuf *outbuf;
66
67 /// Pointer to the main structure is needed when putting this
68 /// thread back to the stack of free threads.
69 lzma_stream_coder *coder;
70
71 /// The allocator is set by the main thread. Since a copy of the
72 /// pointer is kept here, the application must not change the
73 /// allocator before calling lzma_end().
74 const lzma_allocator *allocator;
75
76 /// Amount of uncompressed data that has already been compressed.
77 uint64_t progress_in;
78
79 /// Amount of compressed data that is ready.
80 uint64_t progress_out;
81
82 /// Block encoder
83 lzma_next_coder block_encoder;
84
85 /// Compression options for this Block
86 lzma_block block_options;
87
88 /// Filter chain for this thread. By copying the filters array
89 /// to each thread it is possible to change the filter chain
90 /// between Blocks using lzma_filters_update().
91 lzma_filter filters[LZMA_FILTERS_MAX + 1];
92
93 /// Next structure in the stack of free worker threads.
94 worker_thread *next;
95
96 mythread_mutex mutex;
97 mythread_cond cond;
98
99 /// The ID of this thread is used to join the thread
100 /// when it's not needed anymore.
101 mythread thread_id;
102};
103
104
105struct lzma_stream_coder_s {
106 enum {
107 SEQ_STREAM_HEADER,
108 SEQ_BLOCK,
109 SEQ_INDEX,
110 SEQ_STREAM_FOOTER,
111 } sequence;
112
113 /// Start a new Block every block_size bytes of input unless
114 /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
115 size_t block_size;
116
117 /// The filter chain to use for the next Block.
118 /// This can be updated using lzma_filters_update()
119 /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
120 lzma_filter filters[LZMA_FILTERS_MAX + 1];
121
122 /// A copy of filters[] will be put here when attempting to get
123 /// a new worker thread. This will be copied to a worker thread
124 /// when a thread becomes free and then this cache is marked as
125 /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
126 /// the filter options from filters[] would get uselessly copied
127 /// multiple times (allocated and freed) when waiting for a new free
128 /// worker thread.
129 ///
130 /// This is freed if filters[] is updated via lzma_filters_update().
131 lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];
132
133
134 /// Index to hold sizes of the Blocks
135 lzma_index *index;
136
137 /// Index encoder
138 lzma_next_coder index_encoder;
139
140
141 /// Stream Flags for encoding the Stream Header and Stream Footer.
142 lzma_stream_flags stream_flags;
143
144 /// Buffer to hold Stream Header and Stream Footer.
145 uint8_t header[LZMA_STREAM_HEADER_SIZE];
146
147 /// Read position in header[]
148 size_t header_pos;
149
150
151 /// Output buffer queue for compressed data
152 lzma_outq outq;
153
154 /// How much memory to allocate for each lzma_outbuf.buf
155 size_t outbuf_alloc_size;
156
157
158 /// Maximum wait time if we cannot use all the input and cannot
159 /// fill the output buffer. This is in milliseconds.
160 uint32_t timeout;
161
162
163 /// Error code from a worker thread
164 lzma_ret thread_error;
165
166 /// Array of allocated thread-specific structures
167 worker_thread *threads;
168
169 /// Number of structures in "threads" above. This is also the
170 /// number of threads that will be created at maximum.
171 uint32_t threads_max;
172
173 /// Number of thread structures that have been initialized, and
174 /// thus the number of worker threads actually created so far.
175 uint32_t threads_initialized;
176
177 /// Stack of free threads. When a thread finishes, it puts itself
178 /// back into this stack. This starts as empty because threads
179 /// are created only when actually needed.
180 worker_thread *threads_free;
181
182 /// The most recent worker thread to which the main thread writes
183 /// the new input from the application.
184 worker_thread *thr;
185
186
187 /// Amount of uncompressed data in Blocks that have already
188 /// been finished.
189 uint64_t progress_in;
190
191 /// Amount of compressed data in Stream Header + Blocks that
192 /// have already been finished.
193 uint64_t progress_out;
194
195
196 mythread_mutex mutex;
197 mythread_cond cond;
198};
199
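// Data flow implemented by the functions below: stream_encode_in() copies
// the application's input into the current worker's thr->in buffer,
// worker_encode() compresses that Block into an lzma_outbuf taken from
// coder->outq, and stream_encode_mt() copies finished outbufs to the
// application's output buffer with lzma_outq_read() and records each
// Block's sizes in coder->index with lzma_index_append().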
200
201/// Tell the main thread that something has gone wrong.
202static void
203worker_error(worker_thread *thr, lzma_ret ret)
204{
205 assert(ret != LZMA_OK);
206 assert(ret != LZMA_STREAM_END);
207
208 mythread_sync(thr->coder->mutex) {
209 if (thr->coder->thread_error == LZMA_OK)
210 thr->coder->thread_error = ret;
211
212 mythread_cond_signal(&thr->coder->cond);
213 }
214
215 return;
216}
217
218
219static worker_state
220worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
221{
222 assert(thr->progress_in == 0);
223 assert(thr->progress_out == 0);
224
225 // Set the Block options.
226 thr->block_options = (lzma_block){
227 .version = 0,
228 .check = thr->coder->stream_flags.check,
229 .compressed_size = thr->outbuf->allocated,
230 .uncompressed_size = thr->coder->block_size,
231 .filters = thr->filters,
232 };
233
234 // Calculate maximum size of the Block Header. This amount is
235 // reserved in the beginning of the buffer so that Block Header
236 // along with Compressed Size and Uncompressed Size can be
237 // written there.
238 lzma_ret ret = lzma_block_header_size(&thr->block_options);
239 if (ret != LZMA_OK) {
240 worker_error(thr, ret);
241 return THR_STOP;
242 }
243
244 // Initialize the Block encoder.
245 ret = lzma_block_encoder_init(&thr->block_encoder,
246 thr->allocator, &thr->block_options);
247 if (ret != LZMA_OK) {
248 worker_error(thr, ret);
249 return THR_STOP;
250 }
251
252 size_t in_pos = 0;
253 size_t in_size = 0;
254
255 *out_pos = thr->block_options.header_size;
256 const size_t out_size = thr->outbuf->allocated;
257
258 do {
259 mythread_sync(thr->mutex) {
260 // Store in_pos and *out_pos into *thr so that
261 // an application may read them via
262 // lzma_get_progress() to get progress information.
263 //
264 // NOTE: These aren't updated when the encoding
265 // finishes. Instead, the final values are taken
266 // later from thr->outbuf.
267 thr->progress_in = in_pos;
268 thr->progress_out = *out_pos;
269
270 while (in_size == thr->in_size
271 && thr->state == THR_RUN)
272 mythread_cond_wait(&thr->cond, &thr->mutex);
273
274 state = thr->state;
275 in_size = thr->in_size;
276 }
277
278 // Return if we were asked to stop or exit.
279 if (state >= THR_STOP)
280 return state;
281
282 lzma_action action = state == THR_FINISH
283 ? LZMA_FINISH : LZMA_RUN;
284
285 // Limit the amount of input given to the Block encoder
286 // at once. This way this thread can react fairly quickly
287 // if the main thread wants us to stop or exit.
288 static const size_t in_chunk_max = 16384;
289 size_t in_limit = in_size;
290 if (in_size - in_pos > in_chunk_max) {
291 in_limit = in_pos + in_chunk_max;
292 action = LZMA_RUN;
293 }
294
295 ret = thr->block_encoder.code(
296 thr->block_encoder.coder, thr->allocator,
297 thr->in, &in_pos, in_limit, thr->outbuf->buf,
298 out_pos, out_size, action);
299 } while (ret == LZMA_OK && *out_pos < out_size);
300
301 switch (ret) {
302 case LZMA_STREAM_END:
303 assert(state == THR_FINISH);
304
305 // Encode the Block Header. By doing it after
306 // the compression, we can store the Compressed Size
307 // and Uncompressed Size fields.
308 ret = lzma_block_header_encode(&thr->block_options,
309 thr->outbuf->buf);
310 if (ret != LZMA_OK) {
311 worker_error(thr, ret);
312 return THR_STOP;
313 }
314
315 break;
316
317 case LZMA_OK:
318 // The data was incompressible. Encode it using uncompressed
319 // LZMA2 chunks.
320 //
321 // First wait until we have gotten all the input.
322 mythread_sync(thr->mutex) {
323 while (thr->state == THR_RUN)
324 mythread_cond_wait(&thr->cond, &thr->mutex);
325
326 state = thr->state;
327 in_size = thr->in_size;
328 }
329
330 if (state >= THR_STOP)
331 return state;
332
333 // Do the encoding. This takes care of the Block Header too.
334 *out_pos = 0;
335 ret = lzma_block_uncomp_encode(&thr->block_options,
336 thr->in, in_size, thr->outbuf->buf,
337 out_pos, out_size);
338
339 // It shouldn't fail.
340 if (ret != LZMA_OK) {
341 worker_error(thr, LZMA_PROG_ERROR);
342 return THR_STOP;
343 }
344
345 break;
346
347 default:
348 worker_error(thr, ret);
349 return THR_STOP;
350 }
351
352 // Set the size information that will be read by the main thread
353 // to write the Index field.
354 thr->outbuf->unpadded_size
355 = lzma_block_unpadded_size(&thr->block_options);
356 assert(thr->outbuf->unpadded_size != 0);
357 thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
358
359 return THR_FINISH;
360}
361
362
363static MYTHREAD_RET_TYPE
364#ifndef VBOX
365worker_start(void *thr_ptr)
366#else
367worker_start(RTTHREAD hThread, void *thr_ptr)
368#endif
369{
370 worker_thread *thr = thr_ptr;
371 worker_state state = THR_IDLE; // Init to silence a warning
372
373 while (true) {
374 // Wait for work.
375 mythread_sync(thr->mutex) {
376 while (true) {
377 // The thread is already idle so if we are
378 // requested to stop, just set the state.
379 if (thr->state == THR_STOP) {
380 thr->state = THR_IDLE;
381 mythread_cond_signal(&thr->cond);
382 }
383
384 state = thr->state;
385 if (state != THR_IDLE)
386 break;
387
388 mythread_cond_wait(&thr->cond, &thr->mutex);
389 }
390 }
391
392 size_t out_pos = 0;
393
394 assert(state != THR_IDLE);
395 assert(state != THR_STOP);
396
397 if (state <= THR_FINISH)
398 state = worker_encode(thr, &out_pos, state);
399
400 if (state == THR_EXIT)
401 break;
402
403 // Mark the thread as idle unless the main thread has
404 // told us to exit. Signal is needed for the case
405 // where the main thread is waiting for the threads to stop.
406 mythread_sync(thr->mutex) {
407 if (thr->state != THR_EXIT) {
408 thr->state = THR_IDLE;
409 mythread_cond_signal(&thr->cond);
410 }
411 }
412
413 mythread_sync(thr->coder->mutex) {
414 // If no errors occurred, make the encoded data
415 // available to be copied out.
416 if (state == THR_FINISH) {
417 thr->outbuf->pos = out_pos;
418 thr->outbuf->finished = true;
419 }
420
421 // Update the main progress info.
422 thr->coder->progress_in
423 += thr->outbuf->uncompressed_size;
424 thr->coder->progress_out += out_pos;
425 thr->progress_in = 0;
426 thr->progress_out = 0;
427
428 // Return this thread to the stack of free threads.
429 thr->next = thr->coder->threads_free;
430 thr->coder->threads_free = thr;
431
432 mythread_cond_signal(&thr->coder->cond);
433 }
434 }
435
436 // Exiting, free the resources.
437 lzma_filters_free(thr->filters, thr->allocator);
438
439 mythread_mutex_destroy(&thr->mutex);
440 mythread_cond_destroy(&thr->cond);
441
442 lzma_next_end(&thr->block_encoder, thr->allocator);
443 lzma_free(thr->in, thr->allocator);
444 return MYTHREAD_RET_VALUE;
445}
446
447
448/// Make the threads stop but not exit. Optionally wait for them to stop.
449static void
450threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
451{
452 // Tell the threads to stop.
453 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
454 mythread_sync(coder->threads[i].mutex) {
455 coder->threads[i].state = THR_STOP;
456 mythread_cond_signal(&coder->threads[i].cond);
457 }
458 }
459
460 if (!wait_for_threads)
461 return;
462
463 // Wait for the threads to settle in the idle state.
464 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
465 mythread_sync(coder->threads[i].mutex) {
466 while (coder->threads[i].state != THR_IDLE)
467 mythread_cond_wait(&coder->threads[i].cond,
468 &coder->threads[i].mutex);
469 }
470 }
471
472 return;
473}
474
475
476/// Stop the threads and free the resources associated with them.
477/// Wait until the threads have exited.
478static void
479threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
480{
481 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
482 mythread_sync(coder->threads[i].mutex) {
483 coder->threads[i].state = THR_EXIT;
484 mythread_cond_signal(&coder->threads[i].cond);
485 }
486 }
487
488 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
489 int ret = mythread_join(coder->threads[i].thread_id);
490 assert(ret == 0);
491 (void)ret;
492 }
493
494 lzma_free(coder->threads, allocator);
495 return;
496}
497
498
499/// Initialize a new worker_thread structure and create a new thread.
500static lzma_ret
501initialize_new_thread(lzma_stream_coder *coder,
502 const lzma_allocator *allocator)
503{
504 worker_thread *thr = &coder->threads[coder->threads_initialized];
505
506 thr->in = lzma_alloc(coder->block_size, allocator);
507 if (thr->in == NULL)
508 return LZMA_MEM_ERROR;
509
510 if (mythread_mutex_init(&thr->mutex))
511 goto error_mutex;
512
513 if (mythread_cond_init(&thr->cond))
514 goto error_cond;
515
516 thr->state = THR_IDLE;
517 thr->allocator = allocator;
518 thr->coder = coder;
519 thr->progress_in = 0;
520 thr->progress_out = 0;
521 thr->block_encoder = LZMA_NEXT_CODER_INIT;
522 thr->filters[0].id = LZMA_VLI_UNKNOWN;
523
524 if (mythread_create(&thr->thread_id, &worker_start, thr))
525 goto error_thread;
526
527 ++coder->threads_initialized;
528 coder->thr = thr;
529
530 return LZMA_OK;
531
532error_thread:
533 mythread_cond_destroy(&thr->cond);
534
535error_cond:
536 mythread_mutex_destroy(&thr->mutex);
537
538error_mutex:
539 lzma_free(thr->in, allocator);
540 return LZMA_MEM_ERROR;
541}
542
543
544static lzma_ret
545get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
546{
547 // If there are no free output subqueues, there is no
548 // point in trying to get a thread.
549 if (!lzma_outq_has_buf(&coder->outq))
550 return LZMA_OK;
551
552 // That's also true if we cannot allocate memory for the output
553 // buffer in the output queue.
554 return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
555 coder->outbuf_alloc_size));
556
557 // Make a thread-specific copy of the filter chain. Put it in
558 // the cache array first so that if we cannot get a new thread yet,
559 // the allocation is ready when we try again.
560 if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
561 return_if_error(lzma_filters_copy(
562 coder->filters, coder->filters_cache, allocator));
563
564 // If there is a free structure on the stack, use it.
565 mythread_sync(coder->mutex) {
566 if (coder->threads_free != NULL) {
567 coder->thr = coder->threads_free;
568 coder->threads_free = coder->threads_free->next;
569 }
570 }
571
572 if (coder->thr == NULL) {
573 // If there are no uninitialized structures left, return.
574 if (coder->threads_initialized == coder->threads_max)
575 return LZMA_OK;
576
577 // Initialize a new thread.
578 return_if_error(initialize_new_thread(coder, allocator));
579 }
580
581 // Reset the parts of the thread state that have to be done
582 // in the main thread.
583 mythread_sync(coder->thr->mutex) {
584 coder->thr->state = THR_RUN;
585 coder->thr->in_size = 0;
586 coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
587
588 // Free the old thread-specific filter options and replace
589 // them with the already-allocated new options from
590 // coder->filters_cache[]. Then mark the cache as empty.
591 lzma_filters_free(coder->thr->filters, allocator);
592 memcpy(coder->thr->filters, coder->filters_cache,
593 sizeof(coder->filters_cache));
594 coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
595
596 mythread_cond_signal(&coder->thr->cond);
597 }
598
599 return LZMA_OK;
600}
601
602
603static lzma_ret
604stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
605 const uint8_t *restrict in, size_t *restrict in_pos,
606 size_t in_size, lzma_action action)
607{
608 while (*in_pos < in_size
609 || (coder->thr != NULL && action != LZMA_RUN)) {
610 if (coder->thr == NULL) {
611 // Get a new thread.
612 const lzma_ret ret = get_thread(coder, allocator);
613 if (coder->thr == NULL)
614 return ret;
615 }
616
617 // Copy the input data to thread's buffer.
618 size_t thr_in_size = coder->thr->in_size;
619 lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
620 &thr_in_size, coder->block_size);
621
622 // Tell the Block encoder to finish if
623 // - it has got block_size bytes of input; or
624 // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
625 // or LZMA_FULL_BARRIER was used.
626 //
627 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
628 const bool finish = thr_in_size == coder->block_size
629 || (*in_pos == in_size && action != LZMA_RUN);
630
631 bool block_error = false;
632
633 mythread_sync(coder->thr->mutex) {
634 if (coder->thr->state == THR_IDLE) {
635 // Something has gone wrong with the Block
636 // encoder. It has set coder->thread_error
637 // which we will read a few lines later.
638 block_error = true;
639 } else {
640 // Tell the Block encoder its new amount
641 // of input and update the state if needed.
642 coder->thr->in_size = thr_in_size;
643
644 if (finish)
645 coder->thr->state = THR_FINISH;
646
647 mythread_cond_signal(&coder->thr->cond);
648 }
649 }
650
651 if (block_error) {
652#ifndef VBOX
653 lzma_ret ret;
654#else
655 lzma_ret ret = LZMA_OK; /* Shut up msc who can't grok the mythread_sync construct below. */
656#endif
657
658 mythread_sync(coder->mutex) {
659 ret = coder->thread_error;
660 }
661
662 return ret;
663 }
664
665 if (finish)
666 coder->thr = NULL;
667 }
668
669 return LZMA_OK;
670}
671
672
673/// Wait until more input can be consumed, more output can be read, or
674/// an optional timeout is reached.
675static bool
676wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
677 bool *has_blocked, bool has_input)
678{
679 if (coder->timeout != 0 && !*has_blocked) {
680 // Every time when stream_encode_mt() is called via
681 // lzma_code(), *has_blocked starts as false. We set it
682 // to true here and calculate the absolute time when
683 // we must return if there's nothing to do.
684 //
685 // This way if we block multiple times for short moments
686 // less than "timeout" milliseconds, we will return once
687 // "timeout" amount of time has passed since the *first*
688 // blocking occurred. If the absolute time was calculated
689 // again every time we block, "timeout" would effectively
690 // be meaningless if we never consecutively block longer
691 // than "timeout" ms.
692 *has_blocked = true;
693 mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
694 }
695
696 bool timed_out = false;
697
698 mythread_sync(coder->mutex) {
699 // There are four things that we wait for. If one of them
700 // becomes possible, we return.
701 // - If there is input left, we need to get a free
702 // worker thread and an output buffer for it.
703 // - Data ready to be read from the output queue.
704 // - A worker thread indicates an error.
705 // - Time out occurs.
706 while ((!has_input || coder->threads_free == NULL
707 || !lzma_outq_has_buf(&coder->outq))
708 && !lzma_outq_is_readable(&coder->outq)
709 && coder->thread_error == LZMA_OK
710 && !timed_out) {
711 if (coder->timeout != 0)
712 timed_out = mythread_cond_timedwait(
713 &coder->cond, &coder->mutex,
714 wait_abs) != 0;
715 else
716 mythread_cond_wait(&coder->cond,
717 &coder->mutex);
718 }
719 }
720
721 return timed_out;
722}
723
724
725static lzma_ret
726stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
727 const uint8_t *restrict in, size_t *restrict in_pos,
728 size_t in_size, uint8_t *restrict out,
729 size_t *restrict out_pos, size_t out_size, lzma_action action)
730{
731 lzma_stream_coder *coder = coder_ptr;
732
733 switch (coder->sequence) {
734 case SEQ_STREAM_HEADER:
735 lzma_bufcpy(coder->header, &coder->header_pos,
736 sizeof(coder->header),
737 out, out_pos, out_size);
738 if (coder->header_pos < sizeof(coder->header))
739 return LZMA_OK;
740
741 coder->header_pos = 0;
742 coder->sequence = SEQ_BLOCK;
743
744 // Fall through
745
746 case SEQ_BLOCK: {
747 // Initialized to silence warnings.
748 lzma_vli unpadded_size = 0;
749 lzma_vli uncompressed_size = 0;
750 lzma_ret ret = LZMA_OK;
751
752 // These are for wait_for_work().
753 bool has_blocked = false;
754 mythread_condtime wait_abs;
755
756 while (true) {
757 mythread_sync(coder->mutex) {
758 // Check for Block encoder errors.
759 ret = coder->thread_error;
760 if (ret != LZMA_OK) {
761 assert(ret != LZMA_STREAM_END);
762 break; // Break out of mythread_sync.
763 }
764
765 // Try to read compressed data to out[].
766 ret = lzma_outq_read(&coder->outq, allocator,
767 out, out_pos, out_size,
768 &unpadded_size,
769 &uncompressed_size);
770 }
771
772 if (ret == LZMA_STREAM_END) {
773 // End of Block. Add it to the Index.
774 ret = lzma_index_append(coder->index,
775 allocator, unpadded_size,
776 uncompressed_size);
777 if (ret != LZMA_OK) {
778 threads_stop(coder, false);
779 return ret;
780 }
781
782 // If we didn't fill the output buffer yet,
783 // try to read more data. Maybe the next
784 // outbuf has been finished already too.
785 if (*out_pos < out_size)
786 continue;
787 }
788
789 if (ret != LZMA_OK) {
790 // coder->thread_error was set.
791 threads_stop(coder, false);
792 return ret;
793 }
794
795 // Try to give uncompressed data to a worker thread.
796 ret = stream_encode_in(coder, allocator,
797 in, in_pos, in_size, action);
798 if (ret != LZMA_OK) {
799 threads_stop(coder, false);
800 return ret;
801 }
802
803 // See if we should wait or return.
804 //
805 // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
806 if (*in_pos == in_size) {
807 // LZMA_RUN: More data is probably coming
808 // so return to let the caller fill the
809 // input buffer.
810 if (action == LZMA_RUN)
811 return LZMA_OK;
812
813 // LZMA_FULL_BARRIER: The same as with
814 // LZMA_RUN but tell the caller that the
815 // barrier was completed.
816 if (action == LZMA_FULL_BARRIER)
817 return LZMA_STREAM_END;
818
819 // Finishing or flushing isn't completed until
820 // all input data has been encoded and copied
821 // to the output buffer.
822 if (lzma_outq_is_empty(&coder->outq)) {
823 // LZMA_FINISH: Continue to encode
824 // the Index field.
825 if (action == LZMA_FINISH)
826 break;
827
828 // LZMA_FULL_FLUSH: Return to tell
829 // the caller that flushing was
830 // completed.
831 if (action == LZMA_FULL_FLUSH)
832 return LZMA_STREAM_END;
833 }
834 }
835
836 // Return if there is no output space left.
837 // This check must be done after testing the input
838 // buffer, because we might want to use a different
839 // return code.
840 if (*out_pos == out_size)
841 return LZMA_OK;
842
843 // Neither in nor out has been used completely.
844 // Wait until there's something we can do.
845 if (wait_for_work(coder, &wait_abs, &has_blocked,
846 *in_pos < in_size))
847 return LZMA_TIMED_OUT;
848 }
849
850 // All Blocks have been encoded and the threads have stopped.
851 // Prepare to encode the Index field.
852 return_if_error(lzma_index_encoder_init(
853 &coder->index_encoder, allocator,
854 coder->index));
855 coder->sequence = SEQ_INDEX;
856
857 // Update the progress info to take the Index and
858 // Stream Footer into account. Those are very fast to encode
859 // so in terms of progress information they can be thought
860 // to be ready to be copied out.
861 coder->progress_out += lzma_index_size(coder->index)
862 + LZMA_STREAM_HEADER_SIZE;
863 }
864
865 // Fall through
866
867 case SEQ_INDEX: {
868 // Call the Index encoder. It doesn't take any input, so
869 // those pointers can be NULL.
870 const lzma_ret ret = coder->index_encoder.code(
871 coder->index_encoder.coder, allocator,
872 NULL, NULL, 0,
873 out, out_pos, out_size, LZMA_RUN);
874 if (ret != LZMA_STREAM_END)
875 return ret;
876
877 // Encode the Stream Footer into coder->header[].
878 coder->stream_flags.backward_size
879 = lzma_index_size(coder->index);
880 if (lzma_stream_footer_encode(&coder->stream_flags,
881 coder->header) != LZMA_OK)
882 return LZMA_PROG_ERROR;
883
884 coder->sequence = SEQ_STREAM_FOOTER;
885 }
886
887 // Fall through
888
889 case SEQ_STREAM_FOOTER:
890 lzma_bufcpy(coder->header, &coder->header_pos,
891 sizeof(coder->header),
892 out, out_pos, out_size);
893 return coder->header_pos < sizeof(coder->header)
894 ? LZMA_OK : LZMA_STREAM_END;
895 }
896
897 assert(0);
898 return LZMA_PROG_ERROR;
899}
900
901
902static void
903stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
904{
905 lzma_stream_coder *coder = coder_ptr;
906
907 // Threads must be killed before the output queue can be freed.
908 threads_end(coder, allocator);
909 lzma_outq_end(&coder->outq, allocator);
910
911 lzma_filters_free(coder->filters, allocator);
912 lzma_filters_free(coder->filters_cache, allocator);
913
914 lzma_next_end(&coder->index_encoder, allocator);
915 lzma_index_end(coder->index, allocator);
916
917 mythread_cond_destroy(&coder->cond);
918 mythread_mutex_destroy(&coder->mutex);
919
920 lzma_free(coder, allocator);
921 return;
922}
923
924
925static lzma_ret
926stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
927 const lzma_filter *filters,
928 const lzma_filter *reversed_filters
929 lzma_attribute((__unused__)))
930{
931 lzma_stream_coder *coder = coder_ptr;
932
933 // Applications shouldn't attempt to change the options when
934 // we are already encoding the Index or Stream Footer.
935 if (coder->sequence > SEQ_BLOCK)
936 return LZMA_PROG_ERROR;
937
938 // For now the threaded encoder doesn't support changing
939 // the options in the middle of a Block.
940 if (coder->thr != NULL)
941 return LZMA_PROG_ERROR;
942
943 // Check if the filter chain seems mostly valid. See the comment
944 // in stream_encoder_mt_init().
945 if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
946 return LZMA_OPTIONS_ERROR;
947
948 // Make a copy to a temporary buffer first. This way the encoder
949 // state stays unchanged if an error occurs in lzma_filters_copy().
950 lzma_filter temp[LZMA_FILTERS_MAX + 1];
951 return_if_error(lzma_filters_copy(filters, temp, allocator));
952
953 // Free the options of the old chain as well as the cache.
954 lzma_filters_free(coder->filters, allocator);
955 lzma_filters_free(coder->filters_cache, allocator);
956
957 // Copy the new filter chain in place.
958 memcpy(coder->filters, temp, sizeof(temp));
959
960 return LZMA_OK;
961}
962
963
964/// Options handling for lzma_stream_encoder_mt_init() and
965/// lzma_stream_encoder_mt_memusage()
966static lzma_ret
967get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
968 const lzma_filter **filters, uint64_t *block_size,
969 uint64_t *outbuf_size_max)
970{
971 // Validate some of the options.
972 if (options == NULL)
973 return LZMA_PROG_ERROR;
974
975 if (options->flags != 0 || options->threads == 0
976 || options->threads > LZMA_THREADS_MAX)
977 return LZMA_OPTIONS_ERROR;
978
979 if (options->filters != NULL) {
980 // Filter chain was given, use it as is.
981 *filters = options->filters;
982 } else {
983 // Use a preset.
984 if (lzma_easy_preset(opt_easy, options->preset))
985 return LZMA_OPTIONS_ERROR;
986
987 *filters = opt_easy->filters;
988 }
989
990 // Block size
991 if (options->block_size > 0) {
992 if (options->block_size > BLOCK_SIZE_MAX)
993 return LZMA_OPTIONS_ERROR;
994
995 *block_size = options->block_size;
996 } else {
997 // Determine the Block size from the filter chain.
998 *block_size = lzma_mt_block_size(*filters);
999 if (*block_size == 0)
1000 return LZMA_OPTIONS_ERROR;
1001
1002 assert(*block_size <= BLOCK_SIZE_MAX);
1003 }
1004
1005 // Calculate the maximum amount of output that a single output buffer
1006 // may need to hold. This is the same as the maximum total size of
1007 // a Block.
1008 *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
1009 if (*outbuf_size_max == 0)
1010 return LZMA_MEM_ERROR;
1011
1012 return LZMA_OK;
1013}
1014
1015
1016static void
1017get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
1018{
1019 lzma_stream_coder *coder = coder_ptr;
1020
1021 // Lock coder->mutex to prevent finishing threads from moving their
1022 // progress info from the worker_thread structure to lzma_stream_coder.
1023 mythread_sync(coder->mutex) {
1024 *progress_in = coder->progress_in;
1025 *progress_out = coder->progress_out;
1026
1027 for (size_t i = 0; i < coder->threads_initialized; ++i) {
1028 mythread_sync(coder->threads[i].mutex) {
1029 *progress_in += coder->threads[i].progress_in;
1030 *progress_out += coder->threads[i]
1031 .progress_out;
1032 }
1033 }
1034 }
1035
1036 return;
1037}
1038
1039
1040static lzma_ret
1041stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
1042 const lzma_mt *options)
1043{
1044 lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
1045
1046 // Get the filter chain.
1047 lzma_options_easy easy;
1048 const lzma_filter *filters;
1049 uint64_t block_size;
1050 uint64_t outbuf_size_max;
1051 return_if_error(get_options(options, &easy, &filters,
1052 &block_size, &outbuf_size_max));
1053
1054#if SIZE_MAX < UINT64_MAX
1055 if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
1056 return LZMA_MEM_ERROR;
1057#endif
1058
1059 // Validate the filter chain so that we can give an error in this
1060 // function instead of delaying it to the first call to lzma_code().
1061 // The memory usage calculation verifies the filter chain as
1062 // a side effect so we take advantage of that. It's not a perfect
1063 // check though as raw encoder allows LZMA1 too but such problems
1064 // will be caught eventually with Block Header encoder.
1065 if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
1066 return LZMA_OPTIONS_ERROR;
1067
1068 // Validate the Check ID.
1069 if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
1070 return LZMA_PROG_ERROR;
1071
1072 if (!lzma_check_is_supported(options->check))
1073 return LZMA_UNSUPPORTED_CHECK;
1074
1075 // Allocate and initialize the base structure if needed.
1076 lzma_stream_coder *coder = next->coder;
1077 if (coder == NULL) {
1078 coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
1079 if (coder == NULL)
1080 return LZMA_MEM_ERROR;
1081
1082 next->coder = coder;
1083
1084 // For the mutex and condition variable initializations
1085 // the error handling has to be done here because
1086 // stream_encoder_mt_end() doesn't know if they have
1087 // already been initialized or not.
1088 if (mythread_mutex_init(&coder->mutex)) {
1089 lzma_free(coder, allocator);
1090 next->coder = NULL;
1091 return LZMA_MEM_ERROR;
1092 }
1093
1094 if (mythread_cond_init(&coder->cond)) {
1095 mythread_mutex_destroy(&coder->mutex);
1096 lzma_free(coder, allocator);
1097 next->coder = NULL;
1098 return LZMA_MEM_ERROR;
1099 }
1100
1101 next->code = &stream_encode_mt;
1102 next->end = &stream_encoder_mt_end;
1103 next->get_progress = &get_progress;
1104 next->update = &stream_encoder_mt_update;
1105
1106 coder->filters[0].id = LZMA_VLI_UNKNOWN;
1107 coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
1108 coder->index_encoder = LZMA_NEXT_CODER_INIT;
1109 coder->index = NULL;
1110 memzero(&coder->outq, sizeof(coder->outq));
1111 coder->threads = NULL;
1112 coder->threads_max = 0;
1113 coder->threads_initialized = 0;
1114 }
1115
1116 // Basic initializations
1117 coder->sequence = SEQ_STREAM_HEADER;
1118 coder->block_size = (size_t)(block_size);
1119 coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
1120 coder->thread_error = LZMA_OK;
1121 coder->thr = NULL;
1122
1123 // Allocate the thread-specific base structures.
1124 assert(options->threads > 0);
1125 if (coder->threads_max != options->threads) {
1126 threads_end(coder, allocator);
1127
1128 coder->threads = NULL;
1129 coder->threads_max = 0;
1130
1131 coder->threads_initialized = 0;
1132 coder->threads_free = NULL;
1133
1134 coder->threads = lzma_alloc(
1135 options->threads * sizeof(worker_thread),
1136 allocator);
1137 if (coder->threads == NULL)
1138 return LZMA_MEM_ERROR;
1139
1140 coder->threads_max = options->threads;
1141 } else {
1142 // Reuse the old structures and threads. Tell the running
1143 // threads to stop and wait until they have stopped.
1144 threads_stop(coder, true);
1145 }
1146
1147 // Output queue
1148 return_if_error(lzma_outq_init(&coder->outq, allocator,
1149 options->threads));
1150
1151 // Timeout
1152 coder->timeout = options->timeout;
1153
1154 // Free the old filter chain and the cache.
1155 lzma_filters_free(coder->filters, allocator);
1156 lzma_filters_free(coder->filters_cache, allocator);
1157
1158 // Copy the new filter chain.
1159 return_if_error(lzma_filters_copy(
1160 filters, coder->filters, allocator));
1161
1162 // Index
1163 lzma_index_end(coder->index, allocator);
1164 coder->index = lzma_index_init(allocator);
1165 if (coder->index == NULL)
1166 return LZMA_MEM_ERROR;
1167
1168 // Stream Header
1169 coder->stream_flags.version = 0;
1170 coder->stream_flags.check = options->check;
1171 return_if_error(lzma_stream_header_encode(
1172 &coder->stream_flags, coder->header));
1173
1174 coder->header_pos = 0;
1175
1176 // Progress info
1177 coder->progress_in = 0;
1178 coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1179
1180 return LZMA_OK;
1181}
1182
1183
1184#ifdef HAVE_SYMBOL_VERSIONS_LINUX
1185// These are for compatibility with binaries linked against liblzma that
1186// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1187// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
1188// but it has been added here anyway since someone might misread the
1189// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
1190LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
1191 lzma_ret, lzma_stream_encoder_mt_512a)(
1192 lzma_stream *strm, const lzma_mt *options)
1193 lzma_nothrow lzma_attr_warn_unused_result
1194 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
1195
1196LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
1197 lzma_ret, lzma_stream_encoder_mt_522)(
1198 lzma_stream *strm, const lzma_mt *options)
1199 lzma_nothrow lzma_attr_warn_unused_result
1200 __attribute__((__alias__("lzma_stream_encoder_mt_52")));
1201
1202LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
1203 lzma_ret, lzma_stream_encoder_mt_52)(
1204 lzma_stream *strm, const lzma_mt *options)
1205 lzma_nothrow lzma_attr_warn_unused_result;
1206
1207#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
1208#endif
1209extern LZMA_API(lzma_ret)
1210lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1211{
1212 lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1213
1214 strm->internal->supported_actions[LZMA_RUN] = true;
1215// strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1216 strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1217 strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1218 strm->internal->supported_actions[LZMA_FINISH] = true;
1219
1220 return LZMA_OK;
1221}
1222
1223
1224#ifdef HAVE_SYMBOL_VERSIONS_LINUX
1225LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
1226 uint64_t, lzma_stream_encoder_mt_memusage_512a)(
1227 const lzma_mt *options) lzma_nothrow lzma_attr_pure
1228 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1229
1230LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
1231 uint64_t, lzma_stream_encoder_mt_memusage_522)(
1232 const lzma_mt *options) lzma_nothrow lzma_attr_pure
1233 __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1234
1235LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
1236 uint64_t, lzma_stream_encoder_mt_memusage_52)(
1237 const lzma_mt *options) lzma_nothrow lzma_attr_pure;
1238
1239#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
1240#endif
1241// This function name is a monster but it's consistent with the older
1242// monster names. :-( 31 chars is the max that C99 requires so in that
1243// sense it's not too long. ;-)
1244extern LZMA_API(uint64_t)
1245lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1246{
1247 lzma_options_easy easy;
1248 const lzma_filter *filters;
1249 uint64_t block_size;
1250 uint64_t outbuf_size_max;
1251
1252 if (get_options(options, &easy, &filters, &block_size,
1253 &outbuf_size_max) != LZMA_OK)
1254 return UINT64_MAX;
1255
1256 // Memory usage of the input buffers
1257 const uint64_t inbuf_memusage = options->threads * block_size;
1258
1259 // Memory usage of the filter encoders
1260 uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1261 if (filters_memusage == UINT64_MAX)
1262 return UINT64_MAX;
1263
1264 filters_memusage *= options->threads;
1265
1266 // Memory usage of the output queue
1267 const uint64_t outq_memusage = lzma_outq_memusage(
1268 outbuf_size_max, options->threads);
1269 if (outq_memusage == UINT64_MAX)
1270 return UINT64_MAX;
1271
1272 // Sum them with overflow checking.
1273 uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1274 + sizeof(lzma_stream_coder)
1275 + options->threads * sizeof(worker_thread);
1276
1277 if (UINT64_MAX - total_memusage < inbuf_memusage)
1278 return UINT64_MAX;
1279
1280 total_memusage += inbuf_memusage;
1281
1282 if (UINT64_MAX - total_memusage < filters_memusage)
1283 return UINT64_MAX;
1284
1285 total_memusage += filters_memusage;
1286
1287 if (UINT64_MAX - total_memusage < outq_memusage)
1288 return UINT64_MAX;
1289
1290 return total_memusage + outq_memusage;
1291}
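
For reference, here is a minimal sketch of how an application might drive this multithreaded encoder through the public liblzma API. It is not part of the file above; the file names, thread count, and other option values are illustrative assumptions, and error handling is reduced to early returns.

/* Minimal usage sketch (not part of stream_encoder_mt.c). File names and
 * option values are illustrative assumptions. */
#include <lzma.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    // Multithreaded encoder options; fields left out stay zero.
    lzma_mt mt = {
        .flags = 0,                     // Must be zero (see get_options()).
        .threads = 4,
        .block_size = 0,                // 0 = derive from the filter chain.
        .timeout = 0,                   // 0 = no timeout; lzma_code() may block.
        .preset = LZMA_PRESET_DEFAULT,
        .filters = NULL,                // NULL = use the preset instead.
        .check = LZMA_CHECK_CRC64,
    };

    lzma_stream strm = LZMA_STREAM_INIT;
    if (lzma_stream_encoder_mt(&strm, &mt) != LZMA_OK)
        return 1;

    FILE *infile = fopen("input.bin", "rb");    // hypothetical input file
    FILE *outfile = fopen("output.xz", "wb");   // hypothetical output file
    if (infile == NULL || outfile == NULL)
        return 1;

    uint8_t inbuf[BUFSIZ];
    uint8_t outbuf[BUFSIZ];
    lzma_action action = LZMA_RUN;

    strm.next_in = NULL;
    strm.avail_in = 0;
    strm.next_out = outbuf;
    strm.avail_out = sizeof(outbuf);

    while (true) {
        // Refill the input buffer; switch to LZMA_FINISH at end of input.
        if (strm.avail_in == 0 && action == LZMA_RUN) {
            strm.next_in = inbuf;
            strm.avail_in = fread(inbuf, 1, sizeof(inbuf), infile);
            if (ferror(infile))
                return 1;
            if (feof(infile))
                action = LZMA_FINISH;
        }

        const lzma_ret ret = lzma_code(&strm, action);

        // Flush the output buffer when it is full or encoding has finished.
        if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
            const size_t n = sizeof(outbuf) - strm.avail_out;
            if (fwrite(outbuf, 1, n, outfile) != n)
                return 1;
            strm.next_out = outbuf;
            strm.avail_out = sizeof(outbuf);
        }

        if (ret == LZMA_STREAM_END)
            break;
        if (ret != LZMA_OK)
            return 1;
    }

    lzma_end(&strm);
    fclose(infile);
    fclose(outfile);
    return 0;
}

Note that with a non-zero .timeout the encoder may return LZMA_OK from lzma_code() without having consumed all input or filled the output buffer (see wait_for_work() above), so a caller's loop must tolerate calls that make little or no progress.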