1 | ///////////////////////////////////////////////////////////////////////////////
|
---|
2 | //
|
---|
3 | /// \file stream_encoder_mt.c
|
---|
4 | /// \brief Multithreaded .xz Stream encoder
|
---|
5 | //
|
---|
6 | // Author: Lasse Collin
|
---|
7 | //
|
---|
8 | // This file has been put into the public domain.
|
---|
9 | // You can do whatever you want with this file.
|
---|
10 | //
|
---|
11 | ///////////////////////////////////////////////////////////////////////////////
|
---|
12 |
|
---|
13 | #include "filter_encoder.h"
|
---|
14 | #include "easy_preset.h"
|
---|
15 | #include "block_encoder.h"
|
---|
16 | #include "block_buffer_encoder.h"
|
---|
17 | #include "index_encoder.h"
|
---|
18 | #include "outqueue.h"
|
---|
19 |
|
---|
20 |
|
---|
/// Largest block size that is accepted. Having this cap makes it simpler
/// to prevent integer overflows if we are given an unusually large
/// block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24 |
|
---|
25 |
|
---|
/// State of a worker thread.
///
/// The order of the enumerators matters: the code in this file compares
/// states with "<= THR_FINISH" (there is encoding work to do) and
/// ">= THR_STOP" (the main thread asked the worker to stop or exit).
typedef enum {
	/// The thread has nothing to do and is waiting for work.
	THR_IDLE,

	/// A Block is being encoded and more input may still arrive.
	THR_RUN,

	/// A Block is being encoded but no more input data will
	/// be read for it.
	THR_FINISH,

	/// The main thread wants the worker to abandon whatever it was
	/// doing without exiting.
	THR_STOP,

	/// The main thread wants the worker to exit. Thread cancellation
	/// could be used instead, but since stopping (THR_STOP) is
	/// supported anyway, handling exit as one more state is lazier.
	THR_EXIT,

} worker_state;
46 |
|
---|
47 | typedef struct lzma_stream_coder_s lzma_stream_coder;
|
---|
48 |
|
---|
49 | typedef struct worker_thread_s worker_thread;
|
---|
50 | struct worker_thread_s {
|
---|
51 | worker_state state;
|
---|
52 |
|
---|
53 | /// Input buffer of coder->block_size bytes. The main thread will
|
---|
54 | /// put new input into this and update in_size accordingly. Once
|
---|
55 | /// no more input is coming, state will be set to THR_FINISH.
|
---|
56 | uint8_t *in;
|
---|
57 |
|
---|
58 | /// Amount of data available in the input buffer. This is modified
|
---|
59 | /// only by the main thread.
|
---|
60 | size_t in_size;
|
---|
61 |
|
---|
62 | /// Output buffer for this thread. This is set by the main
|
---|
63 | /// thread every time a new Block is started with this thread
|
---|
64 | /// structure.
|
---|
65 | lzma_outbuf *outbuf;
|
---|
66 |
|
---|
67 | /// Pointer to the main structure is needed when putting this
|
---|
68 | /// thread back to the stack of free threads.
|
---|
69 | lzma_stream_coder *coder;
|
---|
70 |
|
---|
71 | /// The allocator is set by the main thread. Since a copy of the
|
---|
72 | /// pointer is kept here, the application must not change the
|
---|
73 | /// allocator before calling lzma_end().
|
---|
74 | const lzma_allocator *allocator;
|
---|
75 |
|
---|
76 | /// Amount of uncompressed data that has already been compressed.
|
---|
77 | uint64_t progress_in;
|
---|
78 |
|
---|
79 | /// Amount of compressed data that is ready.
|
---|
80 | uint64_t progress_out;
|
---|
81 |
|
---|
82 | /// Block encoder
|
---|
83 | lzma_next_coder block_encoder;
|
---|
84 |
|
---|
85 | /// Compression options for this Block
|
---|
86 | lzma_block block_options;
|
---|
87 |
|
---|
88 | /// Filter chain for this thread. By copying the filters array
|
---|
89 | /// to each thread it is possible to change the filter chain
|
---|
90 | /// between Blocks using lzma_filters_update().
|
---|
91 | lzma_filter filters[LZMA_FILTERS_MAX + 1];
|
---|
92 |
|
---|
93 | /// Next structure in the stack of free worker threads.
|
---|
94 | worker_thread *next;
|
---|
95 |
|
---|
96 | mythread_mutex mutex;
|
---|
97 | mythread_cond cond;
|
---|
98 |
|
---|
99 | /// The ID of this thread is used to join the thread
|
---|
100 | /// when it's not needed anymore.
|
---|
101 | mythread thread_id;
|
---|
102 | };
|
---|
103 |
|
---|
104 |
|
---|
105 | struct lzma_stream_coder_s {
|
---|
106 | enum {
|
---|
107 | SEQ_STREAM_HEADER,
|
---|
108 | SEQ_BLOCK,
|
---|
109 | SEQ_INDEX,
|
---|
110 | SEQ_STREAM_FOOTER,
|
---|
111 | } sequence;
|
---|
112 |
|
---|
113 | /// Start a new Block every block_size bytes of input unless
|
---|
114 | /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
|
---|
115 | size_t block_size;
|
---|
116 |
|
---|
117 | /// The filter chain to use for the next Block.
|
---|
118 | /// This can be updated using lzma_filters_update()
|
---|
119 | /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
|
---|
120 | lzma_filter filters[LZMA_FILTERS_MAX + 1];
|
---|
121 |
|
---|
122 | /// A copy of filters[] will be put here when attempting to get
|
---|
123 | /// a new worker thread. This will be copied to a worker thread
|
---|
124 | /// when a thread becomes free and then this cache is marked as
|
---|
125 | /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
|
---|
126 | /// the filter options from filters[] would get uselessly copied
|
---|
127 | /// multiple times (allocated and freed) when waiting for a new free
|
---|
128 | /// worker thread.
|
---|
129 | ///
|
---|
130 | /// This is freed if filters[] is updated via lzma_filters_update().
|
---|
131 | lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];
|
---|
132 |
|
---|
133 |
|
---|
134 | /// Index to hold sizes of the Blocks
|
---|
135 | lzma_index *index;
|
---|
136 |
|
---|
137 | /// Index encoder
|
---|
138 | lzma_next_coder index_encoder;
|
---|
139 |
|
---|
140 |
|
---|
141 | /// Stream Flags for encoding the Stream Header and Stream Footer.
|
---|
142 | lzma_stream_flags stream_flags;
|
---|
143 |
|
---|
144 | /// Buffer to hold Stream Header and Stream Footer.
|
---|
145 | uint8_t header[LZMA_STREAM_HEADER_SIZE];
|
---|
146 |
|
---|
147 | /// Read position in header[]
|
---|
148 | size_t header_pos;
|
---|
149 |
|
---|
150 |
|
---|
151 | /// Output buffer queue for compressed data
|
---|
152 | lzma_outq outq;
|
---|
153 |
|
---|
154 | /// How much memory to allocate for each lzma_outbuf.buf
|
---|
155 | size_t outbuf_alloc_size;
|
---|
156 |
|
---|
157 |
|
---|
158 | /// Maximum wait time if cannot use all the input and cannot
|
---|
159 | /// fill the output buffer. This is in milliseconds.
|
---|
160 | uint32_t timeout;
|
---|
161 |
|
---|
162 |
|
---|
163 | /// Error code from a worker thread
|
---|
164 | lzma_ret thread_error;
|
---|
165 |
|
---|
166 | /// Array of allocated thread-specific structures
|
---|
167 | worker_thread *threads;
|
---|
168 |
|
---|
169 | /// Number of structures in "threads" above. This is also the
|
---|
170 | /// number of threads that will be created at maximum.
|
---|
171 | uint32_t threads_max;
|
---|
172 |
|
---|
173 | /// Number of thread structures that have been initialized, and
|
---|
174 | /// thus the number of worker threads actually created so far.
|
---|
175 | uint32_t threads_initialized;
|
---|
176 |
|
---|
177 | /// Stack of free threads. When a thread finishes, it puts itself
|
---|
178 | /// back into this stack. This starts as empty because threads
|
---|
179 | /// are created only when actually needed.
|
---|
180 | worker_thread *threads_free;
|
---|
181 |
|
---|
182 | /// The most recent worker thread to which the main thread writes
|
---|
183 | /// the new input from the application.
|
---|
184 | worker_thread *thr;
|
---|
185 |
|
---|
186 |
|
---|
187 | /// Amount of uncompressed data in Blocks that have already
|
---|
188 | /// been finished.
|
---|
189 | uint64_t progress_in;
|
---|
190 |
|
---|
191 | /// Amount of compressed data in Stream Header + Blocks that
|
---|
192 | /// have already been finished.
|
---|
193 | uint64_t progress_out;
|
---|
194 |
|
---|
195 |
|
---|
196 | mythread_mutex mutex;
|
---|
197 | mythread_cond cond;
|
---|
198 | };
|
---|
199 |
|
---|
200 |
|
---|
201 | /// Tell the main thread that something has gone wrong.
|
---|
202 | static void
|
---|
203 | worker_error(worker_thread *thr, lzma_ret ret)
|
---|
204 | {
|
---|
205 | assert(ret != LZMA_OK);
|
---|
206 | assert(ret != LZMA_STREAM_END);
|
---|
207 |
|
---|
208 | mythread_sync(thr->coder->mutex) {
|
---|
209 | if (thr->coder->thread_error == LZMA_OK)
|
---|
210 | thr->coder->thread_error = ret;
|
---|
211 |
|
---|
212 | mythread_cond_signal(&thr->coder->cond);
|
---|
213 | }
|
---|
214 |
|
---|
215 | return;
|
---|
216 | }
|
---|
217 |
|
---|
218 |
|
---|
219 | static worker_state
|
---|
220 | worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
|
---|
221 | {
|
---|
222 | assert(thr->progress_in == 0);
|
---|
223 | assert(thr->progress_out == 0);
|
---|
224 |
|
---|
225 | // Set the Block options.
|
---|
226 | thr->block_options = (lzma_block){
|
---|
227 | .version = 0,
|
---|
228 | .check = thr->coder->stream_flags.check,
|
---|
229 | .compressed_size = thr->outbuf->allocated,
|
---|
230 | .uncompressed_size = thr->coder->block_size,
|
---|
231 | .filters = thr->filters,
|
---|
232 | };
|
---|
233 |
|
---|
234 | // Calculate maximum size of the Block Header. This amount is
|
---|
235 | // reserved in the beginning of the buffer so that Block Header
|
---|
236 | // along with Compressed Size and Uncompressed Size can be
|
---|
237 | // written there.
|
---|
238 | lzma_ret ret = lzma_block_header_size(&thr->block_options);
|
---|
239 | if (ret != LZMA_OK) {
|
---|
240 | worker_error(thr, ret);
|
---|
241 | return THR_STOP;
|
---|
242 | }
|
---|
243 |
|
---|
244 | // Initialize the Block encoder.
|
---|
245 | ret = lzma_block_encoder_init(&thr->block_encoder,
|
---|
246 | thr->allocator, &thr->block_options);
|
---|
247 | if (ret != LZMA_OK) {
|
---|
248 | worker_error(thr, ret);
|
---|
249 | return THR_STOP;
|
---|
250 | }
|
---|
251 |
|
---|
252 | size_t in_pos = 0;
|
---|
253 | size_t in_size = 0;
|
---|
254 |
|
---|
255 | *out_pos = thr->block_options.header_size;
|
---|
256 | const size_t out_size = thr->outbuf->allocated;
|
---|
257 |
|
---|
258 | do {
|
---|
259 | mythread_sync(thr->mutex) {
|
---|
260 | // Store in_pos and *out_pos into *thr so that
|
---|
261 | // an application may read them via
|
---|
262 | // lzma_get_progress() to get progress information.
|
---|
263 | //
|
---|
264 | // NOTE: These aren't updated when the encoding
|
---|
265 | // finishes. Instead, the final values are taken
|
---|
266 | // later from thr->outbuf.
|
---|
267 | thr->progress_in = in_pos;
|
---|
268 | thr->progress_out = *out_pos;
|
---|
269 |
|
---|
270 | while (in_size == thr->in_size
|
---|
271 | && thr->state == THR_RUN)
|
---|
272 | mythread_cond_wait(&thr->cond, &thr->mutex);
|
---|
273 |
|
---|
274 | state = thr->state;
|
---|
275 | in_size = thr->in_size;
|
---|
276 | }
|
---|
277 |
|
---|
278 | // Return if we were asked to stop or exit.
|
---|
279 | if (state >= THR_STOP)
|
---|
280 | return state;
|
---|
281 |
|
---|
282 | lzma_action action = state == THR_FINISH
|
---|
283 | ? LZMA_FINISH : LZMA_RUN;
|
---|
284 |
|
---|
285 | // Limit the amount of input given to the Block encoder
|
---|
286 | // at once. This way this thread can react fairly quickly
|
---|
287 | // if the main thread wants us to stop or exit.
|
---|
288 | static const size_t in_chunk_max = 16384;
|
---|
289 | size_t in_limit = in_size;
|
---|
290 | if (in_size - in_pos > in_chunk_max) {
|
---|
291 | in_limit = in_pos + in_chunk_max;
|
---|
292 | action = LZMA_RUN;
|
---|
293 | }
|
---|
294 |
|
---|
295 | ret = thr->block_encoder.code(
|
---|
296 | thr->block_encoder.coder, thr->allocator,
|
---|
297 | thr->in, &in_pos, in_limit, thr->outbuf->buf,
|
---|
298 | out_pos, out_size, action);
|
---|
299 | } while (ret == LZMA_OK && *out_pos < out_size);
|
---|
300 |
|
---|
301 | switch (ret) {
|
---|
302 | case LZMA_STREAM_END:
|
---|
303 | assert(state == THR_FINISH);
|
---|
304 |
|
---|
305 | // Encode the Block Header. By doing it after
|
---|
306 | // the compression, we can store the Compressed Size
|
---|
307 | // and Uncompressed Size fields.
|
---|
308 | ret = lzma_block_header_encode(&thr->block_options,
|
---|
309 | thr->outbuf->buf);
|
---|
310 | if (ret != LZMA_OK) {
|
---|
311 | worker_error(thr, ret);
|
---|
312 | return THR_STOP;
|
---|
313 | }
|
---|
314 |
|
---|
315 | break;
|
---|
316 |
|
---|
317 | case LZMA_OK:
|
---|
318 | // The data was incompressible. Encode it using uncompressed
|
---|
319 | // LZMA2 chunks.
|
---|
320 | //
|
---|
321 | // First wait that we have gotten all the input.
|
---|
322 | mythread_sync(thr->mutex) {
|
---|
323 | while (thr->state == THR_RUN)
|
---|
324 | mythread_cond_wait(&thr->cond, &thr->mutex);
|
---|
325 |
|
---|
326 | state = thr->state;
|
---|
327 | in_size = thr->in_size;
|
---|
328 | }
|
---|
329 |
|
---|
330 | if (state >= THR_STOP)
|
---|
331 | return state;
|
---|
332 |
|
---|
333 | // Do the encoding. This takes care of the Block Header too.
|
---|
334 | *out_pos = 0;
|
---|
335 | ret = lzma_block_uncomp_encode(&thr->block_options,
|
---|
336 | thr->in, in_size, thr->outbuf->buf,
|
---|
337 | out_pos, out_size);
|
---|
338 |
|
---|
339 | // It shouldn't fail.
|
---|
340 | if (ret != LZMA_OK) {
|
---|
341 | worker_error(thr, LZMA_PROG_ERROR);
|
---|
342 | return THR_STOP;
|
---|
343 | }
|
---|
344 |
|
---|
345 | break;
|
---|
346 |
|
---|
347 | default:
|
---|
348 | worker_error(thr, ret);
|
---|
349 | return THR_STOP;
|
---|
350 | }
|
---|
351 |
|
---|
352 | // Set the size information that will be read by the main thread
|
---|
353 | // to write the Index field.
|
---|
354 | thr->outbuf->unpadded_size
|
---|
355 | = lzma_block_unpadded_size(&thr->block_options);
|
---|
356 | assert(thr->outbuf->unpadded_size != 0);
|
---|
357 | thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
|
---|
358 |
|
---|
359 | return THR_FINISH;
|
---|
360 | }
|
---|
361 |
|
---|
362 |
|
---|
363 | static MYTHREAD_RET_TYPE
|
---|
364 | #ifndef VBOX
|
---|
365 | worker_start(void *thr_ptr)
|
---|
366 | #else
|
---|
367 | worker_start(RTTHREAD hThread, void *thr_ptr)
|
---|
368 | #endif
|
---|
369 | {
|
---|
370 | worker_thread *thr = thr_ptr;
|
---|
371 | worker_state state = THR_IDLE; // Init to silence a warning
|
---|
372 |
|
---|
373 | while (true) {
|
---|
374 | // Wait for work.
|
---|
375 | mythread_sync(thr->mutex) {
|
---|
376 | while (true) {
|
---|
377 | // The thread is already idle so if we are
|
---|
378 | // requested to stop, just set the state.
|
---|
379 | if (thr->state == THR_STOP) {
|
---|
380 | thr->state = THR_IDLE;
|
---|
381 | mythread_cond_signal(&thr->cond);
|
---|
382 | }
|
---|
383 |
|
---|
384 | state = thr->state;
|
---|
385 | if (state != THR_IDLE)
|
---|
386 | break;
|
---|
387 |
|
---|
388 | mythread_cond_wait(&thr->cond, &thr->mutex);
|
---|
389 | }
|
---|
390 | }
|
---|
391 |
|
---|
392 | size_t out_pos = 0;
|
---|
393 |
|
---|
394 | assert(state != THR_IDLE);
|
---|
395 | assert(state != THR_STOP);
|
---|
396 |
|
---|
397 | if (state <= THR_FINISH)
|
---|
398 | state = worker_encode(thr, &out_pos, state);
|
---|
399 |
|
---|
400 | if (state == THR_EXIT)
|
---|
401 | break;
|
---|
402 |
|
---|
403 | // Mark the thread as idle unless the main thread has
|
---|
404 | // told us to exit. Signal is needed for the case
|
---|
405 | // where the main thread is waiting for the threads to stop.
|
---|
406 | mythread_sync(thr->mutex) {
|
---|
407 | if (thr->state != THR_EXIT) {
|
---|
408 | thr->state = THR_IDLE;
|
---|
409 | mythread_cond_signal(&thr->cond);
|
---|
410 | }
|
---|
411 | }
|
---|
412 |
|
---|
413 | mythread_sync(thr->coder->mutex) {
|
---|
414 | // If no errors occurred, make the encoded data
|
---|
415 | // available to be copied out.
|
---|
416 | if (state == THR_FINISH) {
|
---|
417 | thr->outbuf->pos = out_pos;
|
---|
418 | thr->outbuf->finished = true;
|
---|
419 | }
|
---|
420 |
|
---|
421 | // Update the main progress info.
|
---|
422 | thr->coder->progress_in
|
---|
423 | += thr->outbuf->uncompressed_size;
|
---|
424 | thr->coder->progress_out += out_pos;
|
---|
425 | thr->progress_in = 0;
|
---|
426 | thr->progress_out = 0;
|
---|
427 |
|
---|
428 | // Return this thread to the stack of free threads.
|
---|
429 | thr->next = thr->coder->threads_free;
|
---|
430 | thr->coder->threads_free = thr;
|
---|
431 |
|
---|
432 | mythread_cond_signal(&thr->coder->cond);
|
---|
433 | }
|
---|
434 | }
|
---|
435 |
|
---|
436 | // Exiting, free the resources.
|
---|
437 | lzma_filters_free(thr->filters, thr->allocator);
|
---|
438 |
|
---|
439 | mythread_mutex_destroy(&thr->mutex);
|
---|
440 | mythread_cond_destroy(&thr->cond);
|
---|
441 |
|
---|
442 | lzma_next_end(&thr->block_encoder, thr->allocator);
|
---|
443 | lzma_free(thr->in, thr->allocator);
|
---|
444 | return MYTHREAD_RET_VALUE;
|
---|
445 | }
|
---|
446 |
|
---|
447 |
|
---|
448 | /// Make the threads stop but not exit. Optionally wait for them to stop.
|
---|
449 | static void
|
---|
450 | threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
|
---|
451 | {
|
---|
452 | // Tell the threads to stop.
|
---|
453 | for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
|
---|
454 | mythread_sync(coder->threads[i].mutex) {
|
---|
455 | coder->threads[i].state = THR_STOP;
|
---|
456 | mythread_cond_signal(&coder->threads[i].cond);
|
---|
457 | }
|
---|
458 | }
|
---|
459 |
|
---|
460 | if (!wait_for_threads)
|
---|
461 | return;
|
---|
462 |
|
---|
463 | // Wait for the threads to settle in the idle state.
|
---|
464 | for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
|
---|
465 | mythread_sync(coder->threads[i].mutex) {
|
---|
466 | while (coder->threads[i].state != THR_IDLE)
|
---|
467 | mythread_cond_wait(&coder->threads[i].cond,
|
---|
468 | &coder->threads[i].mutex);
|
---|
469 | }
|
---|
470 | }
|
---|
471 |
|
---|
472 | return;
|
---|
473 | }
|
---|
474 |
|
---|
475 |
|
---|
476 | /// Stop the threads and free the resources associated with them.
|
---|
477 | /// Wait until the threads have exited.
|
---|
478 | static void
|
---|
479 | threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
|
---|
480 | {
|
---|
481 | for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
|
---|
482 | mythread_sync(coder->threads[i].mutex) {
|
---|
483 | coder->threads[i].state = THR_EXIT;
|
---|
484 | mythread_cond_signal(&coder->threads[i].cond);
|
---|
485 | }
|
---|
486 | }
|
---|
487 |
|
---|
488 | for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
|
---|
489 | int ret = mythread_join(coder->threads[i].thread_id);
|
---|
490 | assert(ret == 0);
|
---|
491 | (void)ret;
|
---|
492 | }
|
---|
493 |
|
---|
494 | lzma_free(coder->threads, allocator);
|
---|
495 | return;
|
---|
496 | }
|
---|
497 |
|
---|
498 |
|
---|
499 | /// Initialize a new worker_thread structure and create a new thread.
|
---|
500 | static lzma_ret
|
---|
501 | initialize_new_thread(lzma_stream_coder *coder,
|
---|
502 | const lzma_allocator *allocator)
|
---|
503 | {
|
---|
504 | worker_thread *thr = &coder->threads[coder->threads_initialized];
|
---|
505 |
|
---|
506 | thr->in = lzma_alloc(coder->block_size, allocator);
|
---|
507 | if (thr->in == NULL)
|
---|
508 | return LZMA_MEM_ERROR;
|
---|
509 |
|
---|
510 | if (mythread_mutex_init(&thr->mutex))
|
---|
511 | goto error_mutex;
|
---|
512 |
|
---|
513 | if (mythread_cond_init(&thr->cond))
|
---|
514 | goto error_cond;
|
---|
515 |
|
---|
516 | thr->state = THR_IDLE;
|
---|
517 | thr->allocator = allocator;
|
---|
518 | thr->coder = coder;
|
---|
519 | thr->progress_in = 0;
|
---|
520 | thr->progress_out = 0;
|
---|
521 | thr->block_encoder = LZMA_NEXT_CODER_INIT;
|
---|
522 | thr->filters[0].id = LZMA_VLI_UNKNOWN;
|
---|
523 |
|
---|
524 | if (mythread_create(&thr->thread_id, &worker_start, thr))
|
---|
525 | goto error_thread;
|
---|
526 |
|
---|
527 | ++coder->threads_initialized;
|
---|
528 | coder->thr = thr;
|
---|
529 |
|
---|
530 | return LZMA_OK;
|
---|
531 |
|
---|
532 | error_thread:
|
---|
533 | mythread_cond_destroy(&thr->cond);
|
---|
534 |
|
---|
535 | error_cond:
|
---|
536 | mythread_mutex_destroy(&thr->mutex);
|
---|
537 |
|
---|
538 | error_mutex:
|
---|
539 | lzma_free(thr->in, allocator);
|
---|
540 | return LZMA_MEM_ERROR;
|
---|
541 | }
|
---|
542 |
|
---|
543 |
|
---|
544 | static lzma_ret
|
---|
545 | get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
|
---|
546 | {
|
---|
547 | // If there are no free output subqueues, there is no
|
---|
548 | // point to try getting a thread.
|
---|
549 | if (!lzma_outq_has_buf(&coder->outq))
|
---|
550 | return LZMA_OK;
|
---|
551 |
|
---|
552 | // That's also true if we cannot allocate memory for the output
|
---|
553 | // buffer in the output queue.
|
---|
554 | return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
|
---|
555 | coder->outbuf_alloc_size));
|
---|
556 |
|
---|
557 | // Make a thread-specific copy of the filter chain. Put it in
|
---|
558 | // the cache array first so that if we cannot get a new thread yet,
|
---|
559 | // the allocation is ready when we try again.
|
---|
560 | if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
|
---|
561 | return_if_error(lzma_filters_copy(
|
---|
562 | coder->filters, coder->filters_cache, allocator));
|
---|
563 |
|
---|
564 | // If there is a free structure on the stack, use it.
|
---|
565 | mythread_sync(coder->mutex) {
|
---|
566 | if (coder->threads_free != NULL) {
|
---|
567 | coder->thr = coder->threads_free;
|
---|
568 | coder->threads_free = coder->threads_free->next;
|
---|
569 | }
|
---|
570 | }
|
---|
571 |
|
---|
572 | if (coder->thr == NULL) {
|
---|
573 | // If there are no uninitialized structures left, return.
|
---|
574 | if (coder->threads_initialized == coder->threads_max)
|
---|
575 | return LZMA_OK;
|
---|
576 |
|
---|
577 | // Initialize a new thread.
|
---|
578 | return_if_error(initialize_new_thread(coder, allocator));
|
---|
579 | }
|
---|
580 |
|
---|
581 | // Reset the parts of the thread state that have to be done
|
---|
582 | // in the main thread.
|
---|
583 | mythread_sync(coder->thr->mutex) {
|
---|
584 | coder->thr->state = THR_RUN;
|
---|
585 | coder->thr->in_size = 0;
|
---|
586 | coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
|
---|
587 |
|
---|
588 | // Free the old thread-specific filter options and replace
|
---|
589 | // them with the already-allocated new options from
|
---|
590 | // coder->filters_cache[]. Then mark the cache as empty.
|
---|
591 | lzma_filters_free(coder->thr->filters, allocator);
|
---|
592 | memcpy(coder->thr->filters, coder->filters_cache,
|
---|
593 | sizeof(coder->filters_cache));
|
---|
594 | coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
|
---|
595 |
|
---|
596 | mythread_cond_signal(&coder->thr->cond);
|
---|
597 | }
|
---|
598 |
|
---|
599 | return LZMA_OK;
|
---|
600 | }
|
---|
601 |
|
---|
602 |
|
---|
603 | static lzma_ret
|
---|
604 | stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
|
---|
605 | const uint8_t *restrict in, size_t *restrict in_pos,
|
---|
606 | size_t in_size, lzma_action action)
|
---|
607 | {
|
---|
608 | while (*in_pos < in_size
|
---|
609 | || (coder->thr != NULL && action != LZMA_RUN)) {
|
---|
610 | if (coder->thr == NULL) {
|
---|
611 | // Get a new thread.
|
---|
612 | const lzma_ret ret = get_thread(coder, allocator);
|
---|
613 | if (coder->thr == NULL)
|
---|
614 | return ret;
|
---|
615 | }
|
---|
616 |
|
---|
617 | // Copy the input data to thread's buffer.
|
---|
618 | size_t thr_in_size = coder->thr->in_size;
|
---|
619 | lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
|
---|
620 | &thr_in_size, coder->block_size);
|
---|
621 |
|
---|
622 | // Tell the Block encoder to finish if
|
---|
623 | // - it has got block_size bytes of input; or
|
---|
624 | // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
|
---|
625 | // or LZMA_FULL_BARRIER was used.
|
---|
626 | //
|
---|
627 | // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
|
---|
628 | const bool finish = thr_in_size == coder->block_size
|
---|
629 | || (*in_pos == in_size && action != LZMA_RUN);
|
---|
630 |
|
---|
631 | bool block_error = false;
|
---|
632 |
|
---|
633 | mythread_sync(coder->thr->mutex) {
|
---|
634 | if (coder->thr->state == THR_IDLE) {
|
---|
635 | // Something has gone wrong with the Block
|
---|
636 | // encoder. It has set coder->thread_error
|
---|
637 | // which we will read a few lines later.
|
---|
638 | block_error = true;
|
---|
639 | } else {
|
---|
640 | // Tell the Block encoder its new amount
|
---|
641 | // of input and update the state if needed.
|
---|
642 | coder->thr->in_size = thr_in_size;
|
---|
643 |
|
---|
644 | if (finish)
|
---|
645 | coder->thr->state = THR_FINISH;
|
---|
646 |
|
---|
647 | mythread_cond_signal(&coder->thr->cond);
|
---|
648 | }
|
---|
649 | }
|
---|
650 |
|
---|
651 | if (block_error) {
|
---|
652 | #ifndef VBOX
|
---|
653 | lzma_ret ret;
|
---|
654 | #else
|
---|
655 | lzma_ret ret = LZMA_OK; /* Shut up msc who can't grok the mythread_sync construct below. */
|
---|
656 | #endif
|
---|
657 |
|
---|
658 | mythread_sync(coder->mutex) {
|
---|
659 | ret = coder->thread_error;
|
---|
660 | }
|
---|
661 |
|
---|
662 | return ret;
|
---|
663 | }
|
---|
664 |
|
---|
665 | if (finish)
|
---|
666 | coder->thr = NULL;
|
---|
667 | }
|
---|
668 |
|
---|
669 | return LZMA_OK;
|
---|
670 | }
|
---|
671 |
|
---|
672 |
|
---|
673 | /// Wait until more input can be consumed, more output can be read, or
|
---|
674 | /// an optional timeout is reached.
|
---|
675 | static bool
|
---|
676 | wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
|
---|
677 | bool *has_blocked, bool has_input)
|
---|
678 | {
|
---|
679 | if (coder->timeout != 0 && !*has_blocked) {
|
---|
680 | // Every time when stream_encode_mt() is called via
|
---|
681 | // lzma_code(), *has_blocked starts as false. We set it
|
---|
682 | // to true here and calculate the absolute time when
|
---|
683 | // we must return if there's nothing to do.
|
---|
684 | //
|
---|
685 | // This way if we block multiple times for short moments
|
---|
686 | // less than "timeout" milliseconds, we will return once
|
---|
687 | // "timeout" amount of time has passed since the *first*
|
---|
688 | // blocking occurred. If the absolute time was calculated
|
---|
689 | // again every time we block, "timeout" would effectively
|
---|
690 | // be meaningless if we never consecutively block longer
|
---|
691 | // than "timeout" ms.
|
---|
692 | *has_blocked = true;
|
---|
693 | mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
|
---|
694 | }
|
---|
695 |
|
---|
696 | bool timed_out = false;
|
---|
697 |
|
---|
698 | mythread_sync(coder->mutex) {
|
---|
699 | // There are four things that we wait. If one of them
|
---|
700 | // becomes possible, we return.
|
---|
701 | // - If there is input left, we need to get a free
|
---|
702 | // worker thread and an output buffer for it.
|
---|
703 | // - Data ready to be read from the output queue.
|
---|
704 | // - A worker thread indicates an error.
|
---|
705 | // - Time out occurs.
|
---|
706 | while ((!has_input || coder->threads_free == NULL
|
---|
707 | || !lzma_outq_has_buf(&coder->outq))
|
---|
708 | && !lzma_outq_is_readable(&coder->outq)
|
---|
709 | && coder->thread_error == LZMA_OK
|
---|
710 | && !timed_out) {
|
---|
711 | if (coder->timeout != 0)
|
---|
712 | timed_out = mythread_cond_timedwait(
|
---|
713 | &coder->cond, &coder->mutex,
|
---|
714 | wait_abs) != 0;
|
---|
715 | else
|
---|
716 | mythread_cond_wait(&coder->cond,
|
---|
717 | &coder->mutex);
|
---|
718 | }
|
---|
719 | }
|
---|
720 |
|
---|
721 | return timed_out;
|
---|
722 | }
|
---|
723 |
|
---|
724 |
|
---|
725 | static lzma_ret
|
---|
726 | stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
|
---|
727 | const uint8_t *restrict in, size_t *restrict in_pos,
|
---|
728 | size_t in_size, uint8_t *restrict out,
|
---|
729 | size_t *restrict out_pos, size_t out_size, lzma_action action)
|
---|
730 | {
|
---|
731 | lzma_stream_coder *coder = coder_ptr;
|
---|
732 |
|
---|
733 | switch (coder->sequence) {
|
---|
734 | case SEQ_STREAM_HEADER:
|
---|
735 | lzma_bufcpy(coder->header, &coder->header_pos,
|
---|
736 | sizeof(coder->header),
|
---|
737 | out, out_pos, out_size);
|
---|
738 | if (coder->header_pos < sizeof(coder->header))
|
---|
739 | return LZMA_OK;
|
---|
740 |
|
---|
741 | coder->header_pos = 0;
|
---|
742 | coder->sequence = SEQ_BLOCK;
|
---|
743 |
|
---|
744 | // Fall through
|
---|
745 |
|
---|
746 | case SEQ_BLOCK: {
|
---|
747 | // Initialized to silence warnings.
|
---|
748 | lzma_vli unpadded_size = 0;
|
---|
749 | lzma_vli uncompressed_size = 0;
|
---|
750 | lzma_ret ret = LZMA_OK;
|
---|
751 |
|
---|
752 | // These are for wait_for_work().
|
---|
753 | bool has_blocked = false;
|
---|
754 | mythread_condtime wait_abs;
|
---|
755 |
|
---|
756 | while (true) {
|
---|
757 | mythread_sync(coder->mutex) {
|
---|
758 | // Check for Block encoder errors.
|
---|
759 | ret = coder->thread_error;
|
---|
760 | if (ret != LZMA_OK) {
|
---|
761 | assert(ret != LZMA_STREAM_END);
|
---|
762 | break; // Break out of mythread_sync.
|
---|
763 | }
|
---|
764 |
|
---|
765 | // Try to read compressed data to out[].
|
---|
766 | ret = lzma_outq_read(&coder->outq, allocator,
|
---|
767 | out, out_pos, out_size,
|
---|
768 | &unpadded_size,
|
---|
769 | &uncompressed_size);
|
---|
770 | }
|
---|
771 |
|
---|
772 | if (ret == LZMA_STREAM_END) {
|
---|
773 | // End of Block. Add it to the Index.
|
---|
774 | ret = lzma_index_append(coder->index,
|
---|
775 | allocator, unpadded_size,
|
---|
776 | uncompressed_size);
|
---|
777 | if (ret != LZMA_OK) {
|
---|
778 | threads_stop(coder, false);
|
---|
779 | return ret;
|
---|
780 | }
|
---|
781 |
|
---|
782 | // If we didn't fill the output buffer yet,
|
---|
783 | // try to read more data. Maybe the next
|
---|
784 | // outbuf has been finished already too.
|
---|
785 | if (*out_pos < out_size)
|
---|
786 | continue;
|
---|
787 | }
|
---|
788 |
|
---|
789 | if (ret != LZMA_OK) {
|
---|
790 | // coder->thread_error was set.
|
---|
791 | threads_stop(coder, false);
|
---|
792 | return ret;
|
---|
793 | }
|
---|
794 |
|
---|
795 | // Try to give uncompressed data to a worker thread.
|
---|
796 | ret = stream_encode_in(coder, allocator,
|
---|
797 | in, in_pos, in_size, action);
|
---|
798 | if (ret != LZMA_OK) {
|
---|
799 | threads_stop(coder, false);
|
---|
800 | return ret;
|
---|
801 | }
|
---|
802 |
|
---|
803 | // See if we should wait or return.
|
---|
804 | //
|
---|
805 | // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
|
---|
806 | if (*in_pos == in_size) {
|
---|
807 | // LZMA_RUN: More data is probably coming
|
---|
808 | // so return to let the caller fill the
|
---|
809 | // input buffer.
|
---|
810 | if (action == LZMA_RUN)
|
---|
811 | return LZMA_OK;
|
---|
812 |
|
---|
813 | // LZMA_FULL_BARRIER: The same as with
|
---|
814 | // LZMA_RUN but tell the caller that the
|
---|
815 | // barrier was completed.
|
---|
816 | if (action == LZMA_FULL_BARRIER)
|
---|
817 | return LZMA_STREAM_END;
|
---|
818 |
|
---|
819 | // Finishing or flushing isn't completed until
|
---|
820 | // all input data has been encoded and copied
|
---|
821 | // to the output buffer.
|
---|
822 | if (lzma_outq_is_empty(&coder->outq)) {
|
---|
823 | // LZMA_FINISH: Continue to encode
|
---|
824 | // the Index field.
|
---|
825 | if (action == LZMA_FINISH)
|
---|
826 | break;
|
---|
827 |
|
---|
828 | // LZMA_FULL_FLUSH: Return to tell
|
---|
829 | // the caller that flushing was
|
---|
830 | // completed.
|
---|
831 | if (action == LZMA_FULL_FLUSH)
|
---|
832 | return LZMA_STREAM_END;
|
---|
833 | }
|
---|
834 | }
|
---|
835 |
|
---|
836 | // Return if there is no output space left.
|
---|
837 | // This check must be done after testing the input
|
---|
838 | // buffer, because we might want to use a different
|
---|
839 | // return code.
|
---|
840 | if (*out_pos == out_size)
|
---|
841 | return LZMA_OK;
|
---|
842 |
|
---|
843 | // Neither in nor out has been used completely.
|
---|
844 | // Wait until there's something we can do.
|
---|
845 | if (wait_for_work(coder, &wait_abs, &has_blocked,
|
---|
846 | *in_pos < in_size))
|
---|
847 | return LZMA_TIMED_OUT;
|
---|
848 | }
|
---|
849 |
|
---|
850 | // All Blocks have been encoded and the threads have stopped.
|
---|
851 | // Prepare to encode the Index field.
|
---|
852 | return_if_error(lzma_index_encoder_init(
|
---|
853 | &coder->index_encoder, allocator,
|
---|
854 | coder->index));
|
---|
855 | coder->sequence = SEQ_INDEX;
|
---|
856 |
|
---|
857 | // Update the progress info to take the Index and
|
---|
858 | // Stream Footer into account. Those are very fast to encode
|
---|
859 | // so in terms of progress information they can be thought
|
---|
860 | // to be ready to be copied out.
|
---|
861 | coder->progress_out += lzma_index_size(coder->index)
|
---|
862 | + LZMA_STREAM_HEADER_SIZE;
|
---|
863 | }
|
---|
864 |
|
---|
865 | // Fall through
|
---|
866 |
|
---|
867 | case SEQ_INDEX: {
|
---|
868 | // Call the Index encoder. It doesn't take any input, so
|
---|
869 | // those pointers can be NULL.
|
---|
870 | const lzma_ret ret = coder->index_encoder.code(
|
---|
871 | coder->index_encoder.coder, allocator,
|
---|
872 | NULL, NULL, 0,
|
---|
873 | out, out_pos, out_size, LZMA_RUN);
|
---|
874 | if (ret != LZMA_STREAM_END)
|
---|
875 | return ret;
|
---|
876 |
|
---|
877 | // Encode the Stream Footer into coder->buffer.
|
---|
878 | coder->stream_flags.backward_size
|
---|
879 | = lzma_index_size(coder->index);
|
---|
880 | if (lzma_stream_footer_encode(&coder->stream_flags,
|
---|
881 | coder->header) != LZMA_OK)
|
---|
882 | return LZMA_PROG_ERROR;
|
---|
883 |
|
---|
884 | coder->sequence = SEQ_STREAM_FOOTER;
|
---|
885 | }
|
---|
886 |
|
---|
887 | // Fall through
|
---|
888 |
|
---|
889 | case SEQ_STREAM_FOOTER:
|
---|
890 | lzma_bufcpy(coder->header, &coder->header_pos,
|
---|
891 | sizeof(coder->header),
|
---|
892 | out, out_pos, out_size);
|
---|
893 | return coder->header_pos < sizeof(coder->header)
|
---|
894 | ? LZMA_OK : LZMA_STREAM_END;
|
---|
895 | }
|
---|
896 |
|
---|
897 | assert(0);
|
---|
898 | return LZMA_PROG_ERROR;
|
---|
899 | }
|
---|
900 |
|
---|
901 |
|
---|
902 | static void
|
---|
903 | stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
|
---|
904 | {
|
---|
905 | lzma_stream_coder *coder = coder_ptr;
|
---|
906 |
|
---|
907 | // Threads must be killed before the output queue can be freed.
|
---|
908 | threads_end(coder, allocator);
|
---|
909 | lzma_outq_end(&coder->outq, allocator);
|
---|
910 |
|
---|
911 | lzma_filters_free(coder->filters, allocator);
|
---|
912 | lzma_filters_free(coder->filters_cache, allocator);
|
---|
913 |
|
---|
914 | lzma_next_end(&coder->index_encoder, allocator);
|
---|
915 | lzma_index_end(coder->index, allocator);
|
---|
916 |
|
---|
917 | mythread_cond_destroy(&coder->cond);
|
---|
918 | mythread_mutex_destroy(&coder->mutex);
|
---|
919 |
|
---|
920 | lzma_free(coder, allocator);
|
---|
921 | return;
|
---|
922 | }
|
---|
923 |
|
---|
924 |
|
---|
925 | static lzma_ret
|
---|
926 | stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
|
---|
927 | const lzma_filter *filters,
|
---|
928 | const lzma_filter *reversed_filters
|
---|
929 | lzma_attribute((__unused__)))
|
---|
930 | {
|
---|
931 | lzma_stream_coder *coder = coder_ptr;
|
---|
932 |
|
---|
933 | // Applications shouldn't attempt to change the options when
|
---|
934 | // we are already encoding the Index or Stream Footer.
|
---|
935 | if (coder->sequence > SEQ_BLOCK)
|
---|
936 | return LZMA_PROG_ERROR;
|
---|
937 |
|
---|
938 | // For now the threaded encoder doesn't support changing
|
---|
939 | // the options in the middle of a Block.
|
---|
940 | if (coder->thr != NULL)
|
---|
941 | return LZMA_PROG_ERROR;
|
---|
942 |
|
---|
943 | // Check if the filter chain seems mostly valid. See the comment
|
---|
944 | // in stream_encoder_mt_init().
|
---|
945 | if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
|
---|
946 | return LZMA_OPTIONS_ERROR;
|
---|
947 |
|
---|
948 | // Make a copy to a temporary buffer first. This way the encoder
|
---|
949 | // state stays unchanged if an error occurs in lzma_filters_copy().
|
---|
950 | lzma_filter temp[LZMA_FILTERS_MAX + 1];
|
---|
951 | return_if_error(lzma_filters_copy(filters, temp, allocator));
|
---|
952 |
|
---|
953 | // Free the options of the old chain as well as the cache.
|
---|
954 | lzma_filters_free(coder->filters, allocator);
|
---|
955 | lzma_filters_free(coder->filters_cache, allocator);
|
---|
956 |
|
---|
957 | // Copy the new filter chain in place.
|
---|
958 | memcpy(coder->filters, temp, sizeof(temp));
|
---|
959 |
|
---|
960 | return LZMA_OK;
|
---|
961 | }
|
---|
962 |
|
---|
963 |
|
---|
964 | /// Options handling for lzma_stream_encoder_mt_init() and
|
---|
965 | /// lzma_stream_encoder_mt_memusage()
|
---|
966 | static lzma_ret
|
---|
967 | get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
|
---|
968 | const lzma_filter **filters, uint64_t *block_size,
|
---|
969 | uint64_t *outbuf_size_max)
|
---|
970 | {
|
---|
971 | // Validate some of the options.
|
---|
972 | if (options == NULL)
|
---|
973 | return LZMA_PROG_ERROR;
|
---|
974 |
|
---|
975 | if (options->flags != 0 || options->threads == 0
|
---|
976 | || options->threads > LZMA_THREADS_MAX)
|
---|
977 | return LZMA_OPTIONS_ERROR;
|
---|
978 |
|
---|
979 | if (options->filters != NULL) {
|
---|
980 | // Filter chain was given, use it as is.
|
---|
981 | *filters = options->filters;
|
---|
982 | } else {
|
---|
983 | // Use a preset.
|
---|
984 | if (lzma_easy_preset(opt_easy, options->preset))
|
---|
985 | return LZMA_OPTIONS_ERROR;
|
---|
986 |
|
---|
987 | *filters = opt_easy->filters;
|
---|
988 | }
|
---|
989 |
|
---|
990 | // Block size
|
---|
991 | if (options->block_size > 0) {
|
---|
992 | if (options->block_size > BLOCK_SIZE_MAX)
|
---|
993 | return LZMA_OPTIONS_ERROR;
|
---|
994 |
|
---|
995 | *block_size = options->block_size;
|
---|
996 | } else {
|
---|
997 | // Determine the Block size from the filter chain.
|
---|
998 | *block_size = lzma_mt_block_size(*filters);
|
---|
999 | if (*block_size == 0)
|
---|
1000 | return LZMA_OPTIONS_ERROR;
|
---|
1001 |
|
---|
1002 | assert(*block_size <= BLOCK_SIZE_MAX);
|
---|
1003 | }
|
---|
1004 |
|
---|
1005 | // Calculate the maximum amount output that a single output buffer
|
---|
1006 | // may need to hold. This is the same as the maximum total size of
|
---|
1007 | // a Block.
|
---|
1008 | *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
|
---|
1009 | if (*outbuf_size_max == 0)
|
---|
1010 | return LZMA_MEM_ERROR;
|
---|
1011 |
|
---|
1012 | return LZMA_OK;
|
---|
1013 | }
|
---|
1014 |
|
---|
1015 |
|
---|
1016 | static void
|
---|
1017 | get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
|
---|
1018 | {
|
---|
1019 | lzma_stream_coder *coder = coder_ptr;
|
---|
1020 |
|
---|
1021 | // Lock coder->mutex to prevent finishing threads from moving their
|
---|
1022 | // progress info from the worker_thread structure to lzma_stream_coder.
|
---|
1023 | mythread_sync(coder->mutex) {
|
---|
1024 | *progress_in = coder->progress_in;
|
---|
1025 | *progress_out = coder->progress_out;
|
---|
1026 |
|
---|
1027 | for (size_t i = 0; i < coder->threads_initialized; ++i) {
|
---|
1028 | mythread_sync(coder->threads[i].mutex) {
|
---|
1029 | *progress_in += coder->threads[i].progress_in;
|
---|
1030 | *progress_out += coder->threads[i]
|
---|
1031 | .progress_out;
|
---|
1032 | }
|
---|
1033 | }
|
---|
1034 | }
|
---|
1035 |
|
---|
1036 | return;
|
---|
1037 | }
|
---|
1038 |
|
---|
1039 |
|
---|
1040 | static lzma_ret
|
---|
1041 | stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
|
---|
1042 | const lzma_mt *options)
|
---|
1043 | {
|
---|
1044 | lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
|
---|
1045 |
|
---|
1046 | // Get the filter chain.
|
---|
1047 | lzma_options_easy easy;
|
---|
1048 | const lzma_filter *filters;
|
---|
1049 | uint64_t block_size;
|
---|
1050 | uint64_t outbuf_size_max;
|
---|
1051 | return_if_error(get_options(options, &easy, &filters,
|
---|
1052 | &block_size, &outbuf_size_max));
|
---|
1053 |
|
---|
1054 | #if SIZE_MAX < UINT64_MAX
|
---|
1055 | if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
|
---|
1056 | return LZMA_MEM_ERROR;
|
---|
1057 | #endif
|
---|
1058 |
|
---|
1059 | // Validate the filter chain so that we can give an error in this
|
---|
1060 | // function instead of delaying it to the first call to lzma_code().
|
---|
1061 | // The memory usage calculation verifies the filter chain as
|
---|
1062 | // a side effect so we take advantage of that. It's not a perfect
|
---|
1063 | // check though as raw encoder allows LZMA1 too but such problems
|
---|
1064 | // will be caught eventually with Block Header encoder.
|
---|
1065 | if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
|
---|
1066 | return LZMA_OPTIONS_ERROR;
|
---|
1067 |
|
---|
1068 | // Validate the Check ID.
|
---|
1069 | if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
|
---|
1070 | return LZMA_PROG_ERROR;
|
---|
1071 |
|
---|
1072 | if (!lzma_check_is_supported(options->check))
|
---|
1073 | return LZMA_UNSUPPORTED_CHECK;
|
---|
1074 |
|
---|
1075 | // Allocate and initialize the base structure if needed.
|
---|
1076 | lzma_stream_coder *coder = next->coder;
|
---|
1077 | if (coder == NULL) {
|
---|
1078 | coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
|
---|
1079 | if (coder == NULL)
|
---|
1080 | return LZMA_MEM_ERROR;
|
---|
1081 |
|
---|
1082 | next->coder = coder;
|
---|
1083 |
|
---|
1084 | // For the mutex and condition variable initializations
|
---|
1085 | // the error handling has to be done here because
|
---|
1086 | // stream_encoder_mt_end() doesn't know if they have
|
---|
1087 | // already been initialized or not.
|
---|
1088 | if (mythread_mutex_init(&coder->mutex)) {
|
---|
1089 | lzma_free(coder, allocator);
|
---|
1090 | next->coder = NULL;
|
---|
1091 | return LZMA_MEM_ERROR;
|
---|
1092 | }
|
---|
1093 |
|
---|
1094 | if (mythread_cond_init(&coder->cond)) {
|
---|
1095 | mythread_mutex_destroy(&coder->mutex);
|
---|
1096 | lzma_free(coder, allocator);
|
---|
1097 | next->coder = NULL;
|
---|
1098 | return LZMA_MEM_ERROR;
|
---|
1099 | }
|
---|
1100 |
|
---|
1101 | next->code = &stream_encode_mt;
|
---|
1102 | next->end = &stream_encoder_mt_end;
|
---|
1103 | next->get_progress = &get_progress;
|
---|
1104 | next->update = &stream_encoder_mt_update;
|
---|
1105 |
|
---|
1106 | coder->filters[0].id = LZMA_VLI_UNKNOWN;
|
---|
1107 | coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
|
---|
1108 | coder->index_encoder = LZMA_NEXT_CODER_INIT;
|
---|
1109 | coder->index = NULL;
|
---|
1110 | memzero(&coder->outq, sizeof(coder->outq));
|
---|
1111 | coder->threads = NULL;
|
---|
1112 | coder->threads_max = 0;
|
---|
1113 | coder->threads_initialized = 0;
|
---|
1114 | }
|
---|
1115 |
|
---|
1116 | // Basic initializations
|
---|
1117 | coder->sequence = SEQ_STREAM_HEADER;
|
---|
1118 | coder->block_size = (size_t)(block_size);
|
---|
1119 | coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
|
---|
1120 | coder->thread_error = LZMA_OK;
|
---|
1121 | coder->thr = NULL;
|
---|
1122 |
|
---|
1123 | // Allocate the thread-specific base structures.
|
---|
1124 | assert(options->threads > 0);
|
---|
1125 | if (coder->threads_max != options->threads) {
|
---|
1126 | threads_end(coder, allocator);
|
---|
1127 |
|
---|
1128 | coder->threads = NULL;
|
---|
1129 | coder->threads_max = 0;
|
---|
1130 |
|
---|
1131 | coder->threads_initialized = 0;
|
---|
1132 | coder->threads_free = NULL;
|
---|
1133 |
|
---|
1134 | coder->threads = lzma_alloc(
|
---|
1135 | options->threads * sizeof(worker_thread),
|
---|
1136 | allocator);
|
---|
1137 | if (coder->threads == NULL)
|
---|
1138 | return LZMA_MEM_ERROR;
|
---|
1139 |
|
---|
1140 | coder->threads_max = options->threads;
|
---|
1141 | } else {
|
---|
1142 | // Reuse the old structures and threads. Tell the running
|
---|
1143 | // threads to stop and wait until they have stopped.
|
---|
1144 | threads_stop(coder, true);
|
---|
1145 | }
|
---|
1146 |
|
---|
1147 | // Output queue
|
---|
1148 | return_if_error(lzma_outq_init(&coder->outq, allocator,
|
---|
1149 | options->threads));
|
---|
1150 |
|
---|
1151 | // Timeout
|
---|
1152 | coder->timeout = options->timeout;
|
---|
1153 |
|
---|
1154 | // Free the old filter chain and the cache.
|
---|
1155 | lzma_filters_free(coder->filters, allocator);
|
---|
1156 | lzma_filters_free(coder->filters_cache, allocator);
|
---|
1157 |
|
---|
1158 | // Copy the new filter chain.
|
---|
1159 | return_if_error(lzma_filters_copy(
|
---|
1160 | filters, coder->filters, allocator));
|
---|
1161 |
|
---|
1162 | // Index
|
---|
1163 | lzma_index_end(coder->index, allocator);
|
---|
1164 | coder->index = lzma_index_init(allocator);
|
---|
1165 | if (coder->index == NULL)
|
---|
1166 | return LZMA_MEM_ERROR;
|
---|
1167 |
|
---|
1168 | // Stream Header
|
---|
1169 | coder->stream_flags.version = 0;
|
---|
1170 | coder->stream_flags.check = options->check;
|
---|
1171 | return_if_error(lzma_stream_header_encode(
|
---|
1172 | &coder->stream_flags, coder->header));
|
---|
1173 |
|
---|
1174 | coder->header_pos = 0;
|
---|
1175 |
|
---|
1176 | // Progress info
|
---|
1177 | coder->progress_in = 0;
|
---|
1178 | coder->progress_out = LZMA_STREAM_HEADER_SIZE;
|
---|
1179 |
|
---|
1180 | return LZMA_OK;
|
---|
1181 | }
|
---|
1182 |
|
---|
1183 |
|
---|
1184 | #ifdef HAVE_SYMBOL_VERSIONS_LINUX
|
---|
1185 | // These are for compatibility with binaries linked against liblzma that
|
---|
1186 | // has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
|
---|
1187 | // Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
|
---|
1188 | // but it has been added here anyway since someone might misread the
|
---|
1189 | // RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
|
---|
1190 | LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
|
---|
1191 | lzma_ret, lzma_stream_encoder_mt_512a)(
|
---|
1192 | lzma_stream *strm, const lzma_mt *options)
|
---|
1193 | lzma_nothrow lzma_attr_warn_unused_result
|
---|
1194 | __attribute__((__alias__("lzma_stream_encoder_mt_52")));
|
---|
1195 |
|
---|
1196 | LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
|
---|
1197 | lzma_ret, lzma_stream_encoder_mt_522)(
|
---|
1198 | lzma_stream *strm, const lzma_mt *options)
|
---|
1199 | lzma_nothrow lzma_attr_warn_unused_result
|
---|
1200 | __attribute__((__alias__("lzma_stream_encoder_mt_52")));
|
---|
1201 |
|
---|
1202 | LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
|
---|
1203 | lzma_ret, lzma_stream_encoder_mt_52)(
|
---|
1204 | lzma_stream *strm, const lzma_mt *options)
|
---|
1205 | lzma_nothrow lzma_attr_warn_unused_result;
|
---|
1206 |
|
---|
1207 | #define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
|
---|
1208 | #endif
|
---|
1209 | extern LZMA_API(lzma_ret)
|
---|
1210 | lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
|
---|
1211 | {
|
---|
1212 | lzma_next_strm_init(stream_encoder_mt_init, strm, options);
|
---|
1213 |
|
---|
1214 | strm->internal->supported_actions[LZMA_RUN] = true;
|
---|
1215 | // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
|
---|
1216 | strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
|
---|
1217 | strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
|
---|
1218 | strm->internal->supported_actions[LZMA_FINISH] = true;
|
---|
1219 |
|
---|
1220 | return LZMA_OK;
|
---|
1221 | }
|
---|
1222 |
|
---|
1223 |
|
---|
1224 | #ifdef HAVE_SYMBOL_VERSIONS_LINUX
|
---|
1225 | LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
|
---|
1226 | uint64_t, lzma_stream_encoder_mt_memusage_512a)(
|
---|
1227 | const lzma_mt *options) lzma_nothrow lzma_attr_pure
|
---|
1228 | __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
|
---|
1229 |
|
---|
1230 | LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
|
---|
1231 | uint64_t, lzma_stream_encoder_mt_memusage_522)(
|
---|
1232 | const lzma_mt *options) lzma_nothrow lzma_attr_pure
|
---|
1233 | __attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
|
---|
1234 |
|
---|
1235 | LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
|
---|
1236 | uint64_t, lzma_stream_encoder_mt_memusage_52)(
|
---|
1237 | const lzma_mt *options) lzma_nothrow lzma_attr_pure;
|
---|
1238 |
|
---|
1239 | #define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
|
---|
1240 | #endif
|
---|
1241 | // This function name is a monster but it's consistent with the older
|
---|
1242 | // monster names. :-( 31 chars is the max that C99 requires so in that
|
---|
1243 | // sense it's not too long. ;-)
|
---|
1244 | extern LZMA_API(uint64_t)
|
---|
1245 | lzma_stream_encoder_mt_memusage(const lzma_mt *options)
|
---|
1246 | {
|
---|
1247 | lzma_options_easy easy;
|
---|
1248 | const lzma_filter *filters;
|
---|
1249 | uint64_t block_size;
|
---|
1250 | uint64_t outbuf_size_max;
|
---|
1251 |
|
---|
1252 | if (get_options(options, &easy, &filters, &block_size,
|
---|
1253 | &outbuf_size_max) != LZMA_OK)
|
---|
1254 | return UINT64_MAX;
|
---|
1255 |
|
---|
1256 | // Memory usage of the input buffers
|
---|
1257 | const uint64_t inbuf_memusage = options->threads * block_size;
|
---|
1258 |
|
---|
1259 | // Memory usage of the filter encoders
|
---|
1260 | uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
|
---|
1261 | if (filters_memusage == UINT64_MAX)
|
---|
1262 | return UINT64_MAX;
|
---|
1263 |
|
---|
1264 | filters_memusage *= options->threads;
|
---|
1265 |
|
---|
1266 | // Memory usage of the output queue
|
---|
1267 | const uint64_t outq_memusage = lzma_outq_memusage(
|
---|
1268 | outbuf_size_max, options->threads);
|
---|
1269 | if (outq_memusage == UINT64_MAX)
|
---|
1270 | return UINT64_MAX;
|
---|
1271 |
|
---|
1272 | // Sum them with overflow checking.
|
---|
1273 | uint64_t total_memusage = LZMA_MEMUSAGE_BASE
|
---|
1274 | + sizeof(lzma_stream_coder)
|
---|
1275 | + options->threads * sizeof(worker_thread);
|
---|
1276 |
|
---|
1277 | if (UINT64_MAX - total_memusage < inbuf_memusage)
|
---|
1278 | return UINT64_MAX;
|
---|
1279 |
|
---|
1280 | total_memusage += inbuf_memusage;
|
---|
1281 |
|
---|
1282 | if (UINT64_MAX - total_memusage < filters_memusage)
|
---|
1283 | return UINT64_MAX;
|
---|
1284 |
|
---|
1285 | total_memusage += filters_memusage;
|
---|
1286 |
|
---|
1287 | if (UINT64_MAX - total_memusage < outq_memusage)
|
---|
1288 | return UINT64_MAX;
|
---|
1289 |
|
---|
1290 | return total_memusage + outq_memusage;
|
---|
1291 | }
|
---|