Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ analyze-mago *args:
src/lib/azure-sdk \
src/lib/doctrine-dbal-bulk \
src/lib/snappy \
src/lib/parquet
src/lib/parquet \
src/core/etl

# Auto-fix code style with Mago (format + lint --fix).
fix:
Expand Down
40 changes: 20 additions & 20 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mago.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ ignore = [
"src/lib/types/tests/Flow/Types/Tests/Unit/Type/Logical/HTMLTypeTest.php",
"src/lib/types/tests/Flow/Types/Tests/Unit/Type/Native/StringTypeTest.php",
"src/lib/types/tests/Flow/Types/Tests/Unit/Type/TypeDetectorTest.php",
"src/core/etl/src/Flow/ETL/Row/Entry/HTMLEntry.php",
"src/core/etl/src/Flow/ETL/Row/Entry/HTMLElementEntry.php",
] },
]

Expand Down
4 changes: 2 additions & 2 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ assert (!(with-c && with-pg-query-ext)) || builtins.throw "Cannot use --arg with

let
nixpkgs = fetchTarball {
url = "https://github.com/NixOS/nixpkgs/archive/03158cb739fc4a09babe7949900bfc221871d642.tar.gz";
sha256 = "0vl4iwqcpbczhzb3xbckjb86brcsxkcz4wcbqzidia2fyhm9llmx";
url = "https://github.com/NixOS/nixpkgs/archive/0c4e77908e1204498184d81cda8716e1ba4c47af.tar.gz";
sha256 = "0mbr776gj2qk9klvara4zlww3g0da4nfbnrmi114nnmmayx3pyj4";
};

pkgs = import nixpkgs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,7 @@ private function registerCatalogProviders(array $catalogProviders, ContainerBuil
$providerDef = new Definition(ArrayCatalogProvider::class, [$providerConfig['catalog']]);
$providerDef->addTag('flow.postgresql.catalog_provider');
$container->setDefinition("flow.postgresql.catalog_provider.{$i}", $providerDef);
} elseif (
array_key_exists('catalog_provider_id', $providerConfig)
&& $providerConfig['catalog_provider_id'] !== null
) {
} elseif ($providerConfig['catalog_provider_id'] !== null) {
$configProviderServiceIds[] = type_string()->assert($providerConfig['catalog_provider_id']);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private function valueEquals($a, $b): bool
return false;
}

// @mago-ignore analysis:mixed-assignment
foreach ($b as $key => $value) {
if (!array_key_exists($key, $a)) {
return false;
Expand Down
17 changes: 15 additions & 2 deletions src/core/etl/src/Flow/Calculator/Calculator.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,29 @@ public function divide(
$useNewNaming = defined('Brick\Math\RoundingMode::Up');

$brickMode = match ($rounding) {
// @mago-ignore analysis:non-existent-class-constant
Rounding::UP => $useNewNaming ? RoundingMode::Up : RoundingMode::UP,
// @mago-ignore analysis:non-existent-class-constant
Rounding::DOWN => $useNewNaming ? RoundingMode::Down : RoundingMode::DOWN,
// @mago-ignore analysis:non-existent-class-constant
Rounding::CEILING => $useNewNaming ? RoundingMode::Ceiling : RoundingMode::CEILING,
// @mago-ignore analysis:non-existent-class-constant
Rounding::FLOOR => $useNewNaming ? RoundingMode::Floor : RoundingMode::FLOOR,
// @mago-ignore analysis:non-existent-class-constant
Rounding::HALF_UP => $useNewNaming ? RoundingMode::HalfUp : RoundingMode::HALF_UP,
// @mago-ignore analysis:non-existent-class-constant
Rounding::HALF_DOWN => $useNewNaming ? RoundingMode::HalfDown : RoundingMode::HALF_DOWN,
// @mago-ignore analysis:non-existent-class-constant
Rounding::HALF_CEILING => $useNewNaming ? RoundingMode::HalfCeiling : RoundingMode::HALF_CEILING,
// @mago-ignore analysis:non-existent-class-constant
Rounding::HALF_FLOOR => $useNewNaming ? RoundingMode::HalfFloor : RoundingMode::HALF_FLOOR,
// @mago-ignore analysis:non-existent-class-constant
Rounding::HALF_EVEN => $useNewNaming ? RoundingMode::HalfEven : RoundingMode::HALF_EVEN,
// @mago-ignore analysis:non-existent-class-constant
default => $useNewNaming ? RoundingMode::Unnecessary : RoundingMode::UNNECESSARY,
};

// @mago-ignore analysis:possibly-invalid-argument
$result = $aDecimal->dividedBy(BigDecimal::of((string) $b), $effectiveScale, $brickMode);

if (!self::hasNonZeroFractionalPart($result)) {
Expand All @@ -76,9 +87,9 @@ public function divide(

return $result->toFloat();
} catch (DivisionByZeroException $e) {
throw new DivisionByZeroError('Division by zero.', $e->getCode(), $e);
throw new DivisionByZeroError('Division by zero.', (int) $e->getCode(), $e);
} catch (RoundingNecessaryException $e) {
throw new Exception\RoundingNecessaryException($e->getMessage(), $e->getCode(), $e);
throw new Exception\RoundingNecessaryException($e->getMessage(), (int) $e->getCode(), $e);
}
}

Expand Down Expand Up @@ -112,6 +123,7 @@ public function multiply(int|float|string $a, int|float|string $b): int|float
*/
public function power(int|float|string $a, int|string $b): int|float
{
// @mago-ignore analysis:possibly-invalid-argument
$result = BigDecimal::of((string) $a)->power(BigInteger::of((string) $b)->toInt());

if (!self::hasNonZeroFractionalPart($result)) {
Expand Down Expand Up @@ -139,6 +151,7 @@ public function subtract(int|float|string $a, int|float|string $b): int|float
private static function hasNonZeroFractionalPart(BigDecimal $result): bool
{
if (method_exists($result, 'hasNonZeroFractionalPart')) {
// @mago-ignore analysis:mixed-return-statement
return $result->hasNonZeroFractionalPart();
}

Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/Calculator/NumberNormalizer.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ final class NumberNormalizer
*/
public static function toNumber(string $number): float|int
{
// @mago-ignore analysis:impossible-condition,redundant-type-comparison
if (!is_numeric($number)) {
throw new NonNumericValueException((string) $number);
}
Expand All @@ -33,6 +34,7 @@ public static function toNumber(string $number): float|int
}

if (str_contains($number, '.')) {
// @mago-ignore analysis:invalid-type-cast
return (float) $number;
}

Expand All @@ -50,6 +52,7 @@ public static function toString(string|float|int $number, int $scale): string
throw new InvalidScaleException($scale);
}

// @mago-ignore analysis:impossible-condition,redundant-type-comparison
if (!is_numeric($number)) {
throw new NonNumericValueException((string) $number);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public function delete(string $key): void

public function get(string $key): Row|Rows|CacheIndex
{
// @mago-ignore analysis:mixed-assignment
$serializedValue = $this->cache->get($key);

if (!$serializedValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public function cache(Cache $cache): self
*/
public function externalSortBucketsCount(int $externalSortBucketsCount): self
{
// @mago-ignore analysis:impossible-condition,redundant-comparison
if ($externalSortBucketsCount < 1) {
throw new InvalidArgumentException('External sort buckets count must be greater than 0');
}
Expand Down
8 changes: 5 additions & 3 deletions src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,22 @@ public function build(EntryFactory $entryFactory = new EntryFactory()): Config
$this->serializer ??= new Base64Serializer(new NativePHPSerializer());
$this->optimizer ??= new Optimizer(new LimitOptimization(), new BatchSizeOptimization(batchSize: 1000));

$serializer = $this->serializer;
$optimizer = $this->optimizer;
$dataframeName = $this->name ?? 'flow_dataframe';

return new Config(
$this->id,
$dataframeName,
$this->version,
$this->serializer,
$serializer,
$this->getClock(),
$this->fstab(),
new FilesystemStreams($this->fstab()),
$this->optimizer,
$optimizer,
$this->putInputIntoRows,
$entryFactory,
$this->cache->build($this->fstab(), $this->serializer, $this->telemetryConfig, $dataframeName),
$this->cache->build($this->fstab(), $serializer, $this->telemetryConfig, $dataframeName),
$this->sort->build(),
$this->analyze,
$this->telemetryConfig ?? TelemetryConfig::default($this->getClock()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function build(): SortConfig
} else {
$memoryLimit = ini_get('memory_limit');

if ($memoryLimit === '-1') {
if ($memoryLimit === false || $memoryLimit === '-1') {
$this->sortMemoryLimit = Unit::fromBytes(PHP_INT_MAX);
} else {
$this->sortMemoryLimit = Unit::fromString(
Expand Down
19 changes: 12 additions & 7 deletions src/core/etl/src/Flow/ETL/Config/Telemetry/TelemetryContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public function dataFrameBatchProcessed(Rows $rows, FlowContext $context): void
*/
public function dataFrameCompleted(FlowContext $context, array $attributes = []): void
{
if ($this->dataFrameSpan === null) {
$dataFrameSpan = $this->dataFrameSpan;

if ($dataFrameSpan === null) {
return;
}

Expand All @@ -90,7 +92,7 @@ public function dataFrameCompleted(FlowContext $context, array $attributes = [])
'memory_min_mb' => $this->memory->min()->inMb(),
'memory_max_mb' => $this->memory->max()->inMb(),
],
spanContext: $this->dataFrameSpan->context(),
spanContext: $dataFrameSpan->context(),
);

$throughput = 0.0;
Expand All @@ -101,7 +103,7 @@ public function dataFrameCompleted(FlowContext $context, array $attributes = [])
}

$this->tracer->complete(
$this->dataFrameSpan
$dataFrameSpan
->setAttributes(array_merge($attributes, [
'dataframe.id' => $context->config->id(),
'dataframe.name' => $context->config->name(),
Expand Down Expand Up @@ -135,7 +137,9 @@ public function dataFrameCompleted(FlowContext $context, array $attributes = [])
*/
public function dataFrameFailed(FlowContext $context, Throwable $exception, array $attributes = []): void
{
if ($this->dataFrameSpan === null) {
$dataFrameSpan = $this->dataFrameSpan;

if ($dataFrameSpan === null) {
return;
}

Expand All @@ -153,7 +157,7 @@ public function dataFrameFailed(FlowContext $context, Throwable $exception, arra
}

$this->tracer->complete(
$this->dataFrameSpan
$dataFrameSpan
->setAttributes(array_merge($attributes, [
'dataframe.id' => $context->config->id(),
'dataframe.name' => $context->config->name(),
Expand Down Expand Up @@ -185,14 +189,15 @@ public function dataFrameFailed(FlowContext $context, Throwable $exception, arra
public function dataFrameStarted(FlowContext $context): void
{
$this->context = $context;
$this->dataFrameSpan = $this->tracer->span(
$dataFrameSpan = $this->tracer->span(
'DataFrame ' . $context->config->name(),
SpanKind::INTERNAL,
Attributes::create([
'dataframe.id' => $context->config->id(),
'dataframe.name' => $context->config->name(),
]),
);
$this->dataFrameSpan = $dataFrameSpan;

$this->logger()->debug(
'Data frame processing started',
Expand All @@ -215,7 +220,7 @@ public function dataFrameStarted(FlowContext $context): void
$context->config->fstab()->filesystems(),
),
],
spanContext: $this->dataFrameSpan->context(),
spanContext: $dataFrameSpan->context(),
);

if ($this->options->collectMetrics) {
Expand Down
Loading
Loading