18#define TRACE_STREAMING(...) \
20 if (v8_flags.trace_wasm_streaming) PrintF(__VA_ARGS__); \
33 void Finish(
bool can_use_compiled_module)
override;
35 void Abort()
override;
39 active_processor.reset();
44 void NotifyNativeModuleCreated(
45 const std::shared_ptr<NativeModule>& native_module)
override;
56 SectionBuffer(uint32_t module_offset, uint8_t
id,
size_t payload_length,
59 module_offset_(module_offset),
60 bytes_(
base::OwnedVector<uint8_t>::NewForOverwrite(
61 1 + length_bytes.length() + payload_length)),
62 payload_offset_(1 + length_bytes.length()) {
63 bytes_.begin()[0] =
id;
64 memcpy(bytes_.begin() + 1, &length_bytes.
first(), length_bytes.
length());
73 uint32_t offset_in_code_buffer = ref.offset() - module_offset_;
74 return bytes().SubVector(offset_in_code_buffer,
75 offset_in_code_buffer + ref.length());
83 size_t length()
const {
return bytes_.size(); }
128 virtual std::unique_ptr<DecodingState>
Next(
155 SectionBuffer* CreateNewBuffer(uint32_t module_offset, uint8_t section_id,
180 std::shared_ptr<WireBytesStorage> wire_bytes_storage,
181 int code_section_start,
int code_section_length) {
186 num_functions, module_offset() - 1, std::move(wire_bytes_storage),
187 code_section_start, code_section_length)) {
193 uint32_t module_offset) {
195 if (!
processor_->ProcessFunctionBody(bytes, module_offset)) Fail();
222 bool code_section_processed_ =
false;
223 uint32_t module_offset_ = 0;
229 std::vector<std::vector<uint8_t>> full_wire_bytes_{{}};
236 size_t remaining_capacity =
239 size_t bytes_for_existing_vector = std::min(remaining_capacity, bytes.size());
241 bytes.data() + bytes_for_existing_vector);
242 if (bytes.size() > bytes_for_existing_vector) {
246 size_t new_capacity = std::max(bytes.size() - bytes_for_existing_vector,
251 bytes.data() + bytes_for_existing_vector,
260 while (
ok() && current < bytes.size()) {
262 state_->ReadBytes(
this, bytes.SubVector(current, bytes.size()));
263 current += num_bytes;
277 size_t num_bytes = std::min(bytes.size(), remaining_buf.
size());
279 memcpy(remaining_buf.
begin(), &bytes.first(), num_bytes);
295 size_t total_length = 0;
302 uint8_t* ptr = all_bytes.begin();
304 memcpy(ptr, bytes.data(), bytes.size());
308 bytes_copy = std::move(all_bytes);
313 if (can_use_compiled_module &&
329 if (
ok() && !
state_->is_finishing_allowed()) {
337 const bool failed = !
ok();
338 std::unique_ptr<StreamingProcessor> processor =
340 processor->OnFinishedStream(std::move(bytes_copy), failed);
354class CallMoreFunctionsCanBeSerializedCallback
357 CallMoreFunctionsCanBeSerializedCallback(
358 std::weak_ptr<NativeModule> native_module,
364 if (std::shared_ptr<NativeModule> module =
native_module_.lock()) {
365 module->counters()->wasm_cache_count()->AddSample(0);
369 void call(CompilationEvent event)
override {
370 if (event != CompilationEvent::kFinishedCompilationChunk)
return;
373 if (std::shared_ptr<NativeModule> native_module =
native_module_.lock()) {
374 native_module->counters()->wasm_cache_count()->AddSample(++
cache_count_);
379 ReleaseAfterFinalEvent release_after_final_event()
override {
380 return kKeepAfterFinalEvent;
385 const AsyncStreamingDecoder::MoreFunctionsCanBeSerializedCallback
callback_;
392 const std::shared_ptr<NativeModule>& native_module) {
394 auto* comp_state = native_module->compilation_state();
396 comp_state->AddCallback(
397 std::make_unique<CallMoreFunctionsCanBeSerializedCallback>(
409 : max_value_(max_value), field_name_(field_name) {}
412 return base::ArrayVector(byte_buffer_);
418 std::unique_ptr<DecodingState> Next(
431 size_t bytes_consumed_ = 0;
437 return base::ArrayVector(byte_buffer_);
440 std::unique_ptr<DecodingState> Next(
448 static constexpr size_t kModuleHeaderSize = 8;
449 uint8_t byte_buffer_[kModuleHeaderSize];
455 : module_offset_(module_offset) {}
460 std::unique_ptr<DecodingState> Next(
474 module_offset_(module_offset) {}
476 std::unique_ptr<DecodingState> NextWithValue(
488 : section_buffer_(section_buffer) {}
492 std::unique_ptr<DecodingState> Next(
503 section_buffer_(section_buffer) {}
505 std::unique_ptr<DecodingState> NextWithValue(
515 size_t buffer_offset,
516 size_t num_remaining_functions)
518 section_buffer_(section_buffer),
519 buffer_offset_(buffer_offset),
521 num_remaining_functions_(num_remaining_functions - 1) {
525 std::unique_ptr<DecodingState> NextWithValue(
537 size_t buffer_offset,
size_t function_body_length,
538 size_t num_remaining_functions,
539 uint32_t module_offset)
540 : section_buffer_(section_buffer),
541 buffer_offset_(buffer_offset),
542 function_body_length_(function_body_length),
543 num_remaining_functions_(num_remaining_functions),
544 module_offset_(module_offset) {}
548 section_buffer_->bytes() + buffer_offset_;
549 return remaining_buffer.
SubVector(0, function_body_length_);
552 std::unique_ptr<DecodingState> Next(
567 size_t new_bytes = std::min(bytes.size(), remaining_buf.
size());
569 memcpy(remaining_buf.
begin(), &bytes.first(), new_bytes);
572 streaming->module_offset() -
static_cast<uint32_t
>(
offset()));
576 if (new_bytes == remaining_buf.
size()) {
580 set_offset(
offset() + new_bytes);
586 bytes_consumed_ =
static_cast<size_t>(decoder.
pc() - buf.
begin());
591 new_bytes = bytes_consumed_ -
offset();
594 set_offset(buffer().
size());
598std::unique_ptr<AsyncStreamingDecoder::DecodingState>
600 if (!streaming->ok())
return nullptr;
602 if (
value_ > max_value_)
return streaming->ToErrorState();
604 return NextWithValue(streaming);
607std::unique_ptr<AsyncStreamingDecoder::DecodingState>
611 streaming->ProcessModuleHeader();
612 if (!streaming->ok())
return nullptr;
613 return std::make_unique<DecodeSectionID>(streaming->module_offset());
616std::unique_ptr<AsyncStreamingDecoder::DecodingState>
624 if (streaming->code_section_processed_)
return streaming->ToErrorState();
625 streaming->code_section_processed_ =
true;
627 return std::make_unique<DecodeSectionLength>(id_,
module_offset_);
630std::unique_ptr<AsyncStreamingDecoder::DecodingState>
641 uint32_t payload_start = streaming->module_offset();
643 if (payload_start > max_size || max_size - payload_start <
value_) {
644 return streaming->ToErrorState();
648 buffer().SubVector(0, bytes_consumed_));
652 return streaming->ToErrorState();
656 streaming->ProcessSection(buf);
657 if (!streaming->ok())
return nullptr;
659 return std::make_unique<DecodeSectionID>(streaming->module_offset_);
664 return std::make_unique<DecodeNumberOfFunctions>(buf);
666 return std::make_unique<DecodeSectionPayload>(buf);
669std::unique_ptr<AsyncStreamingDecoder::DecodingState>
673 streaming->ProcessSection(section_buffer_);
674 if (!streaming->ok())
return nullptr;
675 return std::make_unique<DecodeSectionID>(streaming->module_offset());
678std::unique_ptr<AsyncStreamingDecoder::DecodingState>
684 if (payload_buf.
size() < bytes_consumed_)
return streaming->ToErrorState();
685 memcpy(payload_buf.
begin(), buffer().begin(), bytes_consumed_);
688 section_buffer_->payload_offset());
689 int code_section_start =
static_cast<int>(section_buffer_->module_offset() +
690 section_buffer_->payload_offset());
692 int code_section_len =
static_cast<int>(payload_buf.
length());
694 streaming->StartCodeSection(
static_cast<int>(
value_),
695 streaming->section_buffers_.back(),
696 code_section_start, code_section_len);
697 if (!streaming->ok())
return nullptr;
701 if (payload_buf.
size() != bytes_consumed_) {
702 return streaming->ToErrorState();
704 return std::make_unique<DecodeSectionID>(streaming->module_offset());
707 return std::make_unique<DecodeFunctionLength>(
708 section_buffer_, section_buffer_->payload_offset() + bytes_consumed_,
712std::unique_ptr<AsyncStreamingDecoder::DecodingState>
718 section_buffer_->bytes() + buffer_offset_;
719 if (fun_length_buffer.
size() < bytes_consumed_) {
720 return streaming->ToErrorState();
722 memcpy(fun_length_buffer.
begin(), buffer().begin(), bytes_consumed_);
725 if (
value_ == 0)
return streaming->ToErrorState();
727 if (buffer_offset_ + bytes_consumed_ +
value_ > section_buffer_->length()) {
728 return streaming->ToErrorState();
731 return std::make_unique<DecodeFunctionBody>(
732 section_buffer_, buffer_offset_ + bytes_consumed_,
value_,
733 num_remaining_functions_, streaming->module_offset());
736std::unique_ptr<AsyncStreamingDecoder::DecodingState>
741 if (!streaming->ok())
return nullptr;
743 size_t end_offset = buffer_offset_ + function_body_length_;
744 if (num_remaining_functions_ > 0) {
745 return std::make_unique<DecodeFunctionLength>(section_buffer_, end_offset,
746 num_remaining_functions_);
749 if (end_offset != section_buffer_->length()) {
750 return streaming->ToErrorState();
752 return std::make_unique<DecodeSectionID>(streaming->module_offset());
756 std::unique_ptr<StreamingProcessor> processor)
762 uint32_t module_offset, uint8_t section_id,
size_t length,
772 std::unique_ptr<StreamingProcessor> processor) {
773 return std::make_unique<AsyncStreamingDecoder>(std::move(processor));
778#undef TRACE_STREAMING
static OwnedVector< T > NewForOverwrite(size_t size)
Vector< T > SubVector(size_t from, size_t to) const
void Truncate(size_t length)
constexpr size_t size() const
constexpr T * begin() const
const uint32_t module_offset_
const size_t buffer_offset_
base::Vector< uint8_t > buffer() override
SectionBuffer *const section_buffer_
DecodeFunctionBody(SectionBuffer *section_buffer, size_t buffer_offset, size_t function_body_length, size_t num_remaining_functions, uint32_t module_offset)
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
const size_t num_remaining_functions_
const size_t function_body_length_
const size_t num_remaining_functions_
DecodeFunctionLength(SectionBuffer *section_buffer, size_t buffer_offset, size_t num_remaining_functions)
const size_t buffer_offset_
SectionBuffer *const section_buffer_
std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming) override
DecodeNumberOfFunctions(SectionBuffer *section_buffer)
SectionBuffer *const section_buffer_
std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming) override
DecodeSectionID(uint32_t module_offset)
const uint32_t module_offset_
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
bool is_finishing_allowed() const override
base::Vector< uint8_t > buffer() override
DecodeSectionLength(uint8_t id, uint32_t module_offset)
const uint8_t section_id_
const uint32_t module_offset_
std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming) override
base::Vector< uint8_t > buffer() override
DecodeSectionPayload(SectionBuffer *section_buffer)
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
SectionBuffer *const section_buffer_
size_t ReadBytes(AsyncStreamingDecoder *streaming, base::Vector< const uint8_t > bytes) override
std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming) override
const char *const field_name_
base::Vector< uint8_t > buffer() override
DecodeVarInt32(size_t max_value, const char *field_name)
virtual std::unique_ptr< DecodingState > NextWithValue(AsyncStreamingDecoder *streaming)=0
virtual std::unique_ptr< DecodingState > Next(AsyncStreamingDecoder *streaming)=0
virtual bool is_finishing_allowed() const
virtual ~DecodingState()=default
void set_offset(size_t value)
virtual size_t ReadBytes(AsyncStreamingDecoder *streaming, base::Vector< const uint8_t > bytes)
virtual base::Vector< uint8_t > buffer()=0
const base::OwnedVector< uint8_t > bytes_
size_t payload_offset() const
SectionBuffer(uint32_t module_offset, uint8_t id, size_t payload_length, base::Vector< const uint8_t > length_bytes)
base::Vector< const uint8_t > GetCode(WireBytesRef ref) const final
base::Vector< uint8_t > payload() const
std::optional< ModuleWireBytes > GetModuleBytes() const final
uint32_t module_offset() const
SectionCode section_code() const
const size_t payload_offset_
const uint32_t module_offset_
base::Vector< uint8_t > bytes() const
void Finish(bool can_use_compiled_module) override
AsyncStreamingDecoder(const AsyncStreamingDecoder &)=delete
void ProcessModuleHeader()
AsyncStreamingDecoder & operator=(const AsyncStreamingDecoder &)=delete
void OnBytesReceived(base::Vector< const uint8_t > bytes) override
AsyncStreamingDecoder(std::unique_ptr< StreamingProcessor > processor)
std::unique_ptr< StreamingProcessor > failed_processor_
uint32_t module_offset() const
SectionBuffer * CreateNewBuffer(uint32_t module_offset, uint8_t section_id, size_t length, base::Vector< const uint8_t > length_bytes)
std::unique_ptr< DecodingState > ToErrorState()
std::vector< std::vector< uint8_t > > full_wire_bytes_
void StartCodeSection(int num_functions, std::shared_ptr< WireBytesStorage > wire_bytes_storage, int code_section_start, int code_section_length)
std::unique_ptr< DecodingState > state_
void NotifyNativeModuleCreated(const std::shared_ptr< NativeModule > &native_module) override
std::vector< std::shared_ptr< SectionBuffer > > section_buffers_
void ProcessFunctionBody(base::Vector< const uint8_t > bytes, uint32_t module_offset)
std::unique_ptr< StreamingProcessor > processor_
void NotifyCompilationDiscarded() override
void ProcessSection(SectionBuffer *buffer)
const uint8_t * pc() const
uint32_t consume_u32v(const char *name="var_uint32")
MoreFunctionsCanBeSerializedCallback more_functions_can_be_serialized_callback_
bool deserializing() const
std::function< void(const std::shared_ptr< NativeModule > &)> MoreFunctionsCanBeSerializedCallback
static std::unique_ptr< StreamingDecoder > CreateAsyncStreamingDecoder(std::unique_ptr< StreamingProcessor > processor)
base::Vector< const uint8_t > compiled_module_bytes_
enum v8::internal::@1270::DeoptimizableCodeIterator::@67 state_
std::shared_ptr< NativeModule > native_module_
#define TRACE_STREAMING(...)
ProcessorImpl * processor_
constexpr Vector< T > VectorOf(T *start, size_t size)
const char * SectionName(SectionCode code)
constexpr size_t kV8MaxWasmFunctionSize
bool IsValidSectionCode(uint8_t byte)
constexpr size_t kMaxVarInt32Size
V8_EXPORT_PRIVATE FlagValues v8_flags
#define DCHECK_LE(v1, v2)
#define CHECK_LE(lhs, rhs)
#define DCHECK_NOT_NULL(val)
#define DCHECK_IMPLIES(v1, v2)
#define DCHECK_GE(v1, v2)
#define CHECK_EQ(lhs, rhs)
#define DCHECK(condition)
#define DCHECK_EQ(v1, v2)
#define DCHECK_GT(v1, v2)
#define V8_EXPORT_PRIVATE
const AsyncStreamingDecoder::MoreFunctionsCanBeSerializedCallback callback_
std::unique_ptr< ValueMirror > value