v8
V8 is Google’s open source high-performance JavaScript and WebAssembly engine, written in C++.
Loading...
Searching...
No Matches
streaming-decoder.cc
Go to the documentation of this file.
1// Copyright 2017 the V8 project authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
6
7#include <optional>
8
10#include "src/wasm/decoder.h"
11#include "src/wasm/leb-helper.h"
17
18#define TRACE_STREAMING(...) \
19 do { \
20 if (v8_flags.trace_wasm_streaming) PrintF(__VA_ARGS__); \
21 } while (false)
22
23namespace v8::internal::wasm {
24
26 public:
27 explicit AsyncStreamingDecoder(std::unique_ptr<StreamingProcessor> processor);
30
31 void OnBytesReceived(base::Vector<const uint8_t> bytes) override;
32
33 void Finish(bool can_use_compiled_module) override;
34
35 void Abort() override;
36
38 auto& active_processor = processor_ ? processor_ : failed_processor_;
39 active_processor.reset();
41 DCHECK_NULL(failed_processor_);
42 }
43
44 void NotifyNativeModuleCreated(
45 const std::shared_ptr<NativeModule>& native_module) override;
46
47 private:
48 // The SectionBuffer is the data object for the content of a single section.
49 // It stores all bytes of the section (including section id and section
50 // length), and the offset where the actual payload starts.
52 public:
53 // id: The section id.
54 // payload_length: The length of the payload.
55 // length_bytes: The section length, as it is encoded in the module bytes.
56 SectionBuffer(uint32_t module_offset, uint8_t id, size_t payload_length,
57 base::Vector<const uint8_t> length_bytes)
58 : // ID + length + payload
59 module_offset_(module_offset),
60 bytes_(base::OwnedVector<uint8_t>::NewForOverwrite(
61 1 + length_bytes.length() + payload_length)),
62 payload_offset_(1 + length_bytes.length()) {
63 bytes_.begin()[0] = id;
64 memcpy(bytes_.begin() + 1, &length_bytes.first(), length_bytes.length());
65 }
66
68 return static_cast<SectionCode>(bytes_.begin()[0]);
69 }
70
72 DCHECK_LE(module_offset_, ref.offset());
73 uint32_t offset_in_code_buffer = ref.offset() - module_offset_;
74 return bytes().SubVector(offset_in_code_buffer,
75 offset_in_code_buffer + ref.length());
76 }
77
78 std::optional<ModuleWireBytes> GetModuleBytes() const final { return {}; }
79
80 uint32_t module_offset() const { return module_offset_; }
81 base::Vector<uint8_t> bytes() const { return bytes_.as_vector(); }
82 base::Vector<uint8_t> payload() const { return bytes() + payload_offset_; }
83 size_t length() const { return bytes_.size(); }
84 size_t payload_offset() const { return payload_offset_; }
85
86 private:
87 const uint32_t module_offset_;
89 const size_t payload_offset_;
90 };
91
92 // The decoding of a stream of wasm module bytes is organized in states. Each
93 // state provides a buffer to store the bytes required for the current state,
94 // information on how many bytes have already been received, how many bytes
95 // are needed, and a {Next} function which starts the next state once all
96 // bytes of the current state were received.
97 //
98 // The states change according to the following state diagram:
99 //
100 // Start
101 // |
102 // |
103 // v
104 // DecodeModuleHeader
105 // | _________________________________________
106 // | | |
107 // v v |
108 // DecodeSectionID --> DecodeSectionLength --> DecodeSectionPayload
109 // A |
110 // | | (if the section id == code)
111 // | v
112 // | DecodeNumberOfFunctions -- > DecodeFunctionLength
113 // | A |
114 // | | |
115 // | (after all functions were read) | v
116 // ------------------------------------- DecodeFunctionBody
117 //
119 public:
120 virtual ~DecodingState() = default;
121
122 // Reads the bytes for the current state and returns the number of read
123 // bytes.
124 virtual size_t ReadBytes(AsyncStreamingDecoder* streaming,
126
127 // Returns the next state of the streaming decoding.
128 virtual std::unique_ptr<DecodingState> Next(
129 AsyncStreamingDecoder* streaming) = 0;
130 // The buffer to store the received bytes.
132 // The number of bytes which were already received.
133 size_t offset() const { return offset_; }
134 void set_offset(size_t value) { offset_ = value; }
135 // A flag to indicate if finishing the streaming decoder is allowed without
136 // error.
137 virtual bool is_finishing_allowed() const { return false; }
138
139 private:
140 size_t offset_ = 0;
141 };
142
143 // Forward declarations of the concrete states. This is needed so that they
144 // can access private members of the AsyncStreamingDecoder.
145 class DecodeVarInt32;
146 class DecodeModuleHeader;
147 class DecodeSectionID;
152 class DecodeFunctionBody;
153
154 // Creates a buffer for the next section of the module.
155 SectionBuffer* CreateNewBuffer(uint32_t module_offset, uint8_t section_id,
156 size_t length,
157 base::Vector<const uint8_t> length_bytes);
158
159 std::unique_ptr<DecodingState> ToErrorState() {
160 Fail();
161 return nullptr;
162 }
163
165 if (!ok()) return;
166 if (!processor_->ProcessModuleHeader(state_->buffer())) Fail();
167 }
168
170 if (!ok()) return;
171 if (!processor_->ProcessSection(
172 buffer->section_code(), buffer->payload(),
173 buffer->module_offset() +
174 static_cast<uint32_t>(buffer->payload_offset()))) {
175 Fail();
176 }
177 }
178
179 void StartCodeSection(int num_functions,
180 std::shared_ptr<WireBytesStorage> wire_bytes_storage,
181 int code_section_start, int code_section_length) {
182 if (!ok()) return;
183 // The offset passed to {ProcessCodeSectionHeader} is an error offset and
184 // not the start offset of a buffer. Therefore we need the -1 here.
185 if (!processor_->ProcessCodeSectionHeader(
186 num_functions, module_offset() - 1, std::move(wire_bytes_storage),
187 code_section_start, code_section_length)) {
188 Fail();
189 }
190 }
191
193 uint32_t module_offset) {
194 if (!ok()) return;
195 if (!processor_->ProcessFunctionBody(bytes, module_offset)) Fail();
196 }
197
198 void Fail() {
199 // {Fail} cannot be called after {Finish}, {Abort}, or
200 // {NotifyCompilationDiscarded}.
201 DCHECK_EQ(processor_ == nullptr, failed_processor_ != nullptr);
202 if (processor_ != nullptr) failed_processor_ = std::move(processor_);
204 DCHECK_NOT_NULL(failed_processor_);
205 }
206
207 bool ok() const {
208 DCHECK_EQ(processor_ == nullptr, failed_processor_ != nullptr);
209 return processor_ != nullptr;
210 }
211
212 uint32_t module_offset() const { return module_offset_; }
213
214 // As long as we did not detect an invalid module, {processor_} will be set.
215 // On failure, the pointer is transferred to {failed_processor_} and will only
216 // be used for a final callback once all bytes have arrived. Finally, both
217 // {processor_} and {failed_processor_} will be null.
218 std::unique_ptr<StreamingProcessor> processor_;
219 std::unique_ptr<StreamingProcessor> failed_processor_;
220 std::unique_ptr<DecodingState> state_;
221 std::vector<std::shared_ptr<SectionBuffer>> section_buffers_;
222 bool code_section_processed_ = false;
223 uint32_t module_offset_ = 0;
224
225 // Store the full wire bytes in a vector of vectors to avoid having to grow
226 // large vectors (measured up to 100ms delay in 2023-03).
227 // TODO(clemensb): Avoid holding the wire bytes live twice (here and in the
228 // section buffers).
229 std::vector<std::vector<uint8_t>> full_wire_bytes_{{}};
230};
231
233 DCHECK(!full_wire_bytes_.empty());
234 // Fill the previous vector, growing up to 16kB. After that, allocate new
235 // vectors on overflow.
236 size_t remaining_capacity =
237 std::max(full_wire_bytes_.back().capacity(), size_t{16} * KB) -
238 full_wire_bytes_.back().size();
239 size_t bytes_for_existing_vector = std::min(remaining_capacity, bytes.size());
240 full_wire_bytes_.back().insert(full_wire_bytes_.back().end(), bytes.data(),
241 bytes.data() + bytes_for_existing_vector);
242 if (bytes.size() > bytes_for_existing_vector) {
243 // The previous vector's capacity is not enough to hold all new bytes, and
244 // it's bigger than 16kB, so expensive to copy. Allocate a new vector for
245 // the remaining bytes, growing exponentially.
246 size_t new_capacity = std::max(bytes.size() - bytes_for_existing_vector,
247 2 * full_wire_bytes_.back().capacity());
248 full_wire_bytes_.emplace_back();
249 full_wire_bytes_.back().reserve(new_capacity);
250 full_wire_bytes_.back().insert(full_wire_bytes_.back().end(),
251 bytes.data() + bytes_for_existing_vector,
252 bytes.end());
253 }
254
255 if (deserializing()) return;
256
257 TRACE_STREAMING("OnBytesReceived(%zu bytes)\n", bytes.size());
258
259 size_t current = 0;
260 while (ok() && current < bytes.size()) {
261 size_t num_bytes =
262 state_->ReadBytes(this, bytes.SubVector(current, bytes.size()));
263 current += num_bytes;
264 module_offset_ += num_bytes;
265 if (state_->offset() == state_->buffer().size()) {
266 state_ = state_->Next(this);
267 }
268 }
269 if (ok()) {
270 processor_->OnFinishedChunk();
271 }
272}
273
276 base::Vector<uint8_t> remaining_buf = buffer() + offset();
277 size_t num_bytes = std::min(bytes.size(), remaining_buf.size());
278 TRACE_STREAMING("ReadBytes(%zu bytes)\n", num_bytes);
279 memcpy(remaining_buf.begin(), &bytes.first(), num_bytes);
280 set_offset(offset() + num_bytes);
281 return num_bytes;
282}
283
284void AsyncStreamingDecoder::Finish(bool can_use_compiled_module) {
285 TRACE_STREAMING("Finish\n");
286 // {Finish} cannot be called after {Finish}, {Abort}, or
287 // {NotifyCompilationDiscarded}.
288 CHECK_EQ(processor_ == nullptr, failed_processor_ != nullptr);
289
290 // Create a final copy of the overall wire bytes; this will finally be
291 // transferred and stored in the NativeModule.
293 DCHECK_IMPLIES(full_wire_bytes_.back().empty(), full_wire_bytes_.size() == 1);
294 if (!full_wire_bytes_.back().empty()) {
295 size_t total_length = 0;
296 for (auto& bytes : full_wire_bytes_) total_length += bytes.size();
297 if (ok()) {
298 // {DecodeSectionLength} enforces this with graceful error reporting.
299 CHECK_LE(total_length, max_module_size());
300 }
301 auto all_bytes = base::OwnedVector<uint8_t>::NewForOverwrite(total_length);
302 uint8_t* ptr = all_bytes.begin();
303 for (auto& bytes : full_wire_bytes_) {
304 memcpy(ptr, bytes.data(), bytes.size());
305 ptr += bytes.size();
306 }
307 DCHECK_EQ(all_bytes.end(), ptr);
308 bytes_copy = std::move(all_bytes);
309 }
310
311 if (ok() && deserializing()) {
312 // Try to deserialize the module from wire bytes and module bytes.
313 if (can_use_compiled_module &&
315 base::VectorOf(bytes_copy))) {
316 return;
317 }
318
319 // Compiled module bytes are invalidated by can_use_compiled_module = false
320 // or the deserialization failed. Restart decoding using |bytes_copy|.
321 // Reset {full_wire_bytes} to a single empty vector.
322 full_wire_bytes_.assign({{}});
325 OnBytesReceived(base::VectorOf(bytes_copy));
326 // The decoder has received all wire bytes; fall through and finish.
327 }
328
329 if (ok() && !state_->is_finishing_allowed()) {
330 // The byte stream ended too early, we report an error.
331 Fail();
332 }
333
334 // Calling {OnFinishedStream} calls out to JS. Avoid further callbacks (by
335 // aborting the stream) by resetting the processor field before calling
336 // {OnFinishedStream}.
337 const bool failed = !ok();
338 std::unique_ptr<StreamingProcessor> processor =
339 failed ? std::move(failed_processor_) : std::move(processor_);
340 processor->OnFinishedStream(std::move(bytes_copy), failed);
341}
342
344 TRACE_STREAMING("Abort\n");
345 // Ignore {Abort} after {Finish}.
346 if (!processor_ && !failed_processor_) return;
347 Fail();
348 failed_processor_->OnAbort();
349 failed_processor_.reset();
350}
351
352namespace {
353
354class CallMoreFunctionsCanBeSerializedCallback
355 : public CompilationEventCallback {
356 public:
357 CallMoreFunctionsCanBeSerializedCallback(
358 std::weak_ptr<NativeModule> native_module,
360 : native_module_(std::move(native_module)),
361 callback_(std::move(callback)) {
362 // As a baseline we also count the modules that could be cached but
363 // never reach the threshold.
364 if (std::shared_ptr<NativeModule> module = native_module_.lock()) {
365 module->counters()->wasm_cache_count()->AddSample(0);
366 }
367 }
368
369 void call(CompilationEvent event) override {
370 if (event != CompilationEvent::kFinishedCompilationChunk) return;
371 // If the native module is still alive, get back a shared ptr and call the
372 // callback.
373 if (std::shared_ptr<NativeModule> native_module = native_module_.lock()) {
374 native_module->counters()->wasm_cache_count()->AddSample(++cache_count_);
375 callback_(native_module);
376 }
377 }
378
379 ReleaseAfterFinalEvent release_after_final_event() override {
380 return kKeepAfterFinalEvent;
381 }
382
383 private:
384 const std::weak_ptr<NativeModule> native_module_;
385 const AsyncStreamingDecoder::MoreFunctionsCanBeSerializedCallback callback_;
387};
388
389} // namespace
390
392 const std::shared_ptr<NativeModule>& native_module) {
394 auto* comp_state = native_module->compilation_state();
395
396 comp_state->AddCallback(
397 std::make_unique<CallMoreFunctionsCanBeSerializedCallback>(
398 native_module,
401}
402
403// An abstract class to share code among the states which decode VarInts. This
404// class takes over the decoding of the VarInt and then calls the actual decode
405// code with the decoded value.
407 public:
408 explicit DecodeVarInt32(size_t max_value, const char* field_name)
409 : max_value_(max_value), field_name_(field_name) {}
410
412 return base::ArrayVector(byte_buffer_);
413 }
414
415 size_t ReadBytes(AsyncStreamingDecoder* streaming,
416 base::Vector<const uint8_t> bytes) override;
417
418 std::unique_ptr<DecodingState> Next(
419 AsyncStreamingDecoder* streaming) override;
420
421 virtual std::unique_ptr<DecodingState> NextWithValue(
422 AsyncStreamingDecoder* streaming) = 0;
423
424 protected:
425 uint8_t byte_buffer_[kMaxVarInt32Size];
426 // The maximum valid value decoded in this state. {Next} returns an error if
427 // this value is exceeded.
428 const size_t max_value_;
429 const char* const field_name_;
430 size_t value_ = 0;
431 size_t bytes_consumed_ = 0;
432};
433
435 public:
437 return base::ArrayVector(byte_buffer_);
438 }
439
440 std::unique_ptr<DecodingState> Next(
441 AsyncStreamingDecoder* streaming) override;
442
443 private:
444 // Checks if the magic bytes of the module header are correct.
445 void CheckHeader(Decoder* decoder);
446
447 // The size of the module header.
448 static constexpr size_t kModuleHeaderSize = 8;
449 uint8_t byte_buffer_[kModuleHeaderSize];
450};
451
453 public:
454 explicit DecodeSectionID(uint32_t module_offset)
455 : module_offset_(module_offset) {}
456
457 base::Vector<uint8_t> buffer() override { return {&id_, 1}; }
458 bool is_finishing_allowed() const override { return true; }
459
460 std::unique_ptr<DecodingState> Next(
461 AsyncStreamingDecoder* streaming) override;
462
463 private:
464 uint8_t id_ = 0;
465 // The start offset of this section in the module.
466 const uint32_t module_offset_;
467};
468
470 public:
471 explicit DecodeSectionLength(uint8_t id, uint32_t module_offset)
472 : DecodeVarInt32(max_module_size(), "section length"),
473 section_id_(id),
474 module_offset_(module_offset) {}
475
476 std::unique_ptr<DecodingState> NextWithValue(
477 AsyncStreamingDecoder* streaming) override;
478
479 private:
480 const uint8_t section_id_;
481 // The start offset of this section in the module.
482 const uint32_t module_offset_;
483};
484
486 public:
487 explicit DecodeSectionPayload(SectionBuffer* section_buffer)
488 : section_buffer_(section_buffer) {}
489
490 base::Vector<uint8_t> buffer() override { return section_buffer_->payload(); }
491
492 std::unique_ptr<DecodingState> Next(
493 AsyncStreamingDecoder* streaming) override;
494
495 private:
497};
498
500 public:
501 explicit DecodeNumberOfFunctions(SectionBuffer* section_buffer)
502 : DecodeVarInt32(v8_flags.max_wasm_functions, "functions count"),
503 section_buffer_(section_buffer) {}
504
505 std::unique_ptr<DecodingState> NextWithValue(
506 AsyncStreamingDecoder* streaming) override;
507
508 private:
510};
511
513 public:
514 explicit DecodeFunctionLength(SectionBuffer* section_buffer,
515 size_t buffer_offset,
516 size_t num_remaining_functions)
517 : DecodeVarInt32(kV8MaxWasmFunctionSize, "function body size"),
518 section_buffer_(section_buffer),
519 buffer_offset_(buffer_offset),
520 // We are reading a new function, so one function less is remaining.
521 num_remaining_functions_(num_remaining_functions - 1) {
522 DCHECK_GT(num_remaining_functions, 0);
523 }
524
525 std::unique_ptr<DecodingState> NextWithValue(
526 AsyncStreamingDecoder* streaming) override;
527
528 private:
530 const size_t buffer_offset_;
532};
533
535 public:
536 explicit DecodeFunctionBody(SectionBuffer* section_buffer,
537 size_t buffer_offset, size_t function_body_length,
538 size_t num_remaining_functions,
539 uint32_t module_offset)
540 : section_buffer_(section_buffer),
541 buffer_offset_(buffer_offset),
542 function_body_length_(function_body_length),
543 num_remaining_functions_(num_remaining_functions),
544 module_offset_(module_offset) {}
545
547 base::Vector<uint8_t> remaining_buffer =
548 section_buffer_->bytes() + buffer_offset_;
549 return remaining_buffer.SubVector(0, function_body_length_);
550 }
551
552 std::unique_ptr<DecodingState> Next(
553 AsyncStreamingDecoder* streaming) override;
554
555 private:
557 const size_t buffer_offset_;
560 const uint32_t module_offset_;
561};
562
565 base::Vector<uint8_t> buf = buffer();
566 base::Vector<uint8_t> remaining_buf = buf + offset();
567 size_t new_bytes = std::min(bytes.size(), remaining_buf.size());
568 TRACE_STREAMING("ReadBytes of a VarInt\n");
569 memcpy(remaining_buf.begin(), &bytes.first(), new_bytes);
570 buf.Truncate(offset() + new_bytes);
571 Decoder decoder(buf,
572 streaming->module_offset() - static_cast<uint32_t>(offset()));
573 value_ = decoder.consume_u32v(field_name_);
574
575 if (decoder.failed()) {
576 if (new_bytes == remaining_buf.size()) {
577 // We only report an error if we read all bytes.
578 streaming->Fail();
579 }
580 set_offset(offset() + new_bytes);
581 return new_bytes;
582 }
583
584 // The number of bytes we actually needed to read.
585 DCHECK_GT(decoder.pc(), buffer().begin());
586 bytes_consumed_ = static_cast<size_t>(decoder.pc() - buf.begin());
587 TRACE_STREAMING(" ==> %zu bytes consumed\n", bytes_consumed_);
588
589 // We read all the bytes we needed.
590 DCHECK_GT(bytes_consumed_, offset());
591 new_bytes = bytes_consumed_ - offset();
592 // Set the offset to the buffer size to signal that we are at the end of this
593 // section.
594 set_offset(buffer().size());
595 return new_bytes;
596}
597
598std::unique_ptr<AsyncStreamingDecoder::DecodingState>
600 if (!streaming->ok()) return nullptr;
601
602 if (value_ > max_value_) return streaming->ToErrorState();
603
604 return NextWithValue(streaming);
605}
606
607std::unique_ptr<AsyncStreamingDecoder::DecodingState>
609 AsyncStreamingDecoder* streaming) {
610 TRACE_STREAMING("DecodeModuleHeader\n");
611 streaming->ProcessModuleHeader();
612 if (!streaming->ok()) return nullptr;
613 return std::make_unique<DecodeSectionID>(streaming->module_offset());
614}
615
616std::unique_ptr<AsyncStreamingDecoder::DecodingState>
618 TRACE_STREAMING("DecodeSectionID: %u (%s)\n", id_,
619 SectionName(static_cast<SectionCode>(id_)));
620 if (!IsValidSectionCode(id_)) return streaming->ToErrorState();
622 // Explicitly check for multiple code sections as module decoder never
623 // sees the code section and hence cannot track this section.
624 if (streaming->code_section_processed_) return streaming->ToErrorState();
625 streaming->code_section_processed_ = true;
626 }
627 return std::make_unique<DecodeSectionLength>(id_, module_offset_);
628}
629
630std::unique_ptr<AsyncStreamingDecoder::DecodingState>
632 AsyncStreamingDecoder* streaming) {
633 TRACE_STREAMING("DecodeSectionLength(%zu)\n", value_);
634 // Check if this section fits into the overall module length limit.
635 // Note: {this->module_offset_} is the position of the section ID byte,
636 // {streaming->module_offset_} is the start of the section's payload (i.e.
637 // right after the just-decoded section length varint).
638 // The latter can already exceed the max module size, when the previous
639 // section barely fit into it, and this new section's ID or length crossed
640 // the threshold.
641 uint32_t payload_start = streaming->module_offset();
642 size_t max_size = max_module_size();
643 if (payload_start > max_size || max_size - payload_start < value_) {
644 return streaming->ToErrorState();
645 }
646 SectionBuffer* buf =
647 streaming->CreateNewBuffer(module_offset_, section_id_, value_,
648 buffer().SubVector(0, bytes_consumed_));
649 DCHECK_NOT_NULL(buf);
650 if (value_ == 0) {
651 if (section_id_ == SectionCode::kCodeSectionCode) {
652 return streaming->ToErrorState();
653 }
654 // Process section without payload as well, to enforce section order and
655 // other feature checks specific to each individual section.
656 streaming->ProcessSection(buf);
657 if (!streaming->ok()) return nullptr;
658 // There is no payload, we go to the next section immediately.
659 return std::make_unique<DecodeSectionID>(streaming->module_offset_);
660 }
661 if (section_id_ == SectionCode::kCodeSectionCode) {
662 // We reached the code section. All functions of the code section are put
663 // into the same SectionBuffer.
664 return std::make_unique<DecodeNumberOfFunctions>(buf);
665 }
666 return std::make_unique<DecodeSectionPayload>(buf);
667}
668
669std::unique_ptr<AsyncStreamingDecoder::DecodingState>
671 AsyncStreamingDecoder* streaming) {
672 TRACE_STREAMING("DecodeSectionPayload\n");
673 streaming->ProcessSection(section_buffer_);
674 if (!streaming->ok()) return nullptr;
675 return std::make_unique<DecodeSectionID>(streaming->module_offset());
676}
677
678std::unique_ptr<AsyncStreamingDecoder::DecodingState>
680 AsyncStreamingDecoder* streaming) {
681 TRACE_STREAMING("DecodeNumberOfFunctions(%zu)\n", value_);
682 // Copy the bytes we read into the section buffer.
683 base::Vector<uint8_t> payload_buf = section_buffer_->payload();
684 if (payload_buf.size() < bytes_consumed_) return streaming->ToErrorState();
685 memcpy(payload_buf.begin(), buffer().begin(), bytes_consumed_);
686
687 DCHECK_GE(kMaxInt, section_buffer_->module_offset() +
688 section_buffer_->payload_offset());
689 int code_section_start = static_cast<int>(section_buffer_->module_offset() +
690 section_buffer_->payload_offset());
691 DCHECK_GE(kMaxInt, payload_buf.length());
692 int code_section_len = static_cast<int>(payload_buf.length());
694 streaming->StartCodeSection(static_cast<int>(value_),
695 streaming->section_buffers_.back(),
696 code_section_start, code_section_len);
697 if (!streaming->ok()) return nullptr;
698
699 // {value} is the number of functions.
700 if (value_ == 0) {
701 if (payload_buf.size() != bytes_consumed_) {
702 return streaming->ToErrorState();
703 }
704 return std::make_unique<DecodeSectionID>(streaming->module_offset());
705 }
706
707 return std::make_unique<DecodeFunctionLength>(
708 section_buffer_, section_buffer_->payload_offset() + bytes_consumed_,
709 value_);
710}
711
712std::unique_ptr<AsyncStreamingDecoder::DecodingState>
714 AsyncStreamingDecoder* streaming) {
715 TRACE_STREAMING("DecodeFunctionLength(%zu)\n", value_);
716 // Copy the bytes we consumed into the section buffer.
717 base::Vector<uint8_t> fun_length_buffer =
718 section_buffer_->bytes() + buffer_offset_;
719 if (fun_length_buffer.size() < bytes_consumed_) {
720 return streaming->ToErrorState();
721 }
722 memcpy(fun_length_buffer.begin(), buffer().begin(), bytes_consumed_);
723
724 // {value} is the length of the function.
725 if (value_ == 0) return streaming->ToErrorState();
726
727 if (buffer_offset_ + bytes_consumed_ + value_ > section_buffer_->length()) {
728 return streaming->ToErrorState();
729 }
730
731 return std::make_unique<DecodeFunctionBody>(
732 section_buffer_, buffer_offset_ + bytes_consumed_, value_,
733 num_remaining_functions_, streaming->module_offset());
734}
735
736std::unique_ptr<AsyncStreamingDecoder::DecodingState>
738 AsyncStreamingDecoder* streaming) {
739 TRACE_STREAMING("DecodeFunctionBody\n");
740 streaming->ProcessFunctionBody(buffer(), module_offset_);
741 if (!streaming->ok()) return nullptr;
742
743 size_t end_offset = buffer_offset_ + function_body_length_;
744 if (num_remaining_functions_ > 0) {
745 return std::make_unique<DecodeFunctionLength>(section_buffer_, end_offset,
746 num_remaining_functions_);
747 }
748 // We just read the last function body. Continue with the next section.
749 if (end_offset != section_buffer_->length()) {
750 return streaming->ToErrorState();
751 }
752 return std::make_unique<DecodeSectionID>(streaming->module_offset());
753}
754
756 std::unique_ptr<StreamingProcessor> processor)
757 : processor_(std::move(processor)),
758 // A module always starts with a module header.
759 state_(new DecodeModuleHeader()) {}
760
762 uint32_t module_offset, uint8_t section_id, size_t length,
763 base::Vector<const uint8_t> length_bytes) {
764 // Section buffers are allocated in the same order they appear in the module,
765 // they will be processed and later on concatenated in that same order.
766 section_buffers_.emplace_back(std::make_shared<SectionBuffer>(
767 module_offset, section_id, length, length_bytes));
768 return section_buffers_.back().get();
769}
770
771std::unique_ptr<StreamingDecoder> StreamingDecoder::CreateAsyncStreamingDecoder(
772 std::unique_ptr<StreamingProcessor> processor) {
773 return std::make_unique<AsyncStreamingDecoder>(std::move(processor));
774}
775
776} // namespace v8::internal::wasm
777
778#undef TRACE_STREAMING
static OwnedVector< T > NewForOverwrite(size_t size)
Definition vector.h:294
int length() const
Definition vector.h:64
Vector< T > SubVector(size_t from, size_t to) const
Definition vector.h:41
void Truncate(size_t length)
Definition vector.h:120
constexpr size_t size() const
Definition vector.h:70
constexpr T * begin() const
Definition vector.h:96
DecodeFunctionBody(SectionBuffer *section_buffer, size_t buffer_offset, size_t function_body_length, size_t num_remaining_functions, uint32_t module_offset)
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
DecodeFunctionLength(SectionBuffer *section_buffer, size_t buffer_offset, size_t num_remaining_functions)
std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming) override
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming) override
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming) override
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
size_t ReadBytes(AsyncStreamingDecoder *streaming, base::Vector< const uint8_t > bytes) override
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
DecodeVarInt32(size_t max_value, const char *field_name)
virtual std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming)=0
virtual std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming)=0
virtual size_t ReadBytes(AsyncStreamingDecoder *streaming, base::Vector< const uint8_t > bytes)
SectionBuffer(uint32_t module_offset, uint8_t id, size_t payload_length, base::Vector< const uint8_t > length_bytes)
base::Vector< const uint8_t > GetCode(WireBytesRef ref) const final
std::optional< ModuleWireBytes > GetModuleBytes() const final
void Finish(bool can_use_compiled_module) override
AsyncStreamingDecoder(const AsyncStreamingDecoder &)=delete
AsyncStreamingDecoder & operator=(const AsyncStreamingDecoder &)=delete
void OnBytesReceived(base::Vector< const uint8_t > bytes) override
AsyncStreamingDecoder(std::unique_ptr< StreamingProcessor > processor)
std::unique_ptr< StreamingProcessor > failed_processor_
SectionBuffer * CreateNewBuffer(uint32_t module_offset, uint8_t section_id, size_t length, base::Vector< const uint8_t > length_bytes)
std::unique_ptr< DecodingState > ToErrorState()
std::vector< std::vector< uint8_t > > full_wire_bytes_
void StartCodeSection(int num_functions, std::shared_ptr< WireBytesStorage > wire_bytes_storage, int code_section_start, int code_section_length)
std::unique_ptr< DecodingState > state_
void NotifyNativeModuleCreated(const std::shared_ptr< NativeModule > &native_module) override
std::vector< std::shared_ptr< SectionBuffer > > section_buffers_
void ProcessFunctionBody(base::Vector< const uint8_t > bytes, uint32_t module_offset)
std::unique_ptr< StreamingProcessor > processor_
const uint8_t * pc() const
Definition decoder.h:408
uint32_t consume_u32v(const char *name="var_uint32")
Definition decoder.h:251
MoreFunctionsCanBeSerializedCallback more_functions_can_be_serialized_callback_
std::function< void(const std::shared_ptr< NativeModule > &)> MoreFunctionsCanBeSerializedCallback
static std::unique_ptr< StreamingDecoder > CreateAsyncStreamingDecoder(std::unique_ptr< StreamingProcessor > processor)
base::Vector< const uint8_t > compiled_module_bytes_
Operand const offset_
Register const value_
enum v8::internal::@1270::DeoptimizableCodeIterator::@67 state_
int32_t offset
TNode< Object > callback
std::shared_ptr< NativeModule > native_module_
#define TRACE_STREAMING(...)
ProcessorImpl * processor_
Definition mul-fft.cc:474
STL namespace.
constexpr Vector< T > VectorOf(T *start, size_t size)
Definition vector.h:360
const char * SectionName(SectionCode code)
constexpr size_t kV8MaxWasmFunctionSize
Definition wasm-limits.h:51
bool IsValidSectionCode(uint8_t byte)
constexpr size_t kMaxVarInt32Size
Definition leb-helper.h:20
V8_EXPORT_PRIVATE FlagValues v8_flags
constexpr int kMaxInt
Definition globals.h:374
#define DCHECK_LE(v1, v2)
Definition logging.h:490
#define DCHECK_NULL(val)
Definition logging.h:491
#define CHECK_LE(lhs, rhs)
#define DCHECK_NOT_NULL(val)
Definition logging.h:492
#define DCHECK_IMPLIES(v1, v2)
Definition logging.h:493
#define DCHECK_GE(v1, v2)
Definition logging.h:488
#define CHECK_EQ(lhs, rhs)
#define DCHECK(condition)
Definition logging.h:482
#define DCHECK_EQ(v1, v2)
Definition logging.h:485
#define DCHECK_GT(v1, v2)
Definition logging.h:487
#define V8_EXPORT_PRIVATE
Definition macros.h:460
const AsyncStreamingDecoder::MoreFunctionsCanBeSerializedCallback callback_
int cache_count_
std::unique_ptr< ValueMirror > value